
Broken Pipe when Using Python Multiprocessing Managers (BaseManager/SyncManager) to Share Queue with Remote Machines

Developer https://www.devze.com 2023-01-14 18:45 Source: Internet

In the last month, we've had a persistent problem with the Python 2.6.x multiprocessing package when we've tried to use it to share a queue among several different (Linux) computers. I've posed this question directly to Jesse Noller as well since we haven't yet found anything that elucidates the issue on StackOverflow, Python docs, source code or elsewhere online.

Our team of engineers hasn't been able to solve this one, and we've posed the question to quite a few people in Python user groups to no avail. I was hoping someone could shed some light, since I feel like we're doing something incorrect but are too close to the problem to see it for what it is.

Here's the symptom:

Traceback (most recent call last):
  File "/var/django_root/dev/com/brightscope/data/processes/daemons/deferredupdates/servers/queue_server.py", line 65, in get_from_queue
    return queue, queue.get(block=False)
  File "<string>", line 2, in get
  File "/usr/local/lib/python2.6/multiprocessing/managers.py", line 725, in _callmethod
    conn.send((self._id, methodname, args, kwds))
IOError: [Errno 32] Broken pipe

(I'm showing where our code calls queue.get() on a shared queue object, hosted by a manager that extends SyncManager.)

What's peculiar about the issue is that if we connect to this shared queue on a single machine (let's call this machine A), even from lots of concurrent processes, we never seem to run into an issue. It's only when we connect to the queue (again, using a class that extends multiprocessing SyncManager and currently adds no additional functionality) from other machines (let's call these machines B and C) and run a high volume of items into and out of the queue at the same time that we experience a problem.
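For readers who want to reproduce the setup described above, here is a minimal sketch of a queue shared through a SyncManager subclass. It is not the original poster's code: the names QueueManager, get_queue, start_server and connect_client, the authkey value, and the loopback address are all assumptions made for illustration, and it is written for Python 3 rather than 2.6. The server runs in a background thread only so the sketch fits in one file; in the scenario from the question it would run on machine A and the clients on machines B and C.

```python
import queue
import threading
from multiprocessing.managers import SyncManager

# The single queue that every client ends up talking to.
shared_queue = queue.Queue()


class QueueManager(SyncManager):
    """Hypothetical manager subclass that adds no behaviour of its own."""


# Expose the queue under the (assumed) name "get_queue".
QueueManager.register("get_queue", callable=lambda: shared_queue)


def start_server(host="127.0.0.1", port=0, authkey=b"change-me"):
    """Serve the queue from a daemon thread and return the bound address."""
    manager = QueueManager(address=(host, port), authkey=authkey)
    server = manager.get_server()
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server.address  # port 0 is replaced by the port actually bound


def connect_client(address, authkey=b"change-me"):
    """Connect to a running manager and return a proxy for the shared queue."""
    client = QueueManager(address=address, authkey=authkey)
    client.connect()
    return client.get_queue()
```

A client would then call connect_client((host, port)) and use put() and get() on the returned proxy; every such call travels over the manager connection, which is where the Broken pipe in the traceback is raised.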

It is as though Python's multiprocessing package handles local connections in a way that works from machine A (even though they use the same manager.connect() method), but as soon as remote connections are made simultaneously from at least one of machines B or C, we get a Broken pipe error.

In all the reading my team has done, we thought the problem was related to locking. We thought maybe we shouldn't use Queue.Queue, but instead multiprocessing.Queue, but we switched and the problem persisted (we also noticed that SyncManager's own shared Queue is an instance of Queue.Queue).

We are pulling our hair out about how to even debug the issue, since it's hard to reproduce but does happen fairly frequently (many times per day if we are inserting and .get()ing lots of items from the queue).

Our get_from_queue method retries fetching the item from the queue about 10 times with randomized sleep intervals, but if it fails once, it seems to fail all ten times. That led me to believe that calling .register() and .connect() on a manager perhaps doesn't give us a new socket connection to the server, but I couldn't confirm this either by reading the docs or by looking at the Python source code.

Can anyone provide any insight into where we might look or how we might track what's actually happening?

How can we start a new connection in the event of a broken pipe using multiprocessing.BaseManager or multiprocessing.SyncManager?

How can we prevent the broken pipe in the first place?


FYI, in case anyone else runs into this same error: after extensive consulting with Ask Solem and Jesse Noller of Python's core dev team, it looks like this is actually a bug in the current Python 2.6.x (and possibly 2.7+ and possibly 3.x). They are looking at possible solutions, and a fix will probably be included in a future version of Python.


I have suffered from the same problem, even when connecting on localhost in Python 2.7.1. After a day of debugging I found the cause and a workaround:

Cause: The BaseProxy class has thread-local storage that caches the connection. The cached connection is reused for later calls, which causes "broken pipe" errors even after creating a new Manager.

Workaround: Delete the cached connection before reconnecting. Wrap the line that raises the exception in a try-except clause, run the code below in the except branch, and then retry the call.

from multiprocessing.managers import BaseProxy

...

if address in BaseProxy._address_to_local:
    del BaseProxy._address_to_local[address][0].connection

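The address is the one used to connect to the multiprocessing Manager, that is, the same value passed as the manager's address (for a TCP connection this is normally a (host, port) tuple rather than a bare hostname/IP). If you have not explicitly set the host, it should usually be "localhost".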
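The workaround can be wrapped in a small retry helper. The following is only a sketch under stated assumptions: drop_cached_connection and call_with_reconnect are hypothetical names, BaseProxy._address_to_local is a private attribute that may change between Python versions, and closing the stale connection before deleting it is an addition that the workaround above does not include.

```python
from multiprocessing.managers import BaseProxy


def drop_cached_connection(address):
    """Forget the connection cached for `address` in the current thread.

    Returns True if a cached connection was found and removed.
    """
    entry = BaseProxy._address_to_local.get(address)  # private attribute
    if entry is None:
        return False
    tls = entry[0]  # thread-local object that holds `.connection`
    conn = getattr(tls, "connection", None)
    if conn is None:
        return False
    try:
        conn.close()  # assumption: closing the stale connection is harmless
    except (IOError, EOFError):
        pass
    del tls.connection
    return True


def call_with_reconnect(func, address, retries=3):
    """Call func(); on a broken connection, clear the cache and try again."""
    for attempt in range(retries):
        try:
            return func()
        except (IOError, EOFError):
            if attempt == retries - 1:
                raise  # out of retries: let the caller see the error
            drop_cached_connection(address)
```

Usage would look like call_with_reconnect(lambda: queue.get(block=False), address), where queue is the proxy and address is the manager's address. Once the cached connection is gone, the next proxy call opens a fresh one, which is why a plain retry without the reset keeps failing.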


Make sure the machine has enough memory to support the program. In my case, simply increasing the allocated memory resolved the broken pipe error.


You can also try catching the exception in the child processes so that they do not close the connection unexpectedly. The same thing was happening to me, and in the end I had to suppress the errors so that the pipe would not be closed abruptly.


I had the same problem in an interactive Jupyter notebook (Python 3.6.8) after interrupting a multiprocessing process.

My short-term fix was to reinstantiate the Manager and Namespace objects:

from multiprocessing import Manager

mgr = Manager()
ns = mgr.Namespace()

From the multiprocessing programming guidelines in the Python documentation:

Avoid terminating processes

Using the Process.terminate method to stop a process is liable to cause any shared resources (such as locks, semaphores, pipes and queues) currently being used by the process to become broken or unavailable to other processes.

Therefore it is probably best to only consider using Process.terminate on processes which never use any shared resources.
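One way to follow that advice is to ask the worker to stop instead of terminating it. The sketch below is an illustration only: worker and run_demo are hypothetical names, and it assumes the "fork" start method (Linux, as in the question, and the usual situation when functions are defined inside a notebook).

```python
import multiprocessing as mp
import queue


def worker(tasks, results, stop):
    """Process items from `tasks` until `stop` is set, then return normally."""
    while not stop.is_set():
        try:
            item = tasks.get(timeout=0.1)
        except queue.Empty:
            continue  # nothing to do yet; re-check the stop flag
        results.put(item * 2)


def run_demo():
    ctx = mp.get_context("fork")  # assumption: a platform that supports fork
    stop = ctx.Event()
    tasks, results = ctx.Queue(), ctx.Queue()
    proc = ctx.Process(target=worker, args=(tasks, results, stop))
    proc.start()
    for n in (1, 2, 3):
        tasks.put(n)
    doubled = sorted(results.get(timeout=5) for _ in range(3))
    stop.set()            # cooperative shutdown instead of proc.terminate()
    proc.join(timeout=5)
    return doubled, proc.exitcode
```

Because the worker leaves its loop on its own, it is never killed while it holds a lock or is halfway through writing to a pipe, which is the failure mode the guideline warns about.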
