My apologies for the long-ish post up front. Hopefully it'll give enough context for a solution. I've tried to create a utility function that will take any number of old classmethod
s and stick them into a multi-threaded queue:
class QueuedCall(threading.Thread):
def __init__(self, name, queue, fn, args, cb):
threading.Thread.__init__(self)
self.name = name
self._cb = cb
self._fn = fn
self._queue = queue
self._args = args
self.daemon = True
self.start()
def run(self):
r = self._fn(*self._args) if self._args is not None \
else self._fn()
if self._cb is not None:
self._cb(self.name, r)
self._queue.task_done()
Here's what my calling code looks like (within a class)
data = {}
def __op_complete(name, r):
data[name] = r
q = Queue.Queue()
socket.setdefaulttimeout(5)
q.put(QueuedCall('twitter', q, Twitter.get_status, [5,], __op_complete))
q.put(QueuedCall('so_answers', q, StackExchange.get_answers,
['api.stackoverflow.com', 534476, 5], __op_complete))
q.put(QueuedCall('so_user', q, StackExchange.get_user_info,
['api.stackoverflow.com', 534476], __op_complete))
q.put(QueuedCall('p_answers', q, StackExchange.get_answers,
['api.programmers.stackexchange.com', 23901, 5], __op_complete))
q.put(QueuedCall('p_user', q, StackExchange.get_user_info,
['api.programmers.stackexchange.com', 23901], __op_complete))
q.put(QueuedCall('fb_image', q, Facebook.get_latest_picture, None, __op_complete))
q.join()
return data
The problem that I'm running into here is that it seems to work every time on a fresh server restart, but fails every second or third request, with the error:
ValueError: task_done() called too many times
This error presents itself in a random thread every second or third request, so it's rather difficult to nail down exactly what the problem is.
Anyone have any ideas and/or suggestions?
Thanks.
Edit:
I had added print
s in an effort to debug this (quick and dirty rather than logging). One print statement (print 'running thread: %s' % self.name
) in the first line of run
and another right before calling task_done()
(print 'thread done: %s' % self.name
).
The output of a successful request:
running thread: twitter
running thread: so_answers
running thread: so_user
running thread: p_answers
thread done: twitter
thread done: so_user
running thread: p_user
thread done: so_answers
running thread: fb_image
thread done: p_answers
thread done: p_user
thread done: fb_image
The output of an unsuccessful request:
running thread: twitter
running thread: so_answers
thread done: twitter
thread done: so_answers
running thread: so_user
thread done: so_user
running thread: p_answers
thread done: p_answers
Exception in thread p_answers:
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
self.run()
File "/home/demian/src/www/projects/demianbrecht/demianbrecht/demianbrecht/helpers.py", line 37, in run
self._queue.task_done()
File "/usr/lib/python2.7/Queue.py", line 64, in task_done
raise ValueError('task_done() called too many t开发者_运维百科imes')
ValueError: task_done() called too many times
running thread: p_user
thread done: p_user
running thread: fb_image
thread done: fb_image
Your approach to this problem is "unconventional". But ignoring that for now ... the issue is simply that in the code you have given
q.put(QueuedCall('twitter', q, Twitter.get_status, [5,], __op_complete))
it is clearly possible for the following workflow to occur
- A thread is constructed and started by QueuedCall.__init__
- It is then put into the queue q. However ... before the Queue completes its logic for inserting the item, the independent thread has already finished its work and attempted to call q.task_done(). Which causes the error you have (task_done() has been called before the object was safely put into the queue)
How it should be done? You don't insert threads into queues. Queues hold data that threads process. So instead you
- Create a Queue. Insert into it jobs you want done (as eg functions, the args they want and the callback)
- You create and start worker threads
- A worker thread calls
- q.get() to get the function to invoke
- invokes it
- calls q.task_done() to let the queue know the item was handled.
I may be misunderstanding here, but I'm not sure you're using the Queue
correctly.
From a brief survey of the docs, it looks like the idea is that you can use the put
method to put work into a Queue
, then another thread can call get
to get some work out of it, do the work, and then call task_done
when it has finished.
What your code appears to do is put
instances of QueuedCall
into a queue. Nothing ever get
s from the queue, but the QueuedCall
instances are also passed a reference to the queue they're being inserted into, and they do their work (which they know about intrinsically, not because they get
it from the queue) and then call task_done
.
If my reading of all that is correct (and you don't call the get
method from somewhere else I can't see), then I believe I understand the problem.
The issue is that the QueuedCall
instances have to be created before they can be put on the queue, and the act of creating one starts its work in another thread. If the thread finishes its work and calls task_done
before the main thread has managed to put
the QueuedCall
into the queue, then you can get the error you see.
I think it only works when you run it the first time by accident. The GIL 'helps' you a lot; it's not very likely that the QueuedCall
thread will actually gain the GIL and begin running immediately. The fact that you don't actually care about the Queue other than as a counter also 'helps' this appear to work: it doesn't matter if the QueuedCall
hasn't hit the queue yet so long as it's not empty (this QueuedCall
can just task_done
another element in the queue, and by the time that element calls task_done
this one will hopefully be in the queue, and it can be marked as done by that). And adding sleep
also makes the new threads wait a bit, giving the main thread time to make sure they're actually in the queue, which is why that masks the problem as well.
Also note that, as far as I can tell from some quick fiddling with an interactive shell, your queue is actually still full at the end, because you never actually get
anything out of it. It's just received a number of task_done
messages equal to the number of things that were put
in it, so the join
works.
I think you'll need to radically redesign the way your QueuedCall
class works, or use a different synchronisation primitive than a Queue
. A Queue
is designed to be used to queue work for worker threads that already exist. Starting a thread from within a constructor for an object that you put on the queue isn't really a good fit.
精彩评论