Right now, I have a queue, with multiple producers and single consumer.
Consumer thread开发者_StackOverflow中文版 operation is slow. Also, consumer takes element from queue through a peek operation, and until the consumption operation is complete, the element cannot be removed from the queue. This is because the producer thread as a side operation also takes a snapshot of all elements that are not fully processed at that point in time.
Now, I want to change my code to support multiple consumers. So, lets say I have three threads, one thread will take the first element, which can be read through a peek operation. The second consumer thread can go for the second element, but I have no way of retrieving that since queue dosn't support retrieving the second element.
So, the option to use a standard ConcurrentLinkedQueue (which I am using right now) is out.
I am thinking of using a priority queue, but then I will have to associate with each element a flag that tells me if this element is already being used by some thread or not.
Which data structure is most suited to this problem?
It sounds like you should really have two queues:
- Unprocessed
- In progress
A consumer would atomically (via a lock) pull from the unprocessed queue and add to the in-progress queue. That way multiple consumers can work concurrently... but the producer can still take a snapshot of both queues when it needs to. When the consumer is finished with a task, it removes it from the in-progress queue. (That doesn't really need to be a queue, as nothing's "pulling" from it as such. Just some collection you can easily add to and remove from.)
Given that you'll need locking to make the transfer atomic, you probably don't need the underlying queues to be the concurrent ones - you'll be guarding all shared access already.
I'd agree with Jon Skeet (+1) in that you need two stores for recording waiting and in-progress items. I would use a LinkedBlockingQueue
and have each of your consumers call take()
on it. When an element arrives on the queue it will be taken by one of the consumers.
Recording what is in progress and what is completed would be separate operation. I would maintain a HashSet
of all the items that have not yet completed and my producer would first (atomically) add the item to the HashSet of non-completed items and then pop the item on the queue. Once a consumer have finished it's work, it removes the item from the HashSet.
Your producer can scan the HashSet to determine what is outstanding.
精彩评论