Waiting on a list of Future

Solution 1

You can use a CompletionService to receive the futures as soon as they are ready and, if one of them throws an exception, cancel the processing. Something like this:

Executor executor = Executors.newFixedThreadPool(4);
CompletionService<SomeResult> completionService = 
       new ExecutorCompletionService<SomeResult>(executor);

//4 tasks
for(int i = 0; i < 4; i++) {
   completionService.submit(new Callable<SomeResult>() {
       public SomeResult call() {
           ...
           return result;
       }
   });
}

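int received = 0;
boolean errors = false;

while (received < 4 && !errors) {
      try {
         // take() blocks until some task has completed
         Future<SomeResult> resultFuture = completionService.take();
         SomeResult result = resultFuture.get();
         received++;
         ... // do something with the result
      } catch (InterruptedException e) {
         Thread.currentThread().interrupt(); // restore the interrupt flag
         errors = true;
      } catch (ExecutionException e) {
         // log; e.getCause() is the exception thrown by the task
         errors = true;
      }
}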

I think you can improve this further by cancelling any tasks that are still executing when one of them throws an exception.
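A minimal sketch of that improvement, assuming you keep the futures returned by submit() so they can be cancelled later. The class name FailFastDemo and the helper awaitAllOrFail are made up for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class FailFastDemo {

    // Hypothetical helper: returns all results (in completion order),
    // or throws as soon as the first task fails.
    static <T> List<T> awaitAllOrFail(ExecutorService executor, List<Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
        CompletionService<T> completionService = new ExecutorCompletionService<>(executor);
        List<Future<T>> futures = new ArrayList<>();
        for (Callable<T> task : tasks) {
            futures.add(completionService.submit(task));
        }
        List<T> results = new ArrayList<>();
        try {
            for (int i = 0; i < tasks.size(); i++) {
                // take() hands back futures as they complete; get() rethrows a task failure
                results.add(completionService.take().get());
            }
            return results;
        } finally {
            // No effect on finished tasks; interrupts the ones still running after a failure
            for (Future<T> future : futures) {
                future.cancel(true);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        List<Callable<String>> tasks = new ArrayList<>();
        tasks.add(() -> { Thread.sleep(5_000); return "slow"; });
        tasks.add(() -> { throw new IllegalStateException("boom"); });
        try {
            awaitAllOrFail(executor, tasks);
        } catch (ExecutionException e) {
            System.out.println("failed fast: " + e.getCause());
        } finally {
            executor.shutdown();
        }
    }
}
```

Note that cancel(true) only interrupts the worker thread, so a task stops early only if it responds to interruption.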

Solution 2

If you are using Java 8, you can do this more easily with CompletableFuture and CompletableFuture.allOf, which applies the callback only after all supplied CompletableFutures are done.

// Waits for *all* futures to complete and returns a list of results.
// If *any* future completes exceptionally then the resulting future will also complete exceptionally.

public static <T> CompletableFuture<List<T>> all(List<CompletableFuture<T>> futures) {
    CompletableFuture<?>[] cfs = futures.toArray(new CompletableFuture<?>[0]);

    return CompletableFuture.allOf(cfs)
            .thenApply(ignored -> futures.stream()
                                    .map(CompletableFuture::join)
                                    .collect(Collectors.toList())
            );
}
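Because allOf only completes once every input future is done, the method above does not stop waiting early when one future fails. A possible fail-fast variant is sketched below. The names FailFastFutures and allOrFirstFailure are invented for this example, and it does not cancel the remaining futures:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

final class FailFastFutures {

    // Hypothetical helper: completes with all results, or exceptionally
    // as soon as the first input future fails.
    static <T> CompletableFuture<List<T>> allOrFirstFailure(List<CompletableFuture<T>> futures) {
        CompletableFuture<List<T>> result = CompletableFuture
                .allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));

        for (CompletableFuture<T> future : futures) {
            future.whenComplete((value, error) -> {
                if (error != null) {
                    // Has no effect if result is already completed
                    result.completeExceptionally(error);
                }
            });
        }
        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<String> pending = new CompletableFuture<>();
        CompletableFuture<String> failing = new CompletableFuture<>();
        CompletableFuture<List<String>> all = allOrFirstFailure(Arrays.asList(pending, failing));

        failing.completeExceptionally(new IllegalStateException("boom"));
        // 'pending' never completes, yet 'all' is already done at this point
        System.out.println(all.isCompletedExceptionally());
    }
}
```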

Solution 3

Use a CompletableFuture in Java 8:

    // Kick off multiple asynchronous lookups
    CompletableFuture<User> page1 = gitHubLookupService.findUser("Test1");
    CompletableFuture<User> page2 = gitHubLookupService.findUser("Test2");
    CompletableFuture<User> page3 = gitHubLookupService.findUser("Test3");

    // Wait until they are all done
    CompletableFuture.allOf(page1,page2,page3).join();

    logger.info("--> " + page1.get());
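For a version you can run without the lookup service, here is a rough sketch. The lookup method is a hypothetical stand-in for findUser, and AllOfJoinDemo is an invented class name. It also shows what happens when one lookup fails: join() throws a CompletionException, but only after all futures have finished.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;

final class AllOfJoinDemo {

    // Hypothetical stand-in for an asynchronous lookup such as findUser
    static CompletableFuture<String> lookup(String name) {
        return CompletableFuture.supplyAsync(() -> {
            if (name.isEmpty()) {
                throw new IllegalArgumentException("empty name");
            }
            return "user:" + name;
        });
    }

    // Blocks until every lookup is done; throws CompletionException if any failed
    static List<String> lookupAll(String... names) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (String name : names) {
            futures.add(lookup(name));
        }
        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();
        // Every future is complete here, so join() returns immediately
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(lookupAll("Test1", "Test2"));
        try {
            lookupAll("Test1", "");
        } catch (CompletionException e) {
            System.out.println("lookup failed: " + e.getCause());
        }
    }
}
```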

Solution 4

You can use an ExecutorCompletionService. The documentation even has an example for your exact use-case:

Suppose instead that you would like to use the first non-null result of the set of tasks, ignoring any that encounter exceptions, and cancelling all other tasks when the first one is ready:

void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException {
    CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
    int n = solvers.size();
    List<Future<Result>> futures = new ArrayList<Future<Result>>(n);
    Result result = null;
    try {
        for (Callable<Result> s : solvers)
            futures.add(ecs.submit(s));
        for (int i = 0; i < n; ++i) {
            try {
                Result r = ecs.take().get();
                if (r != null) {
                    result = r;
                    break;
                }
            } catch (ExecutionException ignore) {
            }
        }
    } finally {
        for (Future<Result> f : futures)
            f.cancel(true);
    }

    if (result != null)
        use(result);
}

The important thing to notice here is that ecs.take() returns the first completed task, not the first submitted one. You therefore receive the futures in the order in which the tasks finish executing (or throw an exception).
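A small sketch to see that ordering for yourself. The class name CompletionOrderDemo and the sleep duration are arbitrary choices for this example:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class CompletionOrderDemo {

    // Submits a slow task first and a fast task second,
    // then records the order in which take() hands them back.
    static List<String> completionOrder() throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            CompletionService<String> ecs = new ExecutorCompletionService<>(executor);
            ecs.submit(() -> { Thread.sleep(500); return "slow"; }); // submitted first
            ecs.submit(() -> "fast");                                 // submitted second

            List<String> order = new ArrayList<>();
            for (int i = 0; i < 2; i++) {
                order.add(ecs.take().get());
            }
            return order;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(completionOrder());
    }
}
```

With the delay chosen here, the fast result should come back before the slow one, even though the slow task was submitted first.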

Solution 5

If you are using Java 8 and don't want to manipulate CompletableFutures, I have written a tool that retrieves the results of a List<Future<T>> using streams. The key point is that you cannot simply map(Future::get), because Future.get() throws checked exceptions.

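import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;

public final class Futures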
{

    private Futures()
    {}

    public static <E> Collector<Future<E>, Collection<E>, List<E>> present()
    {
        return new FutureCollector<>();
    }

    private static class FutureCollector<T> implements Collector<Future<T>, Collection<T>, List<T>>
    {
        private final List<Throwable> exceptions = new LinkedList<>();

        @Override
        public Supplier<Collection<T>> supplier()
        {
            return LinkedList::new;
        }

        @Override
        public BiConsumer<Collection<T>, Future<T>> accumulator()
        {
            return (r, f) -> {
                try
                {
                    r.add(f.get());
                }
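                catch (InterruptedException e)
                {
                    // restore the interrupt flag instead of silently swallowing it
                    Thread.currentThread().interrupt();
                }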
                catch (ExecutionException e)
                {
                    exceptions.add(e.getCause());
                }
            };
        }

        @Override
        public BinaryOperator<Collection<T>> combiner()
        {
            return (l1, l2) -> {
                l1.addAll(l2);
                return l1;
            };
        }

        @Override
        public Function<Collection<T>, List<T>> finisher()
        {
            return l -> {

                List<T> ret = new ArrayList<>(l);
                if (!exceptions.isEmpty())
                    throw new AggregateException(exceptions, ret);

                return ret;
            };

        }

        @Override
        public Set<java.util.stream.Collector.Characteristics> characteristics()
        {
            return java.util.Collections.emptySet();
        }
    }
}

This needs an AggregateException that works like C#'s:

public class AggregateException extends RuntimeException
{
    /**
     *
     */
    private static final long serialVersionUID = -4477649337710077094L;

    private final List<Throwable> causes;
    private List<?> successfulElements;

    public AggregateException(List<Throwable> causes, List<?> l)
    {
        this.causes = causes;
        successfulElements = l;
    }

    public AggregateException(List<Throwable> causes)
    {
        this.causes = causes;
    }

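    @Override
    public synchronized Throwable getCause()
    {
        // Returning this would make the exception its own cause; expose the first failure instead
        return causes.isEmpty() ? null : causes.get(0);
    }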

    public List<Throwable> getCauses()
    {
        return causes;
    }

    public List<?> getSuccessfulElements()
    {
        return successfulElements;
    }

    public void setSuccessfulElements(List<?> successfulElements)
    {
        this.successfulElements = successfulElements;
    }

}

This component behaves exactly like C#'s Task.WaitAll. I am working on a variant that does the same as CompletableFuture.allOf (equivalent to Task.WhenAll).

The reason I did this is that I am using Spring's ListenableFuture and don't want to port to CompletableFuture, even though it is the more standard approach.
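With the class above, the call site would presumably be futures.stream().collect(Futures.present()). To keep a runnable example short, the sketch below uses a compact stand-in built with Collector.of rather than the Futures class itself. The names CollectorUsageDemo and demo are invented, and this stand-in records failures in a caller-supplied list instead of throwing an AggregateException:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collector;

final class CollectorUsageDemo {

    // Compact stand-in for a Future collector (an assumption, not the class above):
    // successful values are collected, failure causes are appended to 'failures'.
    static <T> Collector<Future<T>, List<T>, List<T>> present(List<Throwable> failures) {
        return Collector.of(
                ArrayList::new,
                (results, future) -> {
                    try {
                        results.add(future.get());
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        failures.add(e);
                    } catch (ExecutionException e) {
                        failures.add(e.getCause());
                    }
                },
                (left, right) -> {
                    left.addAll(right);
                    return left;
                });
    }

    // Runs one succeeding and one failing task and collects what is available
    static List<String> demo(List<Throwable> failures) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            Callable<String> succeeding = () -> "ok";
            Callable<String> failing = () -> { throw new IllegalStateException("boom"); };
            List<Future<String>> futures = new ArrayList<>();
            futures.add(executor.submit(succeeding));
            futures.add(executor.submit(failing));
            // Sequential stream only: the shared failures list is not thread-safe
            return futures.stream().collect(CollectorUsageDemo.<String>present(failures));
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Throwable> failures = new ArrayList<>();
        System.out.println(demo(failures) + " / failures: " + failures);
    }
}
```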

Author: Sowmya Madras Rajesh

Updated on April 26, 2020

Comments

  • Sowmya Madras Rajesh
    Sowmya Madras Rajesh about 4 years

    I have a method which returns a List of futures

    List<Future<O>> futures = getFutures();
    

    Now I want to wait until either all futures are done processing successfully or any of the tasks whose output is returned by a future throws an exception. If even one task throws an exception, there is no point in waiting for the other futures.

    A simple approach would be:

    void waitForAll() {
       for (Future<O> f : futures) {
         try {
           f.get();
         } catch (Exception e) {
           // TODO: catch specific exceptions
           // this future threw an exception, which means someone could not do its task
           return;
         }
       }
    }
    

    But the problem here is if, for example, the 4th future throws an exception, then I will wait unnecessarily for the first 3 futures to be available.

    How can I solve this? Would a CountDownLatch help in any way? I'm unable to use Future.isDone() because the Javadoc says:

    boolean isDone()
    Returns true if this task completed. Completion may be due to normal termination, an exception, or cancellation -- in all of these cases, this method will return true.
    
    • Alexei Kaigorodov
      Alexei Kaigorodov over 10 years
      Who generates those futures? What type are they? The interface java.util.concurrent.Future does not provide the functionality you want; the only way is to use your own futures with callbacks.
    • Montre
      Montre over 10 years
      You could make an instance of ExecutorService for every "batch" of tasks, submit them to it, then immediately shut down the service and use awaitTermination() on it, I suppose.
    • Montre
      Montre over 10 years
      You could use a CountDownLatch if you wrapped the body of all your futures in a try..finally to make sure the latch gets decremented as well.
    • Sowmya Madras Rajesh
      Sowmya Madras Rajesh over 10 years
      @AlexeiKaigorodov Yes, my futures are of type java.util.concurrent.Future. I am using futures with Callable. I get a Future when I submit a task to an ExecutorService.
    • Sowmya Madras Rajesh
      Sowmya Madras Rajesh over 10 years
      @millimoose If I wrap the code that returns every single future inside try/catch/finally it will work, but I won't be able to distinguish between a future that finished successfully and one that threw an exception.
    • Montre
      Montre over 10 years
      @user93796 I said try..finally, not try..catch. The exception will be rethrown and Future.get() would throw. It'd just make sure the latch gets counted down. That said, the suggestions to use CompletionService are clearly superior.
    • Montre
      Montre over 10 years
      Correction: the exception will still be thrown, not rethrown.
  • Sowmya Madras Rajesh
    Sowmya Madras Rajesh over 10 years
    Your code has the same issue that I mentioned in my post. If the fourth future throws an exception, the code will still wait for futures 1, 2 and 3 to complete. Or will completionService.take() return the future that completes first?
  • Sowmya Madras Rajesh
    Sowmya Madras Rajesh over 10 years
    What about timeouts? Can I tell the completion service to wait for at most X seconds?
  • dcernahoschi
    dcernahoschi over 10 years
    It should not. It does not iterate over the futures in submission order; as soon as one is ready, it is processed and checked for an exception.
  • dcernahoschi
    dcernahoschi over 10 years
    To time out while waiting for a future to appear on the queue, there is a poll(timeout, unit) method on the CompletionService.
  • VSEWHGHP
    VSEWHGHP over 7 years
    Hi @Andrejs, could you please explain what this snippet of code does? I see this suggested in multiple places but am confused as to what is actually happening. How are exceptions handled if one of the threads fails?
  • Andrejs
    Andrejs over 7 years
    @VSEWHGHP From the javadoc: If any of the given CompletableFutures complete exceptionally, then the returned CompletableFuture also does so, with a CompletionException holding this exception as its cause.
  • VSEWHGHP
    VSEWHGHP over 7 years
    Right so I was following up on that, is there any way to use this snippet but obtain the values for all the other threads which did complete successfully? Should I just iterate over the CompletableFutures list and call get ignoring the CompletableFuture<List<T>> since the sequence function takes care of ensuring all the threads are complete either with result or exception?
  • user18853
    user18853 about 7 years
    Here is a working example on GitHub: github.com/princegoyal1987/FutureDemo
  • Jarekczek
    Jarekczek about 6 years
    This is solving a different problem. If you have Future instances, you can't apply this method. It's not easy to convert Future into CompletableFuture.
  • granadaCoder
    granadaCoder about 5 years
    Upvote for seeing the need for a equivalent AggregateException.
  • slisnychyi
    slisnychyi over 4 years
    It will not work if one of the tasks throws an exception.
  • realPK
    realPK over 4 years
    @user18853 Your code doesn't address the asked question and looks like copied from javarevisited.blogspot.com/2015/01/…. Can you please polish the code to avoid an empty click?
  • XDS
    XDS over 4 years
    An example of using this facility would be nice.
  • Cherry
    Cherry about 4 years
    The biggest question here: why do they not put all this logic into CompletableFuture.allOf and make it return T instead of void? Why do I have to copy and paste this into dozens of projects? Scala has Future.sequence, so why not Java? :'(
  • maaw
    maaw about 4 years
    This should be the accepted answer. Also it's part of the official Spring documentation: spring.io/guides/gs/async-method
  • Joffrey
    Joffrey over 3 years
    @Andrejs what is the point of CompletableFuture.allOf here? Couldn't we just use CompletableFuture.supplyAsync to wrap the join stream? Is it just a matter of not using an extra thread?
  • Andrejs
    Andrejs over 3 years
    @Joffrey That would work though an extra thread will be blocked until all the futures complete. Might take a while.
  • Jason
    Jason over 3 years
    How could you cancel any future that takes longer than 2 seconds? Using .get would "stack up" like this: stackoverflow.com/questions/17434311/…
  • lalatnayak
    lalatnayak about 2 years
    If you are using a global thread pool then this will not work. I dunno if it's a good idea to use local (non singleton) thread pools.
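
On the timeout questions raised in the comments above, one possible approach is CompletionService.poll(timeout, unit), which returns null if nothing completes in time. The following is only a sketch; PollTimeoutDemo and awaitAll are invented names, and the timeout here applies to each wait for the next completion, not to the batch as a whole:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class PollTimeoutDemo {

    // Hypothetical helper: waits at most 'timeout' for each next completion
    static <T> List<T> awaitAll(ExecutorService executor, List<Callable<T>> tasks,
                                long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        CompletionService<T> completionService = new ExecutorCompletionService<>(executor);
        List<Future<T>> futures = new ArrayList<>();
        for (Callable<T> task : tasks) {
            futures.add(completionService.submit(task));
        }
        List<T> results = new ArrayList<>();
        try {
            for (int i = 0; i < tasks.size(); i++) {
                // poll() returns null if no task completed within the timeout
                Future<T> done = completionService.poll(timeout, unit);
                if (done == null) {
                    throw new TimeoutException("no task finished within the timeout");
                }
                results.add(done.get());
            }
            return results;
        } finally {
            for (Future<T> future : futures) {
                future.cancel(true); // no effect on tasks that already finished
            }
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        List<Callable<String>> tasks = new ArrayList<>();
        tasks.add(() -> "quick");
        tasks.add(() -> { Thread.sleep(10_000); return "slow"; });
        try {
            awaitAll(executor, tasks, 200, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            System.out.println("gave up: " + e.getMessage());
        } finally {
            executor.shutdown();
        }
    }
}
```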