I have a queue that contains work items and I want to have multiple threads work in parallel on those items. When a work item is processed it may result in new work items. The problem I have is that I can't find a solution on how to determine if I'm done. The worker looks like that:
public class Worker implements Runnable {
public void run() {
while (true) 开发者_运维问答{
WorkItem item = queue.nextItem();
if (item != null) {
processItem(item);
}
else {
// the queue is empty, but there may still be other workers
// processing items which may result in new work items
// how to determine if the work is completely done?
}
}
}
}
This seems like a pretty simple problem actually but I'm at a loss. What would be the best way to implement that?
thanks
clarification: The worker threads have to terminate once none of them is processing an item, but as long as at least one of them is still working they have to wait because it may result in new work items.
What about using an ExecutorService which will allow you to wait for all tasks to finish: ExecutorService, how to wait for all tasks to finish
I'd suggest wait/notify calls. In the else case, your worker threads would wait on an object until notified by the queue that there is more work to do. When a worker creates a new item, it adds it to the queue, and the queue calls notify on the object the workers are waiting on. One of them will wake up to consume the new item.
The methods wait, notify, and notifyAll of class Object support an efficient transfer of control from one thread to another. Rather than simply "spinning" (repeatedly locking and unlocking an object to see whether some internal state has changed), which consumes computational effort, a thread can suspend itself using wait until such time as another thread awakens it using notify. This is especially appropriate in situations where threads have a producer-consumer relationship (actively cooperating on a common goal) rather than a mutual exclusion relationship (trying to avoid conflicts while sharing a common resource).
Source: Threads and Locks
I'd look at something higher level than wait/notify. It's very difficult to get right and avoid deadlocks. Have you looked at java.util.concurrent.CompletionService<V>
? You could have a simpler manager thread that polls the service and take()
s the results, which may or may not contain a new work item.
Using a BlockingQueue containing items to process along with a synchronized set that keeps track of all elements being processed currently:
BlockingQueue<WorkItem> bQueue;
Set<WorkItem> beingProcessed = new Collections.synchronizedSet(new HashMap<WorkItem>());
bQueue.put(workItem);
...
// the following runs over many threads in parallel
while (!(bQueue.isEmpty() && beingProcessed.isEmpty())) {
WorkItem currentItem = bQueue.poll(50L, TimeUnit.MILLISECONDS); // null for empty queue
if (currentItem != null) {
beingProcessed.add(currentItem);
processItem(currentItem); // possibly bQueue.add(newItem) is called from processItem
beingProcessed.remove(currentItem);
}
}
EDIT: as @Hovercraft Full Of Eels suggested, an ExecutorService
is probably what you should really use. You can add new tasks as you go along. You can semi-busy wait for termination of all tasks at regular interval with executorService.awaitTermination(time, timeUnits)
and kill all your threads after that.
Here's the beginnings of a queue to solve your problem. bascially, you need to track new work and in process work.
public class WorkQueue<T> {
private final List<T> _newWork = new LinkedList<T>();
private int _inProcessWork;
public synchronized void addWork(T work) {
_newWork.add(work);
notifyAll();
}
public synchronized T startWork() throws InterruptedException {
while(_newWork.isEmpty() && (_inProcessWork > 0)) {
wait();
if(!_newWork.isEmpty()) {
_inProcessWork++;
return _newWork.remove(0);
}
}
// everything is done
return null;
}
public synchronized void finishWork() {
_inProcessWork--;
if((_inProcessWork == 0) && _newWork.isEmpty()) {
notifyAll();
}
}
}
your workers will look roughly like:
public class Worker {
private final WorkQueue<T> _queue;
public void run() {
T work = null;
while((work = _queue.startWork()) != null) {
try {
// do work here...
} finally {
_queue.finishWork();
}
}
}
}
the one trick is that you need to add the first work item _before you start any workers (otherwise they will all immediately exit).
精彩评论