I have 3 threads: 2 consumers, ConsumerA and ConsumerB, and a Producer.
I also have a LinkedBlockingQueue queue.
At t=1: ConsumerA calls queue.take()
At t=2: ConsumerB calls queue.take()
At t=3: Producer calls queue.put(foo)
Is it guaranteed that ConsumerA receives foo before ConsumerB? In other words, is the order in which the consumers invoke take() the order in which they are notified?
If not, is there an alternative data structure that will give higher priority based on order?
From looking at the source code, it's not guaranteed. There's a guarded block mechanism in place with the waking up of one thread at random, depending on how the scheduler feels.
notEmpty.signal(); // propagate to a non-interrupted thread
Full code: http://kickjava.com/src/java/util/concurrent/LinkedBlockingQueue.java.htm
Edit: I just looked again at ReentrantLock and Condition, and the threads are signalled in FIFO order, apparently. So the first thread to wait for an insertion will be signalled first. However, these are implementation details! Do not rely on them.
an implementation is not required to define exactly the same guarantees or semantics for all three forms of waiting, nor is it required to support interruption of the actual suspension of the thread
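For reference, the guarded-block pattern behind that notEmpty.signal() call looks roughly like the sketch below. This is a simplified illustration, not the JDK source: GuardedQueue is a hypothetical class with a single lock, whereas the real LinkedBlockingQueue is more involved. The point is that put() only signals a waiter; the woken thread still has to reacquire the lock and re-check the guard before it gets an item.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified queue used only to illustrate the guarded-block pattern.
class GuardedQueue<E> {
    private final Queue<E> items = new ArrayDeque<>();
    private final ReentrantLock lock = new ReentrantLock(); // unfair by default
    private final Condition notEmpty = lock.newCondition();

    public void put(E item) {
        lock.lock();
        try {
            items.add(item);
            notEmpty.signal(); // wake one thread waiting in take(), if any
        } finally {
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            // Guard: re-check after every wakeup, because another thread
            // may have taken the item before this one reacquired the lock.
            while (items.isEmpty()) {
                notEmpty.await();
            }
            return items.remove();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        GuardedQueue<String> queue = new GuardedQueue<>();
        Thread consumer = new Thread(() -> {
            try {
                System.out.println("took " + queue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        queue.put("foo");
        consumer.join();
    }
}
```

The while loop around await() is what makes barging harmless for correctness, but it is also why being signalled first does not imply receiving the item first.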
In many cases threads will be served in the order in which they started waiting. However, LinkedBlockingQueue uses an unfair lock, which allows threads to barge in ahead of one another. Though it is rare, the following could happen:
- Thread A enters and polls.
- Thread B enters and tries to poll, but Thread A holds the lock, so Thread B is parked.
- Thread A finishes and signals B.
- Thread C enters and acquires the lock before B is fully unparked.
- Thread B tries to poll, but Thread C holds the lock, so Thread B is parked again.
This is one possibility.
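If arrival order matters, one alternative worth a look is ArrayBlockingQueue, whose constructor takes an optional fairness flag; its documentation says a queue constructed with fairness set to true grants waiting threads access in FIFO order (at some cost in throughput). Below is a minimal sketch of the scenario from the question using that flag. FairTakeDemo and its helper are hypothetical names, and polling Thread.getState() to check that a consumer is already parked inside take() is only a demo shortcut, not something to do in production code.

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo class: two consumers block in take() on a fair queue.
class FairTakeDemo {
    /** Starts a consumer and returns once it appears to be parked inside take(). */
    static Thread startConsumer(String name, BlockingQueue<String> queue,
                                Map<String, String> received) {
        Thread t = new Thread(() -> {
            try {
                received.put(name, queue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, name);
        t.start();
        // Demo shortcut (assumption): WAITING means the thread is blocked in take().
        while (t.getState() != Thread.State.WAITING) {
            Thread.yield();
        }
        return t;
    }

    /** Runs the t=1, t=2, t=3 scenario and returns which consumer got which item. */
    static Map<String, String> run() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2, true); // true = fair
        Map<String, String> received = new ConcurrentHashMap<>();
        Thread a = startConsumer("ConsumerA", queue, received); // t=1
        Thread b = startConsumer("ConsumerB", queue, received); // t=2
        queue.put("foo");                                        // t=3
        queue.put("bar"); // second item so that ConsumerB can finish too
        a.join();
        b.join();
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```

With the fairness flag set, ConsumerA is expected to receive foo because it has been waiting longest. Swapping in a LinkedBlockingQueue removes that documented ordering, even if the observed result often looks the same.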
So digging through the bowels of the Java 6 source (skip the part between the horizontal rules if you don't care about finding the actual code responsible for this stuff)
The class java.util.concurrent.LinkedBlockingQueue implements mutual exclusion using instances of java.util.concurrent.locks.ReentrantLock.
In turn, synchronization variables are generated using java.util.concurrent.locks.ReentrantLock.newCondition(), which calls java.util.concurrent.locks.ReentrantLock$Sync.newCondition().
The method java.util.concurrent.locks.ReentrantLock$Sync.newCondition() returns an instance of java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject, which implements the normal synchronization variable calls await(), signal() and signalAll() described by the interface java.util.concurrent.locks.Condition.
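The call chain above can be checked without reading the source by printing the runtime class of the object that newCondition() returns. This is just a small sketch (ConditionClassDemo is a made-up name), and the result is an implementation detail of the JDK in use, so treat it as an observation rather than a contract.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper that reports which class backs a ReentrantLock condition.
class ConditionClassDemo {
    static String conditionClassName() {
        Condition condition = new ReentrantLock().newCondition();
        return condition.getClass().getName();
    }

    public static void main(String[] args) {
        // On OpenJDK-based JVMs this is expected to name the ConditionObject
        // inner class of AbstractQueuedSynchronizer.
        System.out.println(conditionClassName());
    }
}
```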
Looking at the source code for the ConditionObject class, it keeps two members called firstWaiter and lastWaiter, which are the first and last nodes of the condition's wait queue (instances of java.util.concurrent.locks.AbstractQueuedSynchronizer$Node, the same node type used by the CLH lock queue).
The documentation in that class states:
A thread may try to acquire if it is first in the queue. But being first does not guarantee success; it only gives the right to contend. So the currently released contender thread may need to rewait.
So I believe the answer here is that the take() method in LinkedBlockingQueue attempts to give preferential treatment to those threads which called take() earlier. It will give the first thread to call take() the first chance to grab a queue item when it becomes available, but due to timeouts, interrupts, etc. that thread is not guaranteed to be the first thread to get the item off of the queue.
Keep in mind that this is completely specific to this particular implementation. In general you should assume that calls to take() will wake up a random waiting thread when a queue item becomes available and not necessarily the first one that called take().