I've got some code that submits a request to another thread which may or may not submit that request to yet another thread. That yields a return type of Future<Future<T>>
. Is there some non-heinous way to immediately turn this into Future<T>
that waits on the completion of the entire future chain?
I'm alrea开发者_如何转开发dy using the Guava library to handle other fun concurrency stuff and as a replacement for Google Collections and its working well but I can't seem to find something for this case.
Another possible implementation that uses the guava libraries and is a lot simpler.
import java.util.concurrent.*;
import com.google.common.util.concurrent.*;
import com.google.common.base.*;
public class FFutures {
public <T> Future<T> flatten(Future<Future<T>> future) {
return Futures.chain(Futures.makeListenable(future), new Function<Future<T>, ListenableFuture<T>>() {
public ListenableFuture<T> apply(Future<T> f) {
return Futures.makeListenable(f);
}
});
}
}
Guava 13.0 adds Futures.dereference
to do this. It requires a ListenableFuture<ListenableFuture>
, rather than a plain Future<Future>
. (Operating on a plain Future
would require a makeListenable call, each of which requires a dedicated thread for the lifetime of the task (as is made clearer by the method's new name, JdkFutureAdapters.listenInPoolThread
).)
I think this is the best that can be done to implement the contract of Future. I took the tack of being as unclever as possible so as to be sure that it meets the contract. Not especially the implementation of get with timeout.
import java.util.concurrent.*;
public class Futures {
public <T> Future<T> flatten(Future<Future<T>> future) {
return new FlattenedFuture<T>(future);
}
private static class FlattenedFuture<T> implements Future<T> {
private final Future<Future<T>> future;
public FlattenedFuture(Future<Future<T>> future) {
this.future = future;
}
public boolean cancel(boolean mayInterruptIfRunning) {
if (!future.isDone()) {
return future.cancel(mayInterruptIfRunning);
} else {
while (true) {
try {
return future.get().cancel(mayInterruptIfRunning);
} catch (CancellationException ce) {
return true;
} catch (ExecutionException ee) {
return false;
} catch (InterruptedException ie) {
// pass
}
}
}
}
public T get() throws InterruptedException,
CancellationException,
ExecutionException
{
return future.get().get();
}
public T get(long timeout, TimeUnit unit) throws InterruptedException,
CancellationException,
ExecutionException,
TimeoutException
{
if (future.isDone()) {
return future.get().get(timeout, unit);
} else {
return future.get(timeout, unit).get(0, TimeUnit.SECONDS);
}
}
public boolean isCancelled() {
while (true) {
try {
return future.isCancelled() || future.get().isCancelled();
} catch (CancellationException ce) {
return true;
} catch (ExecutionException ee) {
return false;
} catch (InterruptedException ie) {
// pass
}
}
}
public boolean isDone() {
return future.isDone() && innerIsDone();
}
private boolean innerIsDone() {
while (true) {
try {
return future.get().isDone();
} catch (CancellationException ce) {
return true;
} catch (ExecutionException ee) {
return true;
} catch (InterruptedException ie) {
// pass
}
}
}
}
}
You could create a class like:
public class UnwrapFuture<T> implements Future<T> {
Future<Future<T>> wrappedFuture;
public UnwrapFuture(Future<Future<T>> wrappedFuture) {
this.wrappedFuture = wrappedFuture;
}
public boolean cancel(boolean mayInterruptIfRunning) {
try {
return wrappedFuture.get().cancel(mayInterruptIfRunning);
} catch (InterruptedException e) {
//todo: do something
} catch (ExecutionException e) {
//todo: do something
}
}
...
}
You'll have to deal with exceptions that get() can raise but other methods cannot.
This was my first stab at it but I'm sure there is plenty wrong with it. I'd be more than happy to just replace it with something like Futures.compress(f)
.
public class CompressedFuture<T> implements Future<T> {
private final Future<Future<T>> delegate;
public CompressedFuture(Future<Future<T>> delegate) {
this.delegate = delegate;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (delegate.isDone()) {
return delegate.cancel(mayInterruptIfRunning);
}
try {
return delegate.get().cancel(mayInterruptIfRunning);
} catch (InterruptedException e) {
throw new RuntimeException("Error fetching a finished future", e);
} catch (ExecutionException e) {
throw new RuntimeException("Error fetching a finished future", e);
}
}
@Override
public T get() throws InterruptedException, ExecutionException {
return delegate.get().get();
}
@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
long endTime = System.currentTimeMillis() + unit.toMillis(timeout);
Future<T> next = delegate.get(timeout, unit);
return next.get(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public boolean isCancelled() {
if (!delegate.isDone()) {
return delegate.isCancelled();
}
try {
return delegate.get().isCancelled();
} catch (InterruptedException e) {
throw new RuntimeException("Error fetching a finished future", e);
} catch (ExecutionException e) {
throw new RuntimeException("Error fetching a finished future", e);
}
}
@Override
public boolean isDone() {
if (!delegate.isDone()) {
return false;
}
try {
return delegate.get().isDone();
} catch (InterruptedException e) {
throw new RuntimeException("Error fetching a finished future", e);
} catch (ExecutionException e) {
throw new RuntimeException("Error fetching a finished future", e);
}
}
}
精彩评论