I have a simple web service running inside a Tomcat container, which by nature is multi-threaded. In each request that comes into the service, I want to make concurrent calls to an external service. The ExecutorCompletionService in java.util.concurrent gets me partly there. I can provide it a thread pool, and it will take care of executing my concurrent calls and I will be notified when any of the results are ready.
The code to process a particular incoming request might look like:
void han开发者_Go百科dleRequest(Integer[] input) {
// Submit tasks
CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(Executors.newCachedThreadPool());
for (final Integer i : input) {
completionService.submit(new Callable<Integer>() {
public Integer call() {
return -1 * i;
}
});
}
// Do other stuff...
// Get task results
try {
for (int i = 0; i < input.size; i++) {
Future<Integer> future = completionService.take();
Integer result = future.get();
// Do something with the result...
}
} catch (Exception e) {
// Handle exception
}
}
This should work fine and dandy, but is quite inefficient since a new thread pool is being allocated for each incoming request. If I move the CompletionService out as a shared instance, I will run into thread-safety problems with multiple requests sharing the same CompletionService and thread pool. As requests submit tasks and get results, the results they get not be the ones they submitted.
Thus, what I need is a thread-safe CompletionService that allows me to share a common thread pool across all incoming requests. As each thread completes a task, the appropriate thread for the incoming request should be notified so that it can gather the results.
What's the most straightforward way to implement this sort of functionality? I'm sure this pattern has been applied many times; I'm just not sure if this is something provided by the Java concurrency library, or if can be easily built using some of the Java concurrency building blocks.
UPDATE: one caveat I forgot to mention is that I would like to be notified as soon as any of my submitted tasks complete. That's the primary advantage of using a CompletionService, as it decouples the production and consumption of the tasks and results. I don't actually care about the order in which I get the results back, and I'd like to avoid unnecessarily blocking while waiting for the results to be returned in order.
You share the Executor
but not the CompletionService
.
We have an AsyncCompleter that does exactly this and handles all the bookkeeping, allowing you to:
Iterable<Callable<A>> jobs = jobs();
Iterable<A> results async.invokeAll(jobs);
results
iterates in order of return and blocks until a result is available
java.util.concurrent provides everything you need. If I understand your question correctly, you have the following requirements:
You want to submit requests, and immediately (within reason) process the request result (Response). Well, I believe you've already seen the solution to your problem: java.util.concurrent.CompletionService.
This service which, rather simply, combines an Executor and a BlockingQueue to process Runnable and/or Callable tasks. The BlockingQueue is used to hold completed tasks, which you can have another thread wait on until a completed task is queued (this is accomplished by calling take()) on the CompletionService object.
As previous posters have mentioned, share the Executor, and create a CompletionService per request. This may seem like an expensive thing to do, but consider again that the CS is simply collaborating with the Executor and a BlockingQueue. Since you are sharing the most expensive object to instantiate, i.e., the Executor, I think you'll find that this a very reasonable cost.
However... all this being said, you still seem to have an issue, and that issue seems to be the separation of Request handling, from the handling of Responses. This might be approached by creating a separate service which exclusively handles Responses for all Requests, or for a certain type of Request.
Here is an example: (Note: it's implied that the Request object implement's the Callable interface which should return a Response type... details which I've omitted for this simple example).
class RequestHandler {
RequestHandler(ExecutorService responseExecutor, ResponseHandler responseHandler) {
this.responseQueue = ...
this.executor = ...
}
public void acceptRequest(List<Request> requestList) {
for(Request req : requestList) {
Response response = executor.submit(req);
responseHandler.handleResponse(response);
}
}
}
class ResponseHandler {
ReentrantLock lock;
ResponseHandler(ExecutorService responseExecutor) {
...
}
public void handleResponse(Response res) {
lock.lock() {
try {
responseExecutor.submit( new ResponseWorker(res) );
} finally {
lock.unlock();
}
}
private static class ResponseWorker implements Runnable {
ResponseWorker(Response response) {
response = ...
}
void processResponse() {
// process this response
}
public void run() {
processResponse();
}
}
}
A couple of things to remember: one, an ExecutorService executes Callables or Runnables from a blocking queue; your RequestHandler receives task's, and those are enqueued on the Executor, and processed ASAP. The same thing happens in your ResponseHandler; a response is received, and as soon as that SEPARATE executor can, it will process that response. In short, you've got two executors working simultaneously: one on Request objects, the other on Response objects.
You can just use a normal shared ExecutorService. Whenever you submit a task, you will get a Future back for the task you just submitted. You can store all of them in a list and query them later.
Example:
private final ExecutorService service = ...//a single, shared instance
void handleRequest(Integer[] input) {
// Submit tasks
List<Future<Integer>> futures = new ArrayList<Future<Integer>>(input.length);
for (final Integer i : input) {
Future<Integer> future = service.submit(new Callable<Integer>() {
public Integer call() {
return -1 * i;
}
});
futures.add(future);
}
// Do other stuff...
// Get task results
for(Future<Integer> f : futures){
try {
Integer result = f.get();
} catch (Exception e) {
e.printStackTrace();
}
}
}
Why do you need a CompletionService
?
Each thread could simply submit to or invoke Callables
on a "regular" and shared instance of an ExecutorService
. Each thread then holds on to their own private Future
references.
Also, Executor
and its descendants are thread-safe by design. What you actually want is that each thread can create its own tasks and inspect their results.
The Javadoc in java.util.concurrent
is excellent; it includes usage patterns and examples. Read the doc for ExecutorService and other types to better understand how to use them.
精彩评论