开发者

Is there an easy way to turn Future<Future<T>> into Future<T>?

开发者 https://www.devze.com 2022-12-18 07:42 出处:网络
I\'ve got some code that submits a request to another thread which may or may not submit that request to yet another thread.That yields a return type of Future<Future<T>>.Is there some non

I've got some code that submits a request to another thread which may or may not submit that request to yet another thread. That yields a return type of Future<Future<T>>. Is there some non-heinous way to immediately turn this into Future<T> that waits on the completion of the entire future chain?

I'm alrea开发者_如何转开发dy using the Guava library to handle other fun concurrency stuff and as a replacement for Google Collections and its working well but I can't seem to find something for this case.


Another possible implementation that uses the guava libraries and is a lot simpler.

import java.util.concurrent.*;
import com.google.common.util.concurrent.*;
import com.google.common.base.*;

public class FFutures {
  public <T> Future<T> flatten(Future<Future<T>> future) {
    return Futures.chain(Futures.makeListenable(future), new Function<Future<T>, ListenableFuture<T>>() {
      public ListenableFuture<T> apply(Future<T> f) {
        return Futures.makeListenable(f);
      }
    });
  }
}


Guava 13.0 adds Futures.dereference to do this. It requires a ListenableFuture<ListenableFuture>, rather than a plain Future<Future>. (Operating on a plain Future would require a makeListenable call, each of which requires a dedicated thread for the lifetime of the task (as is made clearer by the method's new name, JdkFutureAdapters.listenInPoolThread).)


I think this is the best that can be done to implement the contract of Future. I took the tack of being as unclever as possible so as to be sure that it meets the contract. Not especially the implementation of get with timeout.

import java.util.concurrent.*;

public class Futures {
  public <T> Future<T> flatten(Future<Future<T>> future) {
    return new FlattenedFuture<T>(future);
  }

  private static class FlattenedFuture<T> implements Future<T> {
    private final Future<Future<T>> future;

    public FlattenedFuture(Future<Future<T>> future) {
      this.future = future;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
      if (!future.isDone()) {
        return future.cancel(mayInterruptIfRunning);
      } else {
        while (true) {
          try {
            return future.get().cancel(mayInterruptIfRunning);
          } catch (CancellationException ce) {
            return true;
          } catch (ExecutionException ee) {
            return false;
          } catch (InterruptedException ie) {
            // pass
          }
        }
      }
    }

    public T get() throws InterruptedException, 
                          CancellationException, 
                          ExecutionException 
    {
      return future.get().get();
    }

    public T get(long timeout, TimeUnit unit) throws InterruptedException, 
                                                     CancellationException, 
                                                     ExecutionException, 
                                                     TimeoutException 
    {
      if (future.isDone()) {
        return future.get().get(timeout, unit);
      } else {
        return future.get(timeout, unit).get(0, TimeUnit.SECONDS);
      }
    }

    public boolean isCancelled() {
      while (true) {
        try {
          return future.isCancelled() || future.get().isCancelled();
        } catch (CancellationException ce) {
          return true;
        } catch (ExecutionException ee) {
          return false;
        } catch (InterruptedException ie) {
          // pass
        }
      }
    }

    public boolean isDone() {
      return future.isDone() && innerIsDone();
    }

    private boolean innerIsDone() {
      while (true) {
        try {
          return future.get().isDone();
        } catch (CancellationException ce) {
          return true;
        } catch (ExecutionException ee) {
          return true;
        } catch (InterruptedException ie) {
          // pass
        }
      }
    }
  }
}


You could create a class like:

public class UnwrapFuture<T> implements Future<T> {
    Future<Future<T>> wrappedFuture;

    public UnwrapFuture(Future<Future<T>> wrappedFuture) {
        this.wrappedFuture = wrappedFuture;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        try {
            return wrappedFuture.get().cancel(mayInterruptIfRunning);
        } catch (InterruptedException e) {
            //todo: do something
        } catch (ExecutionException e) {
            //todo: do something
        }
    }
    ...
}

You'll have to deal with exceptions that get() can raise but other methods cannot.


This was my first stab at it but I'm sure there is plenty wrong with it. I'd be more than happy to just replace it with something like Futures.compress(f).

public class CompressedFuture<T> implements Future<T> {
    private final Future<Future<T>> delegate;

    public CompressedFuture(Future<Future<T>> delegate) {
        this.delegate = delegate;
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        if (delegate.isDone()) {
            return delegate.cancel(mayInterruptIfRunning);
        }
        try {
            return delegate.get().cancel(mayInterruptIfRunning);
        } catch (InterruptedException e) {
            throw new RuntimeException("Error fetching a finished future", e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Error fetching a finished future", e);
        }
    }

    @Override
    public T get() throws InterruptedException, ExecutionException {
        return delegate.get().get();
    }

    @Override
    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        long endTime = System.currentTimeMillis() + unit.toMillis(timeout);
        Future<T> next = delegate.get(timeout, unit);
        return next.get(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public boolean isCancelled() {
        if (!delegate.isDone()) {
            return delegate.isCancelled();
        }
        try {
            return delegate.get().isCancelled();
        } catch (InterruptedException e) {
            throw new RuntimeException("Error fetching a finished future", e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Error fetching a finished future", e);
        }
    }

    @Override
    public boolean isDone() {
        if (!delegate.isDone()) {
            return false;
        }
        try {
            return delegate.get().isDone();
        } catch (InterruptedException e) {
            throw new RuntimeException("Error fetching a finished future", e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Error fetching a finished future", e);
        }
    }
}
0

精彩评论

暂无评论...
验证码 换一张
取 消