I have a queue of tasks that need to be performed, and a pool of workers that pick up the tasks and perform them. There's also a "manager" class that keeps track of the workers, allows the user to stop or restart them, reports on their progress, etc. Each worker does something like this:
public void doWork() {
    checkArguments();
    performCalculation();
    saveResultsToDatabase();
    performAnotherCalculation();
    saveResultsToDatabase();
    performYetAnotherCalculation();
    saveResultsToDatabase();
}
In this case, "database" does not necessarily refer to an Oracle database. That's certainly one of the options, but the results could also be saved on disk, in Amazon SimpleDB, etc.
So far, so good. However, performCalculation() intermittently locks up for a variety of reasons, mostly because of poorly implemented networking code in a bunch of third-party libraries (e.g. Socket.read() never returns). This is bad, obviously, because the task is now stuck forever, and the worker is now dead.
What I'd like to do is wrap that entire doWork() method in some sort of a timeout, and, if the timeout expires, give the task to someone else.
How can I do that, though? Let's say the original worker is stuck in the performCalculation() method. I then give the task to some other worker, who completes it, and then the original worker decides to wake up and save its intermediate results to the database... thus corrupting perfectly valid data. Is there some general pattern I can use to avoid this?
I can see a couple of solutions, but most of them will require some serious refactoring of all the business-logic code, from the ground up... which is probably the right thing to do philosophically, but is simply not something I have time for.
Have you tried using a Future? They are useful for running a task and waiting for it to complete, optionally with a timeout. For example:
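import java.util.concurrent.*;

private final Runnable performCalc = new Runnable() {
    public void run() {
        performCalculation();
    }
};

public void doWork() {
    ExecutorService executor = Executors.newFixedThreadPool(1);
    try {
        // get() with a timeout throws TimeoutException if the task has not finished.
        // The 30-second value is only an example.
        executor.submit(performCalc).get(30, TimeUnit.SECONDS);
        // anotherCalc is assumed to be a Runnable defined like performCalc.
        executor.submit(anotherCalc).get(30, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        // Asked to stop. Roll back our transactions.
        Thread.currentThread().interrupt();
    } catch (TimeoutException e) {
        // The task took too long. Give it to another worker.
    } catch (ExecutionException e) {
        // The task itself threw an exception; see e.getCause().
    } finally {
        executor.shutdownNow(); // interrupts a task that is still running
    }
}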
If performCalculation is stuck on blocking I/O, there is little you can do to interrupt it. One solution is to close the underlying socket, or to set a timeout on socket operations using Socket.setSoTimeout, but you have to own the code that reads from the socket to do that.
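To illustrate the Socket.setSoTimeout option, here is a minimal sketch. It assumes you can reach the Socket that does the reading, which the question suggests may not be the case with third-party libraries. The class and method names (ReadTimeoutDemo, readTimesOut, demo) are made up for this example, and the "server" is just a local socket that accepts a connection and never sends anything.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

public class ReadTimeoutDemo {

    /**
     * Tries to read one byte, giving up after timeoutMillis.
     * Returns true if the read timed out, false if data (or EOF) arrived.
     */
    static boolean readTimesOut(Socket socket, int timeoutMillis) throws IOException {
        socket.setSoTimeout(timeoutMillis); // applies to subsequent blocking reads
        InputStream in = socket.getInputStream();
        try {
            in.read();
            return false;
        } catch (SocketTimeoutException e) {
            // The socket is still valid here; the caller decides whether to retry or abort.
            return true;
        }
    }

    /** Connects to a local server that never writes, then tries to read from it. */
    static boolean demo() throws IOException {
        try (ServerSocket silentServer = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(silentServer.getInetAddress(), silentServer.getLocalPort());
             Socket accepted = silentServer.accept()) {
            return readTimesOut(client, 200); // 200 ms is an arbitrary example value
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("read timed out: " + demo());
    }
}
```

With a timeout set, a read that would otherwise block forever fails with SocketTimeoutException, so the worker gets a chance to give up cleanly instead of hanging.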
Otherwise you can add some reconciliation mechanism before saving the data into the database. Use some kind of timestamp to detect whether the data in the database is newer than the data that the original worker fetched from the network.
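A rough sketch of that reconciliation idea, using an in-memory map as a stand-in for the real database. Everything here is hypothetical: the GuardedResultStore class, the saveIfNotStale method, and the use of an increasing attempt number (assumed to be handed out by the manager each time it assigns the task) in place of a timestamp.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class GuardedResultStore {

    /** A saved result together with the attempt number that produced it. */
    static final class Versioned {
        final long attempt;
        final String data;

        Versioned(long attempt, String data) {
            this.attempt = attempt;
            this.data = data;
        }
    }

    private final Map<String, Versioned> rows = new ConcurrentHashMap<>();

    /**
     * Saves data for taskId unless a newer attempt has already written a result.
     * Returns true if the write was accepted, false if it was rejected as stale.
     */
    boolean saveIfNotStale(String taskId, long attempt, String data) {
        Versioned candidate = new Versioned(attempt, data);
        // merge() is atomic per key in ConcurrentHashMap, so the check and the
        // write cannot be interleaved with another worker's save.
        Versioned winner = rows.merge(taskId, candidate,
                (existing, incoming) -> incoming.attempt >= existing.attempt ? incoming : existing);
        return winner == candidate;
    }

    String read(String taskId) {
        Versioned v = rows.get(taskId);
        return v == null ? null : v.data;
    }

    public static void main(String[] args) {
        GuardedResultStore store = new GuardedResultStore();
        // Attempt 2 (the replacement worker) finishes first.
        System.out.println(store.saveIfNotStale("task-42", 2, "result from worker B"));
        // Attempt 1 (the worker that was stuck) wakes up later and tries to save.
        System.out.println(store.saveIfNotStale("task-42", 1, "stale result from worker A"));
        System.out.println(store.read("task-42"));
    }
}
```

The point is that the check and the write happen as one atomic step. With a real database the same effect would need whatever conditional-write mechanism that store offers, which is an assumption to verify for each backend mentioned in the question.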
I suppose the easiest thing to do would be to have a separate timer thread, started when the thread with performCalculation() starts. The timer thread can wake up after a period of time and Thread.interrupt() the calculation thread, which can then perform any necessary rollback when handling the InterruptedException.
Granted, this is bolting on additional complexity to manage other problems, and consequently isn't the most elegant solution.
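Something along these lines, as a sketch only. The names WatchdogDemo, Calculation and runWithTimeout are invented for the example, and Thread.sleep stands in for the real calculation. Here the calling thread plays the role of the timer. Keep in mind that interrupt() only wakes calls that respond to interruption; as noted above, a thread stuck in blocking socket I/O may simply ignore it.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class WatchdogDemo {

    /** Hypothetical stand-in for a calculation step that may block. */
    interface Calculation {
        void run() throws InterruptedException;
    }

    /**
     * Runs the calculation on its own thread and interrupts it after timeoutMillis
     * (which must be greater than zero). Returns true if the calculation completed
     * normally, false if it was interrupted or is still stuck.
     */
    static boolean runWithTimeout(Calculation calc, long timeoutMillis) throws InterruptedException {
        AtomicBoolean completed = new AtomicBoolean(false);
        Thread worker = new Thread(() -> {
            try {
                calc.run();
                completed.set(true);
            } catch (InterruptedException e) {
                // Interrupted by the watchdog: roll back partial work here and save nothing.
            }
        }, "calculation");
        worker.setDaemon(true); // a stuck worker should not keep the JVM alive
        worker.start();

        worker.join(timeoutMillis); // wait up to the timeout for normal completion
        if (worker.isAlive()) {
            worker.interrupt();     // ask the calculation to stop
            worker.join(1000);      // grace period for the rollback (arbitrary value)
        }
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runWithTimeout(() -> Thread.sleep(50), 1000));    // finishes in time
        System.out.println(runWithTimeout(() -> Thread.sleep(60_000), 200)); // gets interrupted
    }
}
```

If the method returns false, the manager can hand the task to another worker, but the original thread may still be alive, so this does not by itself prevent a late save.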