I am implementing a small Thrift (0.6.0) server in Ruby that acts as a proxy to another protocol, with several connections (multiple clients) to a single server. I want to be able to keep per-client data on the server side and track "session" parameters across multiple invocations of the handler functions.
I currently use Thrift::NonblockingServer, as SimpleServer does not seem to allow concurrent connections.
I know how to use TCPSocket::peeraddr, but Thrift::NonblockingServer::IOManager::Worker::run creates a temporary MemoryBufferTransport with the frame it read and passes that as the input/output protocol down to the processor, so it seems that the connection info is not passed down from there.
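For reference, this is the kind of information I would like to reach from inside a handler. The snippet below is a minimal, Thrift-free sketch using only the standard socket library; the loopback address and ephemeral port are just there to make it self-contained. Note that it shows the Ruby 1.9+ behaviour, where the fourth element of peeraddr is the numeric address.

```ruby
require 'socket'

# Listen on an ephemeral loopback port so the example needs no setup.
server = TCPServer.new('127.0.0.1', 0)
port   = server.addr[1]

client   = TCPSocket.new('127.0.0.1', port)
accepted = server.accept

# peeraddr returns [address_family, port, hostname, numeric_address].
family, peer_port, _host, peer_ip = accepted.peeraddr

puts "client connected from #{peer_ip}:#{peer_port} (#{family})"

client.close
accepted.close
server.close
```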
Is there a clean way to do this?
I was thinking of redefining the above-mentioned Thrift::NonblockingServer::IOManager::Worker::run to also pass the fd, or some other ID, as an additional parameter to process, or to augment the proto instances. But as I also have to worry about one layer of generated Ruby code (the process_* methods in class Processor), it seems a little heavy.
I wonder if someone did anything like this before.
Thanks!
P.S. This is a similar problem to this C++ thrift question.
Here is how I went about changing Thrift::NonblockingServer::IOManager::Worker::run to support this.
A few caveats:
- As I mentioned in the question, I don't consider it clean (if nothing else, I will have to monitor future Thrift versions for changes in this function; this is based on 0.6.0).
- This is written to work with Ruby 1.8 (or I would have gotten the non-translated addr/port; see my other question).
- I'm a Ruby newbie, so I'm sure I've done some of this "wrong" (for example, should $connections be @@connections in ConnEntry or in a different class?).
- I am aware that the Thread.current hash for thread-local storage has a namespace pollution issue.
First, at some central module:
module MyThriftExt
  $connections={}
  class ConnEntry
    attr_reader :addr_info, :attr
    def initialize(thrift_fd)
      @addr_info=thrift_fd.handle.peeraddr
      @attr={}
    end
    def self.PreHandle(fd)
      $connections[fd]=ConnEntry.new(fd) unless $connections[fd].is_a? ConnEntry
      # make the connection entry a short-term thread-local variable
      # (cleared in PostHandle)
      Thread.current[:connEntry]=$connections[fd]
    end
    def self.PostHandle()
      Thread.current[:connEntry]=nil
    end
    def to_s()
      "#{addr_info}"
    end
  end
end
module Thrift
  class NonblockingServer
    class IOManager
      alias :old_remove_connection :remove_connection
      def remove_connection(fd)
        $connections.delete fd
        old_remove_connection(fd)
      end
      class Worker
        # The following is verbatim from thrift 0.6.0 except for the two lines
        # marked with "Added"
        def run
          loop do
            cmd, *args = @queue.pop
            case cmd
            when :shutdown
              @logger.debug "#{self} is shutting down, goodbye"
              break
            when :frame
              fd, frame = args
              begin
                otrans = @transport_factory.get_transport(fd)
                oprot = @protocol_factory.get_protocol(otrans)
                membuf = MemoryBufferTransport.new(frame)
                itrans = @transport_factory.get_transport(membuf)
                iprot = @protocol_factory.get_protocol(itrans)
                MyThriftExt::ConnEntry.PreHandle(fd)    # <<== Added
                @processor.process(iprot, oprot)
                MyThriftExt::ConnEntry.PostHandle       # <<== Added
              rescue => e
                @logger.error "#{Thread.current.inspect} raised error: #{e.inspect}\n#{e.backtrace.join("\n")}"
              end
            end
          end
        end
      end
    end
  end
end
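One thing the patch above does not cover: if process raises, the PostHandle line is skipped and the entry stays in Thread.current until the next frame is handled on that worker thread. A possible variation is to scope the thread-local with a block and an ensure clause. The sketch below is Thrift-free, and the ConnScope module and with_entry method are hypothetical names introduced only for illustration.

```ruby
module ConnScope
  KEY = :connEntry  # same thread-local key used by PreHandle/PostHandle

  # Expose `entry` as the current connection for the duration of the block
  # and always clear the slot afterwards, even when the block raises.
  def self.with_entry(entry)
    Thread.current[KEY] = entry
    yield
  ensure
    Thread.current[KEY] = nil
  end
end

# Normal completion: the entry is visible inside the block.
seen = ConnScope.with_entry(:fake_entry) { Thread.current[:connEntry] }

# Failure: the exception still propagates, but the slot is cleared first.
begin
  ConnScope.with_entry(:fake_entry) { raise "handler failed" }
rescue RuntimeError
  leaked = Thread.current[:connEntry]
end
```

With something like this, the two "Added" lines could presumably be replaced by a single block wrapped around the @processor.process call, at the cost of diverging a little further from the upstream 0.6.0 code.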
Then at any point in the handler you can access Thread.current[:connEntry].addr_info for connection-specific data, or store anything regarding the connection in the Thread.current[:connEntry].attr hash.
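As a rough illustration of the handler side, here is a sketch that runs without Thrift. FakeConnEntry stands in for MyThriftExt::ConnEntry, and ProxyHandler with its ping method is a hypothetical handler, not part of any generated code; the address values are made up.

```ruby
# Stand-in for MyThriftExt::ConnEntry so the sketch runs without Thrift.
FakeConnEntry = Struct.new(:addr_info, :attr)

# Hypothetical handler; a real one would implement the generated service methods.
class ProxyHandler
  def ping
    entry = Thread.current[:connEntry]
    return "no connection context" if entry.nil?

    # Per-connection state lives in the attr hash and survives across calls.
    entry.attr[:calls] = entry.attr.fetch(:calls, 0) + 1
    format("call %d from %s", entry.attr[:calls], entry.addr_info[3])
  end
end

# Simulate what the patched Worker#run does around each frame.
entry   = FakeConnEntry.new(["AF_INET", 50000, "localhost", "127.0.0.1"], {})
handler = ProxyHandler.new

Thread.current[:connEntry] = entry   # what PreHandle does
first  = handler.ping
second = handler.ping
Thread.current[:connEntry] = nil     # what PostHandle does
outside = handler.ping
```

The nil check matters because the thread-local is only set while a frame is being processed; code running outside that window sees no connection entry.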