runner.rb

The main application loop.

class Rocket
  
  attr_accessor :redis, :redis_message_queue_connection, :server_id
  

Print an exception's message, class, and backtrace when a controller action raises an error.

  def show_error(ex)
    puts "ERROR: #{ex.message} (#{ex.class})"
    # backtrace is nil for an exception that was never raised.
    puts Array(ex.backtrace).join("\n")
  end
  

This is a really simple processing queue that is pretty much a miniature implementation of pub/sub. It is primarily used if a user is unavailable when the server tries to send a message to them. This is all automatic, so you shouldn’t have to worry about it.

The whole thing is built on Redis’s blpop operation, a blocking pop. blpop watches a given key and blocks until a value is pushed onto the list, then pops that value and returns it. Because Rocket is completely event driven, the call blocks only this particular Redis connection rather than the rest of the server, which is why the queue gets a connection of its own. It may sound strange, but if you spend a few minutes looking at it, you’ll see it’s just leveraging events to recreate pub/sub in about 20 lines of Ruby.

Yes, I’m aware Redis supports pub/sub already, but I’ve had people ask if it would be possible to use something other than Redis for this purpose. While that’s not the case yet, I’d rather not tie myself down to Redis’s pub/sub just in case.

  def process_message_queue

Wrapped in a fiber to support Synchrony.

    Fiber.new {

Wrapped in a begin/rescue block so that an error while handling a message is logged instead of crashing the event loop.

      begin
        message = @redis_message_queue_connection.blpop("message_queue_#{@server_id}", 0)
        message = JSON.parse(message[1])
        user_id = message.delete "rocket_user_id"
        user = RocketUser.find(user_id)

Transmit to the user if possible; otherwise, put the message back in the queue.

        if user && user.web_socket
          transmit(user, message)
          process_message_queue
        else
          message.merge! :rocket_user_id => user_id
          ROCKET.redis.rpush("message_queue_#{ROCKET.server_id}", message.to_json)
          process_message_queue
        end
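      rescue => ex
        show_error ex
        # Restart the queue on the next reactor tick so one bad message
        # doesn't stop all further processing.
        EventMachine.next_tick { process_message_queue }
      end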
    }.resume
  end
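
As a rough illustration of the deliver-or-requeue logic above, here is a standalone sketch that swaps Redis for Ruby's in-process Queue, whose pop also blocks until a value arrives. The names drain_once and online are hypothetical and exist only for this example; they are not part of Rocket.

```ruby
require "json"

# Hypothetical stand-ins: a Queue plays the role of the Redis list, and
# `online` maps a user id to the messages delivered to that user.
def drain_once(queue, online)
  raw = queue.pop # blocks until a value is available, much like blpop
  message = JSON.parse(raw)
  user_id = message.delete("rocket_user_id")

  if online.key?(user_id)
    online[user_id] << message # "transmit" to a connected user
    :delivered
  else
    # Restore the user id and requeue so the message is not lost.
    queue.push(message.merge("rocket_user_id" => user_id).to_json)
    :requeued
  end
end

queue = Queue.new
online = { 1 => [] }
queue.push({ "rocket_user_id" => 1, "text" => "hi" }.to_json)
queue.push({ "rocket_user_id" => 2, "text" => "later" }.to_json)

first = drain_once(queue, online)
second = drain_once(queue, online)
```

Note that a message for a user who stays unavailable is popped and pushed again on every pass, so a real deployment would likely want some delay or expiry around the requeue step.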
  

Send the message to the client via a web socket connection.

  def transmit(user, message)
    if user
      user.web_socket.send message.to_json
    end
  end
  

Pass the args along to the controller action.

  def process_command(user, controller, command, args)
    @@controllers[controller].new.process_command(user, command, args)
  end
  

Set up Async Sinatra to serve static files.

  class StaticController < Sinatra::Base
    register Sinatra::Async
    set :root, APP_ROOT
    set :static, true
    set :public, File.join(APP_ROOT, "public")
  end
  

Parse the API commands. Takes {"Song.show" => {"id" => 1}} and calls the show action on the Song controller, passing along the inner hash as the argument. An array of such hashes is also accepted; each one is processed in order.

  def parse_command(user, msg)
    # Normalize to a list of command hashes; ignore any other JSON value.
    commands = case msg
               when Array then msg
               when Hash then [msg]
               else []
               end
    commands.each do |m|
      m.each_pair do |k, v|
        controller, mthd = k.split(".")
        process_command user, controller.to_sym, mthd, v
      end
    end
  end
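
To make the "Controller.action" convention concrete, here is a small self-contained sketch of the same split-and-dispatch idea. The controller class, the controllers hash, and the dispatch method are assumptions made up for this example; Rocket's real controllers are registered elsewhere and may look different.

```ruby
# Hypothetical controller, defined only for this illustration. It simply
# echoes back what it was asked to do.
song_controller = Class.new do
  def process_command(user, command, args)
    [user, command, args]
  end
end

# Split each "Controller.action" key, look up the controller by symbol,
# and hand the action name and arguments to a fresh instance.
def dispatch(controllers, user, msg)
  commands = msg.is_a?(Array) ? msg : [msg]
  commands.flat_map do |command_hash|
    command_hash.map do |key, args|
      controller, action = key.split(".")
      controllers.fetch(controller.to_sym).new.process_command(user, action, args)
    end
  end
end

controllers = { Song: song_controller }
single = dispatch(controllers, "alice", { "Song.show" => { "id" => 1 } })
batch = dispatch(controllers, "alice",
                 [{ "Song.show" => { "id" => 1 } }, { "Song.list" => {} }])
```

Using fetch here means an unknown controller name raises a KeyError right away, which is one possible way to surface bad input early.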
  

The loop where everything happens.

  def run
    EventMachine.synchrony do
      

Call any initializers. It seems a little funny to hold off on this until we get this far in, but initializing a DB connection outside the event loop will throw an error.

      Dir[File.join APP_ROOT, "config", "initializers", "*.rb"].each do |file|
        require file
      end
      

This pulls in all of the models defined in app/models. Currently it only loads files at the top level of that directory, but we should probably support nested directories at some point. People might want that for namespacing.

      Dir[File.join APP_ROOT, "app", "models", "*.rb"].each do |file|
        require file
      end
      

Create two Redis connections: one for dealing with clients, and another for handling the processing queue.

      @redis = EM::Protocols::Redis.connect
      @redis_message_queue_connection = EM::Protocols::Redis.connect
      

The server gets a unique ID from Redis so other servers can identify it. This becomes useful when trying to communicate with a client connected to another server.

      @server_id = @redis.incr "server_ids"
      

Start the message processing queue.

      process_message_queue
      

Start the web socket listener.

      EventMachine::WebSocket.start(host: "0.0.0.0", port: 9185) do |ws|
        

Create a new client. Perhaps we should provide a callback here.

        ws.onopen do
          Fiber.new {RocketUser.create(ws)}.resume
        end
        

We don’t do anything here yet, but we should probably provide a callback and clean the disconnected user out of the database.

        ws.onclose do
          
        end
        

Deals with incoming messages. We should probably provide a callback.

        ws.onmessage do |json_msg|

Wrapped in a fiber for Synchrony.

          Fiber.new {

In a begin block to keep going if there’s an error in a controller action.

            begin

Grab the user and deal with their input.

              user = get_local_user(ws.signature)
              if user
                msg = JSON.parse(json_msg)
                parse_command(user, msg)
              end
            rescue => ex
              show_error ex
            end
          }.resume
        end
      end
      

Run Async Sinatra.

      Rack::Handler::Thin.run StaticController, Port: 9346
    end
  end
end