A thread can use Object.wait()
to block until another thread calls notify()
or notifyAll()
on that object.
But what 开发者_开发问答if a thread wants to wait until one of multiple objects is signaled? For example, my thread must wait until either a) bytes become available to read from an InputStream
or b) an item is added to an ArrayList
.
How can the thread wait for either of these events to occur?
EDIT
This question deals with waiting for multiple threads to complete -- my case involves a thread waiting for one of many objects to be singnaled.
You are in for a world of pain. Use a higher level abstraction, such as a blocking message queue, from which the thread can consume messages such as 'more bytes available' or 'item added'.
They could all use the same mutex. You consumer is waiting on that mutex, the both other notify on that mutex when the first can proceed.
A thread cannot wait on more than one object at a time.
The wait()
and notify()
methods are object-specific. The wait()
method suspends the current thread of execution, and tells the object to keep track of the suspended thread. The notify()
method tells the object to wake up the suspended threads that it is currently keeping track of.
Useful link : Can a thread call wait() on two locks at once in Java (6) ?
Little late, but it's a very interesting question! It would seems that you can indeed wait for multiple conditions, with the same performance, and no extra threads; It's just a matter of defining the problem! I took the time to write a more detailed explanation within the commits of the code bellow. By request I will extract the abstraction:
So in fact waiting on multiple objects, is the same as waiting on multiple conditions. But the next step is to merge your sub-conditions into a -net- condition a -single condition-. And when any component of the condition would cause it to become true you flip a boolean, and notify the lock (like any other wait-notify condition).
My approach:
For any condition, it can only result in two values (true and false). How that value is produced is irrelevant. In your case your "functional condition" is when either one of two values is true: (value_a || value_b). I call this "functional condition" the "Nexus-Point". If you apply the perspective that any complex condition -no matter how complex-, always yields a simple result (true or false), then what you're really asking for is "What will cause my net condition to become true?" (Assuming the logic is "Wait until true"). Thus, when a thread causes a component of your condition to become true (setting value_a, or value_b to true, in your case), and you know it'll cause your desired -net- condition to be met, then you can simplify your approach to a classical ( in that it flips a single boolean-flag, and releases a lock). With this concept, you can apply a object-ordinate approach to help aid the clarity of your overall logic:
import java.util.HashSet;
import java.util.Set;
/**
* The concept is that all control flow operation converge
* to a single value: true or false. In the case of N
* components in which create the resulting value, the
* theory is the same. So I believe this is a matter of
* perspective and permitting 'simple complexity'. for example:
*
* given the statement:
* while(condition_a || condition_b || ...) { ... }
*
* you could think of it as:
* let C = the boolean -resulting- value of (condition_a || condition_b || ...),
* so C = (condition_a || condition_b || ...);
*
* Now if we were to we-write the statement, in lamest-terms:
* while(C) { ... }
*
* Now if you recognise this form, you'll notice its just the standard
* syntax for any control-flow statement?
*
* while(condition_is_not_met) {
* synchronized (lock_for_condition) {
* lock_for_condition.wait();
* }
* }
*
* So in theory, even if the said condition was evolved from some
* complex form, it should be treated as nothing more then if it
* was in the simplest form. So whenever a component of the condition,
* in which cause the net-condition (resulting value of the complex
* condition) to be met, you would simply flip the boolean and notify
* a lock to un-park whoever is waiting on it. Just like any standard
* fashion.
*
* So thinking ahead, if you were to think of your given condition as a
* function whos result is true or false, and takes the parameters of the states
* in which its comprised of ( f(...) = (state_a || state_b && state_c), for example )
* then you would recognize "If I enter this state, in which this I know would
* cause that condition/lock to become true, I should just flip the switch switch,
* and notify".
*
* So in your example, your 'functional condition' is:
* while(!state_a && !state_b) {
* wait until state a or state b is false ....
* }
*
* So armed with this mindset, using a simple/assertive form,
* you would recognize that the overall question:
* -> What would cause my condition to be true? : if state_a is true OR state_b is true
* Ok... So, that means: When state_a or state_b turn true, my overall condition is met!
* So... I can just simplify this thing:
*
* boolean net_condition = ...
* final Object lock = new Lock();
*
* void await() {
* synchronized(lock) {
* while(!net_condition) {
* lock.wait();
* }
* }
* }
*
* Almighty, so whenever I turn state_a true, I should just flip and notify
* the net_condition!
*
*
*
* Now for a more expanded form of the SAME THING, just more direct and clear:
*
* @author Jamie Meisch
*/
public class Main {
/**
*
* The equivalent if one was to "Wait for one of many condition/lock to
* be notify me when met" :
*
* synchronized(lock_a,lock_b,lock_c) {
* while(!condition_a || !condition_b || !condition_c) {
* condition_a.wait();
* condition_b.wait();
* condition_c.wait();
* }
* }
*
*/
public static void main(String... args) {
OrNexusLock lock = new OrNexusLock();
// The workers register themselves as their own variable as part of the overall condition,
// in which is defined by the OrNuxusLock custom-implement. Which will be true if any of
// the given variables are true
SpinningWarrior warrior_a = new SpinningWarrior(lock,1000,5);
SpinningWarrior warrior_b = new SpinningWarrior(lock,1000,20);
SpinningWarrior warrior_c = new SpinningWarrior(lock,1000,50);
new Thread(warrior_a).start();
new Thread(warrior_b).start();
new Thread(warrior_c).start();
// So... if any one of these guys reaches 1000, stop waiting:
// ^ As defined by our implement within the OrNexusLock
try {
System.out.println("Waiting for one of these guys to be done, or two, or all! does not matter, whoever comes first");
lock.await();
System.out.println("WIN: " + warrior_a.value() + ":" + warrior_b.value() + ":" + warrior_c.value());
} catch (InterruptedException ignored) {
}
}
// For those not using Java 8 :)
public interface Condition {
boolean value();
}
/**
* A variable in which the net locks 'condition function'
* uses to determine its overall -net- state.
*/
public static class Variable {
private final Object lock;
private final Condition con;
private Variable(Object lock, Condition con) {
this.lock = lock;
this.con = con;
}
public boolean value() {
return con.value();
}
//When the value of the condition changes, this should be called
public void valueChanged() {
synchronized (lock) {
lock.notifyAll();
}
}
}
/**
*
* The lock has a custom function in which it derives its resulting
* -overall- state (met, or not met). The form of the function does
* not matter, but it only has boolean variables to work from. The
* conditions are in their abstract form (a boolean value, how ever
* that sub-condition is met). It's important to retain the theory
* that complex conditions yeild a simple result. So expressing a
* complex statement such as ( field * 5 > 20 ) results in a simple
* true or false value condition/variable is what this approach is
* about. Also by centerializing the overal logic, its much more
* clear then the raw -simplest- form (listed above), and just
* as fast!
*/
public static abstract class NexusLock {
private final Object lock;
public NexusLock() {
lock = new Object();
}
//Any complex condition you can fathom!
//Plus I prefer it be consolidated into a nexus point,
// and not asserted by assertive wake-ups
protected abstract boolean stateFunction();
protected Variable newVariable(Condition condition) {
return new Variable(lock, condition);
}
//Wait for the overall condition to be met
public void await() throws InterruptedException {
synchronized (lock) {
while (!stateFunction()) {
lock.wait();
}
}
}
}
// A implement in which any variable must be true
public static class OrNexusLock extends NexusLock {
private final Set<Variable> vars = new HashSet<>();
public OrNexusLock() {
}
public Variable newVar(Condition con) {
Variable var = newVariable(con);
vars.add(var); //register it as a general component of or net condition // We should notify the thread since our functional-condition has changed/evolved:
synchronized (lock) { lock.notifyAll(); }
return var;
}
@Override
public boolean stateFunction() { //Our condition for this lock
// if any variable is true: if(var_a || var_b || var_c || ...)
for(Variable var : vars) {
if(var.value() == true) return true;
}
return false;
}
}
//increments a value with delay, the condition is met when the provided count is reached
private static class SpinningWarrior implements Runnable, Condition {
private final int count;
private final long delay;
private final Variable var;
private int tick = 0;
public SpinningWarrior(OrNexusLock lock, int count, long delay) {
this.var = lock.newVar(this);
this.count = count; //What to count to?
this.delay = delay;
}
@Override
public void run() {
while (state_value==false) { //We're still counting up!
tick++;
chkState();
try {
Thread.sleep(delay);
} catch (InterruptedException ignored) {
break;
}
}
}
/**
* Though redundant value-change-notification are OK,
* its best to prevent them. As such its made clear to
* that we will ever change state once.
*/
private boolean state_value = false;
private void chkState() {
if(state_value ==true) return;
if(tick >= count) {
state_value = true;
var.valueChanged(); //Our value has changed
}
}
@Override
public boolean value() {
return state_value; //We could compute our condition in here, but for example sake.
}
}
}
It appears that in your case you're waiting for "notifications" from two different sources. You may not have to "wait" (as in normal java synchronized(object) object.wait()
) on those two objects per se, but have them both talk to a queue or what not (as the other answers mention, some blocking collection like LinkedBlockingQueue).
If you really want to "wait" on two different java objects, you might be able to do so by applying some of the principles from this answer: https://stackoverflow.com/a/31885029/32453 (basically new up a thread each to do a wait on each of the objects you're waiting for, have them notify the main thread when the object itself is notified) but it might not be easy to manage the synchronized aspects.
Lock in both cases over the same object. Call in case a) or in case b) notify() on the same object.
You can wait only on one monitor. So notifiers must notify this one monitor. There is no other way in this low level synchronization.
In order handle the termination of any thread from a given set without waiting for all of them to finish, a dedicated common Object (lastExited
below) can be used as monitor (wait()
and notify()
in synchronized
blocks). Further monitors are required for ensuring that at any time at most one thread is exiting (notifyExitMutex
) and at most one thread is waiting for any thread to exit (waitAnyExitMonitor
); thus the wait()
/notify()
pairs pertain always to different blocks.
Example (all process terminations are handled in the order the threads finished):
import java.util.Random;
public class ThreadMonitor {
private final Runnable[] lastExited = { null };
private final Object notifyExitMutex = new Object();
public void startThread(final Runnable runnable) {
(new Thread(new Runnable() { public void run() {
try { runnable.run(); } catch (Throwable t) { }
synchronized (notifyExitMutex) {
synchronized (lastExited) {
while (true) {
try {
if (lastExited[0] != null) lastExited.wait();
lastExited[0] = runnable;
lastExited.notify();
return;
}
catch (InterruptedException e) { }
}
}
}
}})).start();
}
private final Object waitAnyExitMutex = new Object();
public Runnable waitAnyExit() throws InterruptedException {
synchronized (waitAnyExitMutex) {
synchronized (lastExited) {
if (lastExited[0] == null) lastExited.wait();
Runnable runnable = lastExited[0];
lastExited[0] = null;
lastExited.notify();
return runnable;
}
}
}
private static Random random = new Random();
public static void main(String[] args) throws InterruptedException {
ThreadMonitor threadMonitor = new ThreadMonitor();
int threadCount = 0;
while (threadCount != 100) {
Runnable runnable = new Runnable() { public void run() {
try { Thread.sleep(1000 + random.nextInt(100)); }
catch (InterruptedException e) { }
}};
threadMonitor.startThread(runnable);
System.err.println(runnable + " started");
threadCount++;
}
while (threadCount != 0) {
Runnable runnable = threadMonitor.waitAnyExit();
System.err.println(runnable + " exited");
threadCount--;
}
}
}
精彩评论