Python Sharing a network socket with multiprocessing.Manager

Developer https://www.devze.com 2023-01-31 14:14 Source: Internet
I am currently writing an nginx proxy server module with a request queue in front, so that requests are not dropped when the servers behind nginx can't handle them (nginx is configured as a load balancer).

I am using

from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler

The idea is to put requests in a queue before handling them. I know multiprocessing.Queue only supports simple (picklable) objects and cannot carry raw sockets, so I tried using a multiprocessing.Manager to make a shared dictionary. The Manager also uses sockets for its connections, so this method failed too. Is there a way to share network sockets between processes? Here is the problematic part of the code:

class ProxyServer(Threader, HTTPServer):

    def __init__(self, server_address, bind_and_activate=True):
        HTTPServer.__init__(self, server_address, ProxyHandler,
                bind_and_activate)

        self.manager = multiprocessing.Manager()

        self.conn_dict = self.manager.dict()
        self.ticket_queue = multiprocessing.Queue(maxsize= 10)
        self._processes = []
        self.add_worker(5)


    def process_request(self, request, client):
        stamp = time.time()
        print "We are processing"

        self.conn_dict[stamp] = (request, client) # the program crashes here


    #Exception happened during processing of request from ('172.28.192.34', 49294)
    #Traceback (most recent call last):
    #  File "/usr/lib64/python2.6/SocketServer.py", line 281, in _handle_request_noblock
    #    self.process_request(request, client_address)
    #  File "./nxproxy.py", line 157, in process_request
    #    self.conn_dict[stamp] = (request, client)
    #  File "<string>", line 2, in __setitem__
    #  File "/usr/lib64/python2.6/multiprocessing/managers.py", line 725, in _callmethod
    #    conn.send((self._id, methodname, args, kwds))
    #TypeError: expected string or Unicode object, NoneType found

        self.ticket_queue.put(stamp)


    def add_worker(self, number_of_workers):
        for worker in range(number_of_workers):
            print "Starting worker %d" % worker
            proc = multiprocessing.Process(target=self._worker, args = (self.conn_dict,))
            self._processes.append(proc)
            proc.start()

    def _worker(self, conn_dict):
        while 1:
            ticket = self.ticket_queue.get()

            print conn_dict
            a=0
            while a==0:
                try:
                    request, client = conn_dict[ticket]
                    a=1
                except Exception:
                    pass
            print "We are threading!"
            self.threader(request, client)


You can use multiprocessing.reduction to transfer connection and socket objects between processes.

Example Code

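# Main process
from multiprocessing.reduction import reduce_handle
# client_socket is the accepted connection; pipe_to_worker is assumed
# to be one end of a multiprocessing.Pipe() shared with the worker.
h = reduce_handle(client_socket.fileno())
pipe_to_worker.send(h)

# Worker process
import socket
from multiprocessing.reduction import rebuild_handle
h = pipe.recv()
fd = rebuild_handle(h)  # turn the received handle back into a file descriptor
client_socket = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
client_socket.send("hello from the worker process\r\n")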
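
The snippet above is Python 2 code; reduce_handle and rebuild_handle are not available in recent Python 3 releases. A rough Python 3 sketch of the same idea, using send_handle and recv_handle from multiprocessing.reduction, might look like the following. It assumes a Unix system with the fork start method, and it uses socket.socketpair() as a stand-in for a connection returned by accept(); the names worker and demo are made up for illustration.

```python
import multiprocessing
import socket
from multiprocessing.reduction import recv_handle, send_handle


def worker(pipe):
    # Receive a duplicated file descriptor and wrap it in a socket object.
    fd = recv_handle(pipe)
    with socket.socket(fileno=fd) as client_socket:
        client_socket.sendall(b"hello from the worker process\r\n")


def demo():
    ctx = multiprocessing.get_context("fork")  # assumption: Unix
    # A duplex Pipe() is backed by a Unix socket pair, which send_handle needs.
    parent_pipe, child_pipe = ctx.Pipe()
    proc = ctx.Process(target=worker, args=(child_pipe,))
    proc.start()
    child_pipe.close()  # the parent does not use the child's end

    # Created after the fork, so the worker cannot simply inherit it.
    client_socket, peer = socket.socketpair()
    peer.settimeout(5)
    send_handle(parent_pipe, client_socket.fileno(), proc.pid)
    client_socket.close()  # the worker now holds its own copy

    # Read until the worker closes its copy of the socket.
    data = b""
    while True:
        chunk = peer.recv(1024)
        if not chunk:
            break
        data += chunk
    proc.join()
    parent_pipe.close()
    peer.close()
    return data


if __name__ == "__main__":
    print(demo())
```

The parent can close its own copy of the socket as soon as send_handle returns, because the descriptor has already been duplicated for the receiving process.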


Looks like you need to pass file descriptors between processes (assuming Unix here, no clue about Windows). I've never done this in Python, but here is a link to the python-passfd project that you might want to check.
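
For reference, newer Python versions can pass descriptors without a third-party package. The sketch below is not python-passfd; it swaps in the standard library's socket.send_fds and socket.recv_fds, and it assumes Python 3.9+ on Unix with the fork start method. The names worker and pass_descriptor are hypothetical, and socket.socketpair() again stands in for an accepted client connection.

```python
import multiprocessing
import socket


def worker(channel):
    # recv_fds returns (data, fds, msg_flags, address); only fds matters here.
    _data, fds, _flags, _addr = socket.recv_fds(channel, 16, 1)
    with socket.socket(fileno=fds[0]) as client_socket:
        client_socket.sendall(b"handled in worker\r\n")


def pass_descriptor():
    ctx = multiprocessing.get_context("fork")  # assumption: Unix with fork
    # Descriptors can only travel over a Unix-domain socket.
    parent_end, child_end = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
    proc = ctx.Process(target=worker, args=(child_end,))
    proc.start()
    child_end.close()

    # Created after the fork, so the worker has to receive it explicitly.
    client_socket, peer = socket.socketpair()
    peer.settimeout(5)
    # At least one byte of ordinary data must accompany the descriptor.
    socket.send_fds(parent_end, [b"x"], [client_socket.fileno()])
    client_socket.close()

    data = b""
    while True:
        chunk = peer.recv(1024)
        if not chunk:
            break
        data += chunk
    proc.join()
    parent_end.close()
    peer.close()
    return data


if __name__ == "__main__":
    print(pass_descriptor())
```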


You can look at this code: https://gist.github.com/sunilmallya/4662837. It is a multiprocessing.reduction socket server, with the parent process passing connections on after accepting them.
