I am currently writing a nginx proxy server module with a Request queue in front, so the requests are not dropped when the servers behind the nginx can't handle the requests (nginx is configured as a load balancer).
I am using
from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
The idea is to put the request in a queue before handling them. I know multiprocessing.Queue supports only simple object and cannot support raw sockets, so I tried using a multiprocess.Manager to make a shared dictionary. The Manager also uses sockets for connection, so this method failed too. Is there a way to share network sockets between processes? Here is the problematic part of the code:
class ProxyServer(Threader, HTTPServer):
def __init__(self, server_address, bind_and_activate=True):
HTTPServer.__init__(self, server_address, ProxyHandler,
bind_and_activate)
self.manager = multiprocessing.Manager()
self.conn_dict = self.manager.dict()
self.ticket_queue = multiprocessing.Queue(maxsize= 10)
self._processes = []
self.add_worker(5)
def process_request(self, request, client):
stamp = time.time()
print "We are processing"
self.conn_dict[stamp] = (request, client) # the program crashes here
#Exception happened during processing of request from ('172.28.192.34', 49294)
#Traceback (most recent call last):
# File "/usr/lib64/python2.6/SocketServer.py", line 281, in _handle_request_noblock
# self.process_request(request, client_address)
# File "./nxproxy.py", line 157, in process_request
# self.conn_dict[stamp] = (request, client)
# File "<string>", line 2, in __setitem__
# File "/usr/lib64/python2.6/multiprocessing/managers.py", line 725, in _callmethod
# conn.send((self._id, methodname, args, kwds))
#TypeError: expected string or Unicode object, NoneType found
self.ticket_queue.put(stamp)
def add_worker(self, number_of_workers):
开发者_如何学C for worker in range(number_of_workers):
print "Starting worker %d" % worker
proc = multiprocessing.Process(target=self._worker, args = (self.conn_dict,))
self._processes.append(proc)
proc.start()
def _worker(self, conn_dict):
while 1:
ticket = self.ticket_queue.get()
print conn_dict
a=0
while a==0:
try:
request, client = conn_dict[ticket]
a=1
except Exception:
pass
print "We are threading!"
self.threader(request, client)
U can use multiprocessing.reduction to transfer the connection and socket objects between processes
Example Code
# Main process
from multiprocessing.reduction import reduce_handle
h = reduce_handle(client_socket.fileno())
pipe_to_worker.send(h)
# Worker process
from multiprocessing.reduction import rebuild_handle
h = pipe.recv()
fd = rebuild_handle(h)
client_socket = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
client_socket.send("hello from the worker process\r\n")
Looks like you need to pass file descriptors between processes (assuming Unix here, no clue about Windows). I've never done this in Python, but here is link to python-passfd project that you might want to check.
You can look at this code - https://gist.github.com/sunilmallya/4662837 which is multiprocessing.reduction socket server with parent processing passing connections to client after accepting connections
精彩评论