I have a problem with the following code fragment. It is intended to handle events, supplied through calls to the processEvent method, by adding them to an event queue (a ConcurrentLinkedQueue) and processing them periodically in the run method.
It almost always works. But sometimes, after processEvent has been called and an event has been added to the queue, the run method fails to see that there is a new event.
Any idea on what is wrong? Besides the obvious mistake in using a String constant as a lock?
import java.util.concurrent.ConcurrentLinkedQueue;

public class MyCommunicator implements Runnable {

    private ConcurrentLinkedQueue<MyEvent> eventQueue = null;
    private boolean stopped = false;
    private String lock = "";
    private Thread thread = null;

    public MyCommunicator() {
        eventQueue = new ConcurrentLinkedQueue<MyEvent>();
    }

    public void start() {
        thread = new Thread(this, "MyCommunicatorThread");
        thread.start();
    }

    public void stop() {
        stopped = true;
        synchronized (lock) {
            lock.notifyAll();
        }
        eventQueue.clear();
    }

    public void run() {
        while (!stopped) {
            try {
                MyEvent event = null;
                while (!stopped && ((event = eventQueue.peek()) != null)) {
                    sendEvent(event);
                    eventQueue.poll();
                }
                if (!stopped) {
                    synchronized (lock) {
                        lock.wait(10000L);
                    }
                }
            }
            catch (Exception e) {
            }
        }
    }

    /**
     * START EVENT JOB - ADD A NEW EVENT TO BE PROCESSED
     */
    public void processEvent(MyEvent event) {
        eventQueue.offer(event);
        synchronized (lock) {
            lock.notifyAll();
        }
    }

    /**
     * END EVENT JOB
     */
    private void sendEvent(MyEvent event) {
        // do send event job
    }
}
Why are you using locks and notifications?
Use a LinkedBlockingQueue instead and save yourself all the hassle.
That, with a timeout on poll(), will accomplish everything you're trying to do.
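As a rough sketch of what that could look like (not a drop-in replacement): String stands in for MyEvent, the sent list stands in for whatever sendEvent() really does, and the class name BlockingCommunicator is made up for illustration. Unlike the original stop(), this version does not clear pending events.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Sketch only: String stands in for MyEvent, "sent" stands in for the real send job.
class BlockingCommunicator implements Runnable {
    private final BlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();
    private final List<String> sent = new ArrayList<>(); // accessed via synchronized methods
    private volatile boolean stopped = false;
    private Thread thread;

    public void start() {
        thread = new Thread(this, "BlockingCommunicatorThread");
        thread.start();
    }

    public void stop() {
        stopped = true;
        if (thread != null) {
            thread.interrupt(); // wakes the worker if it is blocked in poll()
        }
    }

    public void processEvent(String event) {
        eventQueue.offer(event); // an unbounded LinkedBlockingQueue always accepts
    }

    @Override
    public void run() {
        while (!stopped) {
            try {
                // Blocks until an event arrives or 10 seconds pass; no explicit lock needed.
                String event = eventQueue.poll(10, TimeUnit.SECONDS);
                if (event != null) {
                    sendEvent(event);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and exit
                return;
            }
        }
    }

    private synchronized void sendEvent(String event) {
        sent.add(event);
    }

    public synchronized List<String> sentEvents() {
        return new ArrayList<>(sent);
    }
}
```

Because poll() itself does the blocking, an event offered at any moment is picked up right away; there is no window between "check the queue" and "start waiting".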
Edit: In regard to the current code:
You would need to define "fails to see there is a new event". Your run() method looks at the queue every 10 seconds; if there's something in the queue it'll "see it" and pull it out.
If you mean "It doesn't see it immediately when notified, only 10 seconds later", then that's fairly easy to answer: you have a race condition which could easily cause that to occur. Something can be inserted into the queue after this thread has finished checking/processing the queue but before it has acquired the lock. Without a timeout on wait() you would block until the next event was inserted. If the stop() method was called during this time, you'd lose any events in the queue. Using the LinkedBlockingQueue rather than all the unnecessary locking and notifying solves this problem. This isn't an "easy" solution, it's the correct one for this use case and problem.
If that's not the case, then you're simply not inserting anything into the queue and the problem lies in code you didn't post here. A guess without knowing anything about that code would be that you're attempting to insert a null MyEvent at eventQueue.offer(event). Since you aren't try/catch'ing offer() you wouldn't know it. Ignoring all exceptions and not checking returned values is neither a good idea nor good practice.
A third possibility would be that you have some other code somewhere locking on the exact same interned string literal reference, which would cause this code to hang. You mention it but I'll reiterate here: that's a REALLY bad thing to be doing, especially given that it's the empty string. The java.util.concurrent package provides real locks with conditions if you insist on using them here. Note that this will still not eliminate the race condition you have in regard to sometimes missing an event for 10 seconds, but it'll at least be cleaner. To eliminate your race condition you'd want to ditch the concurrent queue for a regular one and simply acquire the lock before accessing it (as well as acquiring the lock for inserts). This will synchronize your threads, as an inserter will be prevented from inserting unless this thread is waiting on the lock condition. Mixing lock and lock-free approaches to thread synchronization in the same chunk of code will often lead to these issues.
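A minimal sketch of that lock-and-condition variant, assuming a plain ArrayDeque guarded by a ReentrantLock. String again stands in for MyEvent, and the class name, the sent list and sentEvents() are hypothetical helpers added only so the example can be run. Note that it removes an event before sending it, whereas the original code peeks, sends, then polls.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Sketch only: every access to the queue and the stopped flag happens under one lock.
class LockedCommunicator implements Runnable {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition changed = lock.newCondition();
    private final Queue<String> eventQueue = new ArrayDeque<>(); // guarded by lock
    private boolean stopped = false;                             // guarded by lock
    private final List<String> sent = Collections.synchronizedList(new ArrayList<>());

    public void processEvent(String event) {
        lock.lock();
        try {
            eventQueue.add(event);
            changed.signalAll(); // the insert and the signal are one atomic step
        } finally {
            lock.unlock();
        }
    }

    public void stop() {
        lock.lock();
        try {
            stopped = true;
            eventQueue.clear();
            changed.signalAll();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void run() {
        while (true) {
            String event;
            lock.lock();
            try {
                // The emptiness check and the wait happen under the same lock,
                // so no insert can slip in between them.
                while (!stopped && eventQueue.isEmpty()) {
                    changed.await(10, TimeUnit.SECONDS);
                }
                if (stopped) {
                    return;
                }
                event = eventQueue.poll();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } finally {
                lock.unlock();
            }
            sent.add(event); // stand-in for sendEvent(); done outside the lock
        }
    }

    public List<String> sentEvents() {
        return new ArrayList<>(sent);
    }
}
```

Doing the send outside the lock keeps producers from being blocked for the duration of a slow send; whether that matters depends on what sendEvent() actually does.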
You have what's known as a missed signal. You poll the queue and then wait on the monitor (taking the lock). The producer threads add events and then call notifyAll() (taking the lock). There is no happens-before relationship between event queuing/poll and the conditional await/notification.
It is therefore possible for thread A to poll while empty and then try to acquire the lock, meanwhile thread B adds an element and acquires the lock, notifying all awaiting threads then releasing the lock. Thread A then acquires the lock and awaits it, but the signal has been missed.
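To make that interleaving concrete, here is a small single-threaded replay of it. The steps that would belong to thread A and thread B are simply executed in the problematic order; the class and method names are invented for the illustration, and the timeout must be greater than zero (wait(0) would wait forever).

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Single-threaded replay of the interleaving; names here are illustrative only.
class MissedSignalReplay {
    /** Returns true if an event was still queued when the consumer's wait ended. */
    static boolean replay(long timeoutMillis) {
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
        Object lock = new Object();

        // Thread A (consumer): finds the queue empty and decides to wait.
        boolean consumerSawEmpty = (queue.peek() == null);

        // Thread B (producer): runs before A reaches its synchronized block.
        queue.offer("event");
        synchronized (lock) {
            lock.notifyAll(); // nobody is waiting yet, so this wakes no one
        }

        // Thread A (consumer): now waits, although the queue is no longer empty.
        synchronized (lock) {
            try {
                lock.wait(timeoutMillis); // ends on timeout (or a spurious wakeup)
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        // The event sat in the queue for the whole wait: the signal was missed.
        return consumerSawEmpty && queue.peek() != null;
    }
}
```

With the 10-second timeout from the question, this is exactly the case where the event is only noticed on the next pass through the loop.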
As you are using the lock purely for signalling, you might consider another mechanism such as a reusable latch like Doug Lea's new JDK 7 Phaser, or just use a BlockingQueue directly.
Alternatively, we have a couple of ReusableLatch implementations, such as a BooleanLatch for a single reader thread or a PhasedLatch for multi-party support.
No particular idea on first glance, but any number of things could be going wrong without your knowledge, because of this:
catch (Exception e) {
}
A handler that catches any Exception (which includes RuntimeException and its various subclasses) and then ignores it is generally a bad idea. If this is meant to catch a specific type of exception (like, say, the InterruptedException that lock.wait() can throw), then you should limit it to that exception type. If you have some reason for catching any exception, then you should at least log something when an exception occurs.
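Something along these lines, for instance. This is only a sketch: the class and method names are made up, and java.util.logging is used simply because it ships with the JDK; any logging framework would do.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Sketch only: shows a narrow catch for the wait and a logged catch for the job.
class LoggingWorker {
    private static final Logger LOG = Logger.getLogger(LoggingWorker.class.getName());
    private final Object lock = new Object();

    /** Waits up to timeoutMillis (must be > 0); returns false if the thread was interrupted. */
    boolean waitForSignal(long timeoutMillis) {
        try {
            synchronized (lock) {
                lock.wait(timeoutMillis);
            }
            return true;
        } catch (InterruptedException e) {
            // Only the exception wait() is declared to throw is caught here.
            Thread.currentThread().interrupt(); // restore the flag for the caller
            LOG.log(Level.INFO, "Interrupted while waiting", e);
            return false;
        }
    }

    /** Runs one job; returns false (after logging) if it threw a RuntimeException. */
    boolean runJob(Runnable job) {
        try {
            job.run();
            return true;
        } catch (RuntimeException e) {
            // If a broad catch is really wanted, at least leave a trace.
            LOG.log(Level.SEVERE, "Event processing failed", e);
            return false;
        }
    }
}
```

With this in place, a failure inside the send job shows up in the log instead of silently restarting the loop.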
I had an issue with ConcurrentLinkedQueue that I really suspect to be a genuine bug: its synchronization is not really foolproof.
I haven't fully tested this yet, but I looked at the code and I am quite sure that .isEmpty() is not synchronized if the queue actually is empty. While one thread invokes .isEmpty() and true is returned, the queue might already contain elements.