require 'socket'
require 'thread'

=begin
DRuby (not to be confused with dRuby, written by Masatoshi SEKI)
(C) Copyright 2001 Paul Brannan (cout at rm-f.net)

DRuby is a set of classes for providing distributed object support to a
Ruby program.  You may distribute and/or modify it under the same terms as
Ruby (see http://www.ruby-lang.org/en/LICENSE.txt).  Example:

Client 
------
client = DRuby::Client.new('localhost', 4242)
obj = client.resolve("foo")
puts obj.foo("foo")
obj.oneway(:foo, "foo")

Server
------
class Foo
    def foo(x); return x; end
end
obj = Foo.new
server = DRuby::Server.new('localhost', 4242)
server.bind(obj, "foo")
server.thread.join

You can do all sorts of cool things with DRuby, including passing blocks to
remote methods, raising exceptions and propagating them from server to
client, and more.  Unlike CORBA, where you must create an interface
definition and strictly adhere to it, DRuby uses marshalling, so you can
use almost any object with it.  But, alas, it is not as powerful as CORBA.
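
As a rough illustration of the marshalling described above, here is a
standalone sketch (not part of DRuby; the Greeter class and the round_trip
helper are hypothetical).  A call is reduced to a [:method, *args] array,
marshalled, and dispatched with __send__ on the receiving side.  An exception
raised by the target is marshalled back in the same way:

```ruby
# Hypothetical target object, for illustration only.
class Greeter
    def greet(name); "hello, #{name}"; end
    def fail_loudly; raise ArgumentError, "boom"; end
end

# Marshal a call, then unmarshal and dispatch it as a server would.
# Returns the marshalled return value, or the marshalled exception.
def round_trip(target, method, *args)
    wire = Marshal.dump([method, *args])    # what the client would send
    message = Marshal.load(wire)            # what the server would receive
    begin
        Marshal.dump(target.__send__(*message))
    rescue Exception => e
        Marshal.dump(e)
    end
end

result = Marshal.load(round_trip(Greeter.new, :greet, "world"))
error  = Marshal.load(round_trip(Greeter.new, :fail_loudly))
puts result
puts error.class
```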

On a fast machine, you should expect around 4000 messages per second with
normal method calls, and up to 10000 messages per second with oneway calls.
My dual-processor machine gets 3500/6500 messages per second, though, so
YMMV.

The DRuby message format is broken into 3 components:
    [ msg_type, obj_id, message ]
For each msg_type, there is a specific format to the message.  Additionally,
certain msg_types are only valid when being sent to the server, and others
are valid only when being sent back to the client.  Here is a summary:

msg_type        send to     meaning of obj_id       msg format
----------------------------------------------------------------------------
REQUEST         server      obj to talk to          [:method, *args]
REQUEST_BLOCK   server      obj to talk to          [:method, *args]
ONEWAY          server      obj to talk to          [:method, *args]
RETVAL          client      always 0                retval
EXCEPTION       client      always 0                $!
YIELD           client      always 0                [value, value, ...]
RESOLVE         server      always 0                object_name
SYNC            either      0=request, 1=response   nil
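
The table can be tied to the bytes on the wire with a small standalone
sketch.  It assumes the framing used by DRuby::Session (four little-endian
16-bit header fields followed by the marshalled message); the encode and
decode helper names are made up for this example:

```ruby
require 'stringio'

MSG_START = 0x4242   # same magic value as Session::MSG_START
REQUEST   = 0x1001   # same value as Session::REQUEST

# Build one frame: [magic, size, msg_type, obj_id] header plus payload.
def encode(type, obj_id, message)
    data = Marshal.dump(message)
    [MSG_START, data.length, type, obj_id].pack("vvvv") + data
end

# Read one frame from an IO and return [msg_type, obj_id, message].
def decode(io)
    magic, size, type, obj_id = io.read(8).unpack("vvvv")
    raise "bad magic" if magic != MSG_START
    [type, obj_id, Marshal.load(io.read(size))]
end

frame = encode(REQUEST, 7, [:foo, "foo"])
type, obj_id, message = decode(StringIO.new(frame))
```

Because the size field is only 16 bits wide, a frame built this way cannot
carry a marshalled payload longer than 65535 bytes.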

Known bugs:
1) Oneway calls are really slow if mixed with regular calls.  I'm not sure
why.  A workaround is to use a separate connection for each type of call,
but sometimes that doesn't work either.
2) The server does not refuse connections for unauthorized addresses; it will
instead accept a connection and then immediately close it.
3) If the server is flooded with oneway calls and drops packets, it will
listen for data until it encounters a valid message.  If this happens, the
client will not be notified.
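
Regarding bug 2: an acceptor is simply a proc that receives the connected
socket and returns true or false.  A minimal sketch of an address-based
acceptor follows.  The ALLOWED list and the FakeSocket stand-in (used so the
example runs without a network) are assumptions made for illustration:

```ruby
# Addresses allowed to connect (hypothetical allowlist).
ALLOWED = ["127.0.0.1", "::1"]

# peeraddr returns [family, port, hostname, numeric_address].
acceptor = proc do |socket|
    ALLOWED.include?(socket.peeraddr[3])
end

# Stand-in for a connected TCPSocket, so no network is needed here.
FakeSocket = Struct.new(:peeraddr)
local  = FakeSocket.new(["AF_INET", 4242, "localhost", "127.0.0.1"])
remote = FakeSocket.new(["AF_INET", 4242, "example.test", "192.0.2.10"])

puts acceptor.call(local)
puts acceptor.call(remote)
```

Such a proc could be passed as the third argument to DRuby::Server.new, or
as the acceptor argument of bind or create_reference.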
=end

module DRuby

public

    # The DRuby server class.  Like its drb equivalent, this class spawns off
    # a new thread which processes requests, allowing the server to do other
    # things while it is doing processing for a distributed object.  This
    # means, though, that all objects used with DRuby must be thread-safe.
    class Server

    public
        attr_reader :host, :port, :obj, :thread

        # Start a server on the given host and port.  An acceptor may be
        # specified to accept/reject connections; it should be a proc that
        # takes a Socket as an argument and returns true or false.
        def initialize(host, port, acceptor=nil)
            @host = host
            @port = port
            @name_to_id = Hash.new
            @id_to_object = Array.new
            @id_to_acceptor = Array.new
            @mutex = Mutex.new
            @next_id = 1
            @unused_ids = Array.new
            @thread = Thread.new do
                server = TCPServer.new(@host, @port)
                while(socket = server.accept)
                    if acceptor then
                        if !acceptor.call(socket) then
                            socket.close
                            next
                        end
                    end
                    # IPPROTO_TCP is the portable level for TCP_NODELAY;
                    # SOL_TCP is not defined on every platform.
                    socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
                    Thread.new(socket) do |sock|
                        session_loop(Session.new(sock))
                    end
                end
            end
        end

        # Register an object with the server.  The object will be given an
        # id of @next_id, and @next_id will be incremented.  We could use the
        # object's real id, but this is insecure.  The acceptor argument is
        # a proc that takes a Socket as an argument, and returns true or false
        # depending on whether the request should be allowed.  The supplied
        # object must be thread-safe.
        def create_reference(obj, acceptor=nil)
            @mutex.synchronize do
                ref = register_private(obj, acceptor)
                Object_Reference.new(ref) #return
            end
        end

        # Find an object in linear time and unregister it.  If you need a
        # faster function, write one and send me a patch.
        def delete_reference(obj)
            @mutex.synchronize do
                unregister_private(obj)
            end
            nil #return
        end

        # Register an object with the server and bind it to name
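        def bind(obj, name, acceptor=nil)
            # Hold the mutex, as create_reference does, so that concurrent
            # registrations cannot be handed the same id.
            @mutex.synchronize do
                id = register_private(obj, acceptor)
                @name_to_id[name] = id
            end
            nil #return
        end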

        # Main server loop.  Wait for a REQUEST message, process it, and send
        # back a YIELD, EXCEPTION, or RETVAL message.  Note that we don't do
        # any thread synchronization here, because all registered objects are
        # required to be thread-safe, and the @id_to_object lookup is atomic
        # (and if it succeeds, the object is guaranteed to be fully
        # registered).
        def session_loop(session)
            while not session.finished()
                type, object_id, message = session.get_message()
                begin
                    obj = @id_to_object[object_id]
                    if (acceptor = @id_to_acceptor[object_id]) then
                        if !acceptor.call(session.socket) then
                            # A oneway caller expects no reply, so drop the
                            # request silently instead of raising.
                            next if type == Session::ONEWAY
                            raise "No authorization"
                        end
                    end
                    case type # In order of most to least common
                        when Session::REQUEST
                            retval = obj.__send__(*message)
                        when Session::ONEWAY
                            begin; obj.__send__(*message); rescue Exception; end
                            next
                        when Session::REQUEST_BLOCK
                            retval = obj.__send__(*message) do |*i|
                                session.send_message(Session::YIELD, 0, i)
                            end
                        when Session::RESOLVE
                            retval = @name_to_id[message]
                        when Session::SYNC
                            session.reply_sync(object_id)
                            next
                        else
                            raise "Bad session request"
                    end
                    session.send_message(Session::RETVAL, 0, retval)
                rescue Exception
                    session.send_message(Session::EXCEPTION, 0, $!)
                end
            end
        end

    private
        def register_private(obj, acceptor)
            if @next_id >= Session::MAX_ID then
                if @unused_ids.size == 0 then
                    raise "Object limit exceeded"
                end
                # Reuse an id that was freed by unregister_private.
                id = @unused_ids.pop
            else
                id = @next_id
                @next_id = @next_id.succ()
            end
            @id_to_acceptor[id] = acceptor
            @id_to_object[id] = obj
            id #return
        end

        # Look the object up by value, clear both of its table entries, and
        # remember the freed id so that register_private can reuse it.
        def unregister_private(obj)
            id = @id_to_object.index(obj)
            return if id == nil
            @id_to_object[id] = nil
            @id_to_acceptor[id] = nil
            @unused_ids.push(id)
        end
    end

    # The DRuby client class.  A DRuby server must be started on the given
    # host and port before instantiating a DRuby client.
    class Client
        attr_reader :host, :port
        
        # Connect a client to the server on the given host and port.  The
        # user can specify sync=false to turn off synchronization and get a
        # 20% speed boost.
        def initialize(host, port, sync=true)
            @host = host
            @port = port
            @server = TCPSocket.open(@host, @port)
            @server.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
            @session = Session.new(@server)
            @mutex = sync ? Mutex.new : Null_Mutex.new
        end

        # Given a string, return a proxy object that will forward requests
        # for an object on the server with that name.
        def resolve(object_name)
            @mutex.synchronize do
                @session.send_message(Session::RESOLVE, 0, object_name)
                type, obj, message = @session.get_message()
                case type
                    when Session::RETVAL
                        Proxy_Object.new(@session, @mutex, message) #return
                    when Session::EXCEPTION     ; raise message
                    else                        ; raise RuntimeError
                end
            end
        end
    end

private

    # In case the user does not want synchronization.
    class Null_Mutex
        def synchronize
            yield
        end
    end

    # All the special functions we have to keep track of
    class Functions
        GOOD = [
            :inspect, :class_variables, :instance_eval, :instance_variables,
            :to_a, :to_s
        ]

        BAD = [
            :clone, :dup, :display
        ]

        METHOD = [
            :methods, :private_methods, :protected_methods, :public_methods,
            :singleton_methods
        ]

        RESPOND = [
            [ :method,      "raise NameError" ],
            [ :respond_to?, "false" ]
        ]
    end

    # A DRuby::Object_Reference is created on the server side to represent an
    # object in the system.  It can be returned from a server object to a
    # client object, at which point it is converted into a DRuby::Proxy_Object.
    class Object_Reference
        attr_reader :object_id

        def initialize(object_id)
            @object_id = object_id
        end
    end

    # A DRuby::Proxy_Object acts as a proxy; it forwards most methods to the
    # server for execution.
    class Proxy_Object

    public

        def initialize(session, mutex, object_id)
            @object_id = object_id
            @session = session
            @mutex = mutex
        end

        # Client request handler.  The idea here is to send out a REQUEST
        # message, and get back a YIELD, EXCEPTION, or RETVAL message.
        def method_missing(method, *args)
            message = [ method, *args ]
            @mutex.synchronize do
                @session.send_message(
                    (block_given?) ? Session::REQUEST_BLOCK : Session::REQUEST,
                    @object_id,
                    message)
                loop do
                    type, obj, message = @session.get_message()
                    case type
                        when Session::RETVAL     ; return msg_to_obj(message)
                        when Session::YIELD      ; yield(*message.map { |m| msg_to_obj(m) })
                        when Session::EXCEPTION  ; raise message
                        when Session::SYNC       ; @session.reply_sync(obj)
                        else                     ; raise "Invalid msg type"
                    end
                end
            end
        end

        # This is a special function that lets you ignore the return value
        # of the calling function to get some extra speed, in exchange for not
        # knowing when the function will complete.  A future version of the
        # server should probably check whether a given function is allowed to
        # be called as a oneway, in order to reduce DoS attacks.
        def oneway(*message)
            @mutex.synchronize do
                @session.send_message(Session::ONEWAY, @object_id, message)
                nil #return
            end
        end

        # sync() will synchronize the client with the server (useful for
        # determining when a oneway call completes)
        def sync()
            @mutex.synchronize do
                @session.send_sync()
                @session.wait_sync()
            end
        end

        # Make sure certain methods get passed down the wire.
        Functions::GOOD.each do |method|
            eval %{
                def #{method}(*args)
                      method_missing(:#{method}, *args) #return
                end
            }
        end

        # And make sure others never get called.
        Functions::BAD.each do |method|
            eval %{
                def #{method}(*args)
                    raise(NameError,
                        "undefined method `#{method}' for " +
                        "\#<\#{self.class}:\#{self.__id__}>")
                end
            }
        end

        # And remove these function names from any method lists that get
        # returned; there's nothing we can do about people who decide to
        # return them from other functions.
        Functions::METHOD.each do |method|
            eval %{
                def #{method}(*args)
                    retval = method_missing(:#{method}, *args)
                    Functions::BAD.each do |bad|
                        # Method lists hold Symbols on Ruby >= 1.9 and
                        # Strings on older versions; remove both forms.
                        retval.delete(bad)
                        retval.delete(bad.to_s)
                    end
                    retval #return
                end
            }
        end

        # Same here, except don't let the call go through in the first place.
        Functions::RESPOND.each do |method, action|
            eval %{
                def #{method}(arg, *args)
                    Functions::BAD.each do |bad|
                        if arg.to_s == bad.to_s then
                            return eval("#{action}")
                        end
                    end
                    method_missing(:#{method}, arg, *args) #return
                end
            }
        end

    private

        def msg_to_obj(obj)
            if Object_Reference === obj then
                Proxy_Object.new(@session, @mutex, obj.object_id) #return
            else
                obj #return
            end
        end

    end

    # A DRuby session sends messages back and forth between server and client.
    # The message format is as follows:
    # +-----+-----+-----+-----+-----+-----+-----+-----+-----+---  +  ---+-----+
    # | MSG_START | msg. size | msg. type |  obj. id  |  marshalled msg/args  |
    # +-----+-----+-----+-----+-----+-----+-----+-----+-----+---  +  ---+-----+
    class Session
        REQUEST         = 0x1001
        REQUEST_BLOCK   = 0x1002
        ONEWAY          = 0x1003
        RETVAL          = 0x2001
        EXCEPTION       = 0x2002
        YIELD           = 0x2003
        RESOLVE         = 0x3001
        SYNC            = 0x4001
        MSG_START       = 0x4242

        MAX_ID = 2**16

        attr_reader :io
        alias_method :socket, :io

        def initialize(io)
            @io = io
        end

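        def send_message(type, obj, message)
            data = Marshal.dump(message)
            # The size field is 16 bits wide; a longer payload would wrap
            # around in pack("v") and corrupt the stream.
            raise ArgumentError, "DRuby message too large" if data.length > 0xFFFF
            header = [MSG_START, data.length, type, obj]
            @io.write(header.pack("vvvv"))
            @io.write(data)
        end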

        def get_message()
            loop do
                header = @io.read(8)
                if header == nil || header.length < 8 then
                    raise EOFError, "DRuby connection closed"
                end
                magic, size, type, obj = header.unpack("vvvv")
                # Skip data until a valid header is found (see known bug 3).
                next if magic != MSG_START
                data = @io.read(size)
                return [ type, obj, Marshal.load(data) ]
            end
        end

        def finished()
            @io.eof #return
        end
        
        def send_sync()
            send_message(Session::SYNC, 0, nil)
        end
        
        def wait_sync()
            sleep 1
            type, obj, message = get_message()
            # A valid reply is a SYNC message with obj_id 1 and a nil body.
            if type != Session::SYNC || obj != 1 || message != nil then
                raise "DRuby synchronization failed"
            end
        end

        def reply_sync(value)
            if value == 0 then
                send_message(Session::SYNC, 1, nil)
            end
        end

    end
end
