require 'socket'
require 'thread'

=begin
DRuby (not to be confused with dRuby, written by Masatoshi SEKI)
(C) Copyright 2001 Paul Brannan (cout at rm-f.net)

DRuby is a set of classes for providing distributed object support to a
Ruby program.  You may distribute and/or modify it under the same terms as
Ruby (see http://www.ruby-lang.org/en/LICENSE.txt).  Example:

Client 
------
client = DRuby::Client.new('localhost', 4242)
obj = client.resolve("foo")
puts obj.foo("foo")
obj.oneway(:foo, "foo")

Server
------
class Foo
    def foo(x); return x; end
end
obj = Foo.new
server = DRuby::Server.new('localhost', 4242)
server.register(obj, "foo")
server.thread.join

You can do all sorts of cool things with DRuby, including passing blocks to
the functions, throwing exceptions and propagating them from server to
client, and more.  Unlike CORBA, where you must create an interface
definition and strictly adhere to it, DRuby uses marshalling, so you can
use almost any object with it.  But, alas, it is not as powerful as CORBA.
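
As a rough illustration of what "almost any object" means here, the sketch
below round-trips a plain data structure through Ruby's Marshal module and
shows one kind of value (a Proc) that cannot be dumped.  It uses only the
standard library; the variable names are made up for this example and are not
part of DRuby.

```ruby
# A plain data structure, similar in shape to a method name plus arguments.
original = { "method" => :foo, "args" => [1, 2.5, "bar"] }

# Dumping and loading yields an equal but distinct copy, which is what the
# remote side would see.
copy = Marshal.load(Marshal.dump(original))

# Procs (like IO objects) cannot be marshalled; Marshal.dump raises TypeError.
proc_error = begin
    Marshal.dump(proc { 42 })
    nil
rescue TypeError => e
    e
end
```

Arguments and return values that fail in this way cannot be sent over a
connection.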

On a fast machine, you should expect around 4000 messages per second with
normal method calls, and up to 10000 messages per second with oneway calls.
My dual-processor machine gets 3500/6500 messages per second, though, so
YMMV.

The DRuby message format is broken into 3 components:
    [ msg_type, obj_id, message ]
For each msg_type, there is a specific format to the message.  Additionally,
certain msg_types are only valid when being sent to the server, and others
are valid only when being sent back to the client.  Here is a summary:

msg_type        send to     meaning of obj_id       msg format
----------------------------------------------------------------------------
REQUEST         server      obj to talk to          [:method, *args]
REQUEST_BLOCK   server      obj to talk to          [:method, *args]
ONEWAY          server      obj to talk to          [:method, *args]
RETVAL          client      always 0                retval
EXCEPTION       client      always 0                $!
YIELD           client      always 0                [value, value, ...]
RESOLVE         server      always 0                object_name
SYNC            either      0=request, 1=response   nil
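
The framing behind this table can be sketched with pack and Marshal.  This is
a minimal sketch, not the DRuby::Session code itself: it assumes the four
header fields are 16-bit little-endian integers (the "vvvv" pack format), and
encode_message and decode_message are hypothetical helper names.  The two
numeric values are copied from DRuby::Session.

```ruby
require 'stringio'

msg_start = 0x4242   # magic number, as in DRuby::Session::MSG_START
request   = 0x1001   # msg_type, as in DRuby::Session::REQUEST

# Hypothetical helper: frame one message as an 8-byte header plus the
# marshalled payload.
encode_message = lambda do |type, obj_id, message|
    data = Marshal.dump(message)
    [msg_start, data.bytesize, type, obj_id].pack("vvvv") + data
end

# Hypothetical helper: read one framed message back from an IO object.
decode_message = lambda do |io|
    magic, size, type, obj_id = io.read(8).unpack("vvvv")
    raise "bad magic number" unless magic == msg_start
    [type, obj_id, Marshal.load(io.read(size))]
end

# A REQUEST for object 1, calling foo("foo").
wire = encode_message.call(request, 1, [:foo, "foo"])
decoded = decode_message.call(StringIO.new(wire))
```

Because the size field is 16 bits wide, a payload in this layout cannot be
longer than 65535 bytes.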

Known bugs:
1) Oneway calls are really slow if mixed with regular calls.  I'm not sure
why.  A workaround is to use a separate connection for each type of call,
but sometimes that doesn't work either.
2) The server does not refuse connections for unauthorized addresses; it will
instead accept a connection and then immediately close it.
3) If the server is flooded with oneway calls and drops packets, it will
listen for data until it encounters a valid message.  If this happens, the
client will not be notified.
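
The accept-then-close behaviour in bug 2 follows from the acceptor being
called with a socket that has already been accepted.  Below is a minimal
sketch of such an acceptor, assuming a loopback-only policy is wanted.  The
name loopback_only is hypothetical, and the throwaway TCPServer only stands in
for a DRuby::Server so that the example runs on its own.

```ruby
require 'socket'

# Hypothetical acceptor: allow only connections that come from the local
# machine.  It takes a socket and returns true or false.
loopback_only = proc do |socket|
    addr = socket.remote_address
    addr.ipv4_loopback? || addr.ipv6_loopback?
end

# Exercise the acceptor against a throwaway listener on an ephemeral port.
listener = TCPServer.new('127.0.0.1', 0)
client = TCPSocket.new('127.0.0.1', listener.addr[1])
connection = listener.accept
allowed = loopback_only.call(connection)
[connection, client, listener].each { |s| s.close }
```

A proc like this would be passed as the third argument to DRuby::Server.new
or to DRuby::Server#register.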
=end

module DRuby

public

    # The DRuby server class.  Like its drb equivalent, this class accepts
    # connections on a background thread and serves each connection on a
    # thread of its own, so the program can do other things while requests
    # are being processed.  This means, though, that all objects used with
    # DRuby must be thread-safe.
    class Server
        attr_reader :host, :port, :obj, :thread

        # Start a server on the given host and port.  An acceptor may be
        # specified to accept/reject connections; it should be a proc that
        # takes a Socket as an argument and returns true or false.
        def initialize(host, port, acceptor=nil)
            @host = host
            @port = port
            @name_to_id = Hash.new
            @id_to_object = Hash.new
            @id_to_acceptor = Hash.new
            @mutex = Mutex.new
            @next_id = 1
            @thread = Thread.new do
                server = TCPServer.new(@host, @port)
                while(socket = server.accept)
                    if acceptor then
                        if !acceptor.call(socket) then
                            socket.close
                            next
                        end
                    end
                    socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
                    Thread.new(socket) do |socket|
                        session_loop(Session.new(socket))
                    end
                end
            end
        end

        # Register an object with the server.  Once an object is registered,
        # it cannot be unregistered.  The object will be given an id of
        # @next_id, and @next_id will be incremented.  We could use the
        # object's real id, but we want to be sure that the id will fit into
        # a 16-bit short.  The acceptor argument is a proc that takes a Socket
        # as an argument, and returns true or false depending on whether the
        # request should be allowed.  The supplied object must be thread-safe.
        def register(obj, name, acceptor=nil)
            @mutex.synchronize do
                if @next_id >= Session::MAX_ID then
                    raise ArgumentError, "Object limit exceeded"
                end
                @name_to_id[name] = @next_id
                @id_to_acceptor[@next_id] = acceptor
                @id_to_object[@next_id] = obj
                @next_id = @next_id.succ()
            end
        end

        # Main server loop.  Wait for a REQUEST message, process it, and send
        # back a YIELD, EXCEPTION, or RETVAL message.  Note that we don't do
        # any thread synchronization here, because all registered objects are
        # required to be thread-safe, and the @id_to_object lookup is atomic
        # (and if it succeeds, the object is guaranteed to be fully
        # registered).
        def session_loop(session)
            while not session.finished()
                type, object_id, message = session.get_message()
                obj = @id_to_object[object_id]
                acceptor = @id_to_acceptor[object_id]
                begin
                    if acceptor then
                        if !acceptor.call(session.socket) then
                            # A oneway call has no reply channel, so drop an
                            # unauthorized one instead of executing it.
                            next if type == Session::ONEWAY
                            raise ArgumentError, "No authorization"
                        end
                    end
                    case type
                        when Session::REQUEST
                            retval = obj.__send__(*message)
                        when Session::REQUEST_BLOCK
                            retval = obj.__send__(*message) do |*i|
                                session.send_message(Session::YIELD, 0, i)
                            end
                        when Session::ONEWAY
                            begin; obj.__send__(*message); rescue Exception; end
                            next
                        when Session::RESOLVE
                            retval = @name_to_id[message]
                            if retval.nil? then
                                raise ArgumentError, "Unknown object name"
                            end
                        when Session::SYNC
                            session.reply_sync(object_id)
                            next
                        else
                            raise ArgumentError, "Bad session request"
                    end
                    session.send_message(Session::RETVAL, 0, retval)
                rescue Exception
                    session.send_message(Session::EXCEPTION, 0, $!)
                end
            end
        end
    end

    # The DRuby client class.  A DRuby server must be started on the given
    # host and port before instantiating a DRuby client.
    class Client
        attr_reader :host, :port
        
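        # Connect a client to the server on the given host and port.  The
        # user can specify sync=false to turn off the mutex around each
        # request and get roughly a 20% speed boost; this is only safe when
        # the client and its proxy objects are used from a single thread.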
        def initialize(host, port, sync=true)
            @host = host
            @port = port
            @server = TCPSocket.open(@host, @port)
            @server.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
            @session = Session.new(@server)
            @mutex = sync ? Mutex.new : Null_Mutex.new
        end

        # Given a string, return a proxy object that will forward requests
        # for an object on the server with that name.
        def resolve(object_name)
            @mutex.synchronize do
                @session.send_message(Session::RESOLVE, 0, object_name)
                type, obj, message = @session.get_message()
                case type
                    when Session::RETVAL
                        return Object.new(@session, @mutex, message)
                    when Session::EXCEPTION     ; raise message
                    else                        ; raise RuntimeError
                end
            end
        end
    end

private

    # In case the user does not want synchronization.
    class Null_Mutex
        def synchronize
            yield
        end
    end

    # A DRuby::Object acts as a proxy; it forwards most methods to the server
    # for execution.
    class Object

        def initialize(session, mutex, object_id)
            @object_id = object_id
            @session = session
            @mutex = mutex
        end

        # Client request handler.  The idea here is to send out a REQUEST
        # message, and get back a YIELD, EXCEPTION, or RETVAL message.
        def method_missing(method, *args)
            message = [ method, *args ]
            @mutex.synchronize do
                @session.send_message(
                    (block_given?) ? Session::REQUEST_BLOCK : Session::REQUEST,
                    @object_id,
                    message)
                loop do
                    type, obj, message = @session.get_message()
                    case type
                        when Session::RETVAL     ; return message
                        when Session::YIELD      ; yield(*message)
                        when Session::EXCEPTION  ; raise message
                        when Session::SYNC       ; @session.reply_sync(obj)
                        else                     ; raise RuntimeError
                    end
                end
            end
        end

        # This is a special function that lets you ignore the return value
        # of the called function to get some extra speed, in exchange for not
        # knowing when the function will complete.  Any exception raised by
        # the call is discarded on the server.  A future version of the
        # server should probably check whether a given function is allowed to
        # be called as a oneway, in order to reduce the risk of DoS attacks.
        def oneway(*message)
            @mutex.synchronize do
                @session.send_message(Session::ONEWAY, @object_id, message)
                return nil
            end
        end

        # sync() will synchronize the client with the server (useful for
        # determining when a oneway call completes)
        def sync()
            @mutex.synchronize do
                @session.send_sync()
                @session.wait_sync()
            end
        end

        GOOD_FUNCTIONS = [
            :inspect, :class_variables, :instance_eval, :instance_variables,
            :to_a, :to_s
        ]

        BAD_FUNCTIONS = [
            :clone, :dup, :display
        ]

        METHOD_FUNCTIONS = [
            :methods, :private_methods, :protected_methods, :public_methods,
            :singleton_methods
        ]

        RESPOND_FUNCTIONS = [
            [ :method,      "raise NameError" ],
            [ :respond_to?, "return false" ]
        ]

        # Make sure certain methods get passed down the wire.
        GOOD_FUNCTIONS.each do |method|
            eval %{
                def #{method}(*args)
                      return method_missing(:#{method}, *args)
                end
            }
        end

        # And make sure others never get called.
        BAD_FUNCTIONS.each do |method|
            undef_method method
        end

        # And remove these function names from any method lists that get
        # returned; there's nothing we can do about people who decide to
        # return them from other functions.
        METHOD_FUNCTIONS.each do |method|
            eval %{
                def #{method}(*args)
                    retval = method_missing(:#{method}, *args)
                    # Method lists hold Symbols on Ruby 1.9 and later, and
                    # Strings before that, so remove both forms.
                    BAD_FUNCTIONS.each do |bad|
                        retval.delete(bad)
                        retval.delete(bad.to_s)
                    end
                    return retval
                end
            }
        end

        # Same here, except don't let the call go through in the first place.
        RESPOND_FUNCTIONS.each do |method, action|
            eval %{
                def #{method}(arg, *args)
                    BAD_FUNCTIONS.each do |bad|
                        if arg.to_s == bad.to_s then
                            #{action}
                        end
                    end
                    return method_missing(:#{method}, arg, *args)
                end
            }
        end
    end

    # A DRuby session sends messages back and forth between server and client.
    # The message format is as follows; each of the four header fields is a
    # 16-bit little-endian integer:
    # +-----+-----+-----+-----+-----+-----+-----+-----+-----+---  +  ---+-----+
    # | MSG_START | msg. size | msg. type |  obj. id  |  marshalled msg/args  |
    # +-----+-----+-----+-----+-----+-----+-----+-----+-----+---  +  ---+-----+
    class Session
        REQUEST         = 0x1001
        REQUEST_BLOCK   = 0x1002
        ONEWAY          = 0x1003
        RETVAL          = 0x2001
        EXCEPTION       = 0x2002
        YIELD           = 0x2003
        RESOLVE         = 0x3001
        SYNC            = 0x4001
        MSG_START       = 0x4242

        MAX_ID = 2**16

        attr_reader :io
        alias_method :socket, :io

        def initialize(io)
            @io = io
        end

        def send_message(type, obj, message)
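            data = Marshal.dump(message)
            # The size field is only 16 bits wide, so a larger payload would
            # be truncated in the header and corrupt the stream.
            if data.length > 0xFFFF then
                raise ArgumentError, "Message too large"
            end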
            header = [MSG_START, data.length, type, obj]
            @io.write(header.pack("vvvv"))
            @io.write(data)
        end

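        # Read one message.  Headers that do not begin with MSG_START are
        # discarded, 8 bytes at a time, until a valid header is found.
        def get_message()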
            header = @io.read(8)
            magic, size, type, obj = header.unpack("vvvv")
            while magic != MSG_START
                header = @io.read(8)
                magic, size, type, obj = header.unpack("vvvv")
            end
            data = @io.read(size)
            message = Marshal.load(data)
            return [ type, obj, message ]
        end

        def finished()
            return @io.eof
        end
        
        def send_sync()
            send_message(Session::SYNC, 0, nil)
        end
        
        def wait_sync()
            sleep 1
            type, obj, message = get_message()
            if type != Session::SYNC || obj != 1 || !message.nil? then
                raise RuntimeError, "DRuby synchronization failed"
            end
        end

        def reply_sync(value)
            if value == 0 then
                send_message(Session::SYNC, 1, nil)
            end
        end

    end

end
