Convert from List<CompletableFuture> to CompletableFuture<List>

43,316

Solution 1

Use CompletableFuture.allOf(...):

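static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
    // allOf completes once every future has completed, normally or exceptionally
    return CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
            // all inputs are done here, so join() returns without blocking
            .thenApply(v -> com.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList())
            );
}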
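Here is the same sequence method packaged as a compilable class, with the imports it needs. The holder class name SequenceExample is only for illustration. It also shows the failure behaviour: when one input fails, the combined future fails too, but only after allOf has seen every input complete.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Hypothetical holder class; only sequence() comes from the answer above.
final class SequenceExample {

    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
        // allOf completes when every input has completed, normally or exceptionally
        return CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
                // every input is done here, so join() never blocks
                .thenApply(v -> com.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }
}
```

If one input has already failed while another is still pending, the result stays incomplete until the pending one finishes, and then completes exceptionally.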

A few comments on your implementation:

Your use of .thenComposeAsync, .thenApplyAsync and .thenCombineAsync is likely not doing what you expect. These ...Async methods run the function supplied to them asynchronously, in the supplied executor (or in the default asynchronous executor if none is given). So, in your case, every addition of a new item to the list is submitted as a separate task to exec. There is no need to push lightweight operations like this onto a cached thread pool. Do not use thenXXXXAsync methods without a good reason.
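To make the difference visible, the sketch below (hypothetical class AsyncThreadDemo) records which thread runs each stage. On an already-completed future, the non-async thenApply in OpenJDK runs the function immediately in the calling thread (the Javadoc only promises it runs in the completing thread or in a caller of a completion method), whereas thenApplyAsync always hands the function to the given executor, paying for a task submission and a thread hop even for something as cheap as list.add.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class AsyncThreadDemo {

    // Name of the thread that runs a thenApply stage on an already-completed future.
    static String syncStageThread() {
        return CompletableFuture.completedFuture(1)
                .thenApply(x -> Thread.currentThread().getName())
                .join();
    }

    // Name of the thread that runs a thenApplyAsync stage with an explicit executor.
    static String asyncStageThread() {
        ExecutorService exec = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        try {
            return CompletableFuture.completedFuture(1)
                    .thenApplyAsync(x -> Thread.currentThread().getName(), exec)
                    .join();
        } finally {
            exec.shutdown();
        }
    }
}
```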

Additionally, reduce should not be used to accumulate into mutable containers. Even though it might work correctly when the stream is sequential, it would fail if the stream were made parallel. To perform mutable reduction, use .collect instead.
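For comparison, here is a mutable reduction written with the three-argument Stream.collect. Each parallel segment gets its own container from the supplier and the combiner merges the partial containers, so the result is the same whether or not the stream is parallel. The class and method names are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

final class MutableReductionDemo {

    static List<Integer> collectSquares(int n, boolean parallel) {
        IntStream s = IntStream.range(0, n);
        if (parallel) {
            s = s.parallel();
        }
        // supplier, accumulator, combiner: each segment fills its own ArrayList,
        // and the partial lists are merged with addAll in encounter order
        ArrayList<Integer> result = s.boxed()
                .map(x -> x * x)
                .collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
        return result;
    }
}
```

Collectors.toList() does the same thing with less ceremony; the explicit form just shows where the per-segment containers come from.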

If you want to complete the entire computation exceptionally immediately after the first failure, do the following in your sequence method:

CompletableFuture<List<T>> result = CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
        .thenApply(v -> com.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList())
        );

com.forEach(f -> f.whenComplete((t, ex) -> {
    if (ex != null) {
        result.completeExceptionally(ex);
    }
}));

return result;
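Put together, the fail-fast variant looks roughly like this as a compilable class (FailFastSequence is a hypothetical name). Completing result from outside works because thenApply returns an ordinary CompletableFuture, whose completion methods can still be called by anyone holding a reference to it.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

final class FailFastSequence {

    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
        CompletableFuture<List<T>> result = CompletableFuture
                .allOf(com.toArray(new CompletableFuture<?>[0]))
                .thenApply(v -> com.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));

        // Fail the combined future as soon as any single input fails,
        // instead of waiting for allOf to see every input complete.
        com.forEach(f -> f.whenComplete((t, ex) -> {
            if (ex != null) {
                result.completeExceptionally(ex);
            }
        }));
        return result;
    }
}
```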

If, additionally, you want to cancel the remaining operations on first failure, add exec.shutdownNow(); right after result.completeExceptionally(ex);. This, of course, assumes that exec exists only for this one computation. If it doesn't, you'll have to loop over the remaining futures and cancel each one individually.
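When exec is shared, cancelling the remaining futures individually might look like the sketch below (the class name is hypothetical). One caveat from the CompletableFuture Javadoc: cancel(true) completes the future with a CancellationException, but the mayInterruptIfRunning flag has no effect, so the thread computing the value is not interrupted. This stops dependants from waiting rather than stopping the underlying work.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

final class FailFastCancelling {

    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
        CompletableFuture<List<T>> result = CompletableFuture
                .allOf(com.toArray(new CompletableFuture<?>[0]))
                .thenApply(v -> com.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));

        com.forEach(f -> f.whenComplete((t, ex) -> {
            // completeExceptionally returns true only for the call that actually
            // completed result, so only the first failure triggers the cancellation
            if (ex != null && result.completeExceptionally(ex)) {
                // cancel() is a no-op on futures that are already complete
                com.forEach(other -> other.cancel(true));
            }
        }));
        return result;
    }
}
```

The cancellations themselves fire the whenComplete callbacks again, but by then result is already complete, so those calls return false and nothing recurses.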

Solution 2

You can get Spotify's CompletableFutures library and use its allAsList method. I think it's inspired by Guava's Futures.allAsList method.

public static <T> CompletableFuture<List<T>> allAsList(
    List<? extends CompletionStage<? extends T>> stages)

And here is a simple implementation if you don't want to use a library:

public <T> CompletableFuture<List<T>> allAsList(final List<CompletableFuture<T>> futures) {
    return CompletableFuture.allOf(
        futures.toArray(new CompletableFuture[futures.size()])
    ).thenApply(ignored ->
        futures.stream().map(CompletableFuture::join).collect(Collectors.toList())
    );
}
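If you want a wildcard signature like the library's without adding a dependency, a sketch that accepts any CompletionStage could look like this. It is not Spotify's implementation, just the allOf approach applied after converting each stage with CompletionStage.toCompletableFuture() (which, per its Javadoc, an implementation that does not interoperate may reject with UnsupportedOperationException). The class name StageSequence is made up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;

final class StageSequence {

    static <T> CompletableFuture<List<T>> allAsList(
            List<? extends CompletionStage<? extends T>> stages) {
        // Convert each stage once so allOf and the join() calls see the same futures.
        List<CompletableFuture<? extends T>> futures = new ArrayList<>();
        for (CompletionStage<? extends T> stage : stages) {
            futures.add(stage.toCompletableFuture());
        }
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> futures.stream()
                        .<T>map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }
}
```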

Solution 3

As Misha has pointed out, you are overusing …Async operations. Further, you are composing a complex chain of operations modelling a dependency which doesn’t reflect your program logic:

  • you create a job x which depends on the first and second job of your list
  • you create a job x+1 which depends on job x and the third job of your list
  • you create a job x+2 which depends on job x+1 and the fourth job of your list
  • you create a job x+5000 which depends on job x+4999 and the last job of your list

Then, cancelling this recursively composed job (explicitly or due to an exception) may be performed recursively and may fail with a StackOverflowError. Whether it does is implementation-dependent.

As already shown by Misha, there is a method, allOf, which allows you to model your original intention: one job which depends on all jobs of your list.
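A quick way to check the difference is to reproduce the shape of the question's failure with allOf: create a large number of pending futures, combine them, then fail the first one and complete the rest. The sketch below (hypothetical class DeepFailureDemo) does that; the combined future simply completes exceptionally. As far as I can tell from the OpenJDK source, allOf combines its arguments as a balanced tree, so completion propagates through roughly log2(n) levels rather than through a chain of n stages.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

final class DeepFailureDemo {

    // Creates n pending futures, combines them with allOf, then completes them
    // afterwards, failing the first one.
    static CompletableFuture<List<Integer>> run(int n) {
        List<CompletableFuture<Integer>> futures = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            futures.add(new CompletableFuture<>());
        }
        // A single stage depending on all inputs, instead of a chain of n stages.
        CompletableFuture<List<Integer>> all = CompletableFuture
                .allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));

        futures.get(0).completeExceptionally(new IllegalStateException("job 0 failed"));
        for (int i = 1; i < n; i++) {
            futures.get(i).complete(i);
        }
        return all;
    }
}
```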

However, it’s worth noting that even that isn’t necessary. Since you are using an unbounded thread pool executor, you can simply post an asynchronous job collecting the results into a list and you are done. Waiting for the completion is implied by asking for the result of each job anyway.

ExecutorService executorService = Executors.newCachedThreadPool();
List<CompletableFuture<Integer>> que = IntStream.range(0, 100000)
    .mapToObj(x -> CompletableFuture.supplyAsync(() -> {
        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos((long) (Math.random() * 10)));
        return x;
    }, executorService))
    .collect(Collectors.toList());
// one task that waits for each job in turn and collects the results
CompletableFuture<List<Integer>> sequence = CompletableFuture.supplyAsync(
    () -> que.stream().map(CompletableFuture::join).collect(Collectors.toList()),
    executorService);

Methods for composing dependent operations are important when the number of threads is limited and the jobs may spawn additional asynchronous jobs: they prevent waiting jobs from stealing threads from the jobs that have to complete first. Neither is the case here.

In this specific case, one job simply iterating over this large number of prerequisite jobs, waiting where necessary, may be more efficient than modelling this large number of dependencies and having each job notify the dependent job about its completion.

Solution 4

To add to the accepted answer by @Misha, it can be further expanded into a collector:

public static <T> Collector<CompletableFuture<T>, ?, CompletableFuture<List<T>>> sequenceCollector() {
    return Collectors.collectingAndThen(Collectors.toList(), com -> sequence(com));
}

Now you can:

Stream<CompletableFuture<Integer>> stream = Stream.of(
    CompletableFuture.completedFuture(1),
    CompletableFuture.completedFuture(2),
    CompletableFuture.completedFuture(3)
);
CompletableFuture<List<Integer>> ans = stream.collect(sequenceCollector());
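For a copy-pasteable version, here is the collector together with the sequence method it relies on (the holder class FutureCollectors is a made-up name). This collector only gathers the futures into a list and then hands that list to allOf, so collecting does not block the calling thread; the returned future completes later, when all inputs have.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collector;
import java.util.stream.Collectors;

final class FutureCollectors {

    // Same as Solution 1's sequence().
    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
        return CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
                .thenApply(v -> com.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    // Collects the futures into a list first, then sequences that list without blocking.
    static <T> Collector<CompletableFuture<T>, ?, CompletableFuture<List<T>>> sequenceCollector() {
        return Collectors.collectingAndThen(Collectors.toList(), com -> sequence(com));
    }
}
```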

Solution 5

An example sequence operation using thenCombine on CompletableFuture

public <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {

    CompletableFuture<List<T>> identity = CompletableFuture.completedFuture(new ArrayList<T>());

    // append the next future's value to the accumulated list once both are available
    BiFunction<CompletableFuture<List<T>>, CompletableFuture<T>, CompletableFuture<List<T>>> combineToList =
            (acc, next) -> acc.thenCombine(next, (a, b) -> { a.add(b); return a; });

    // merge two partial lists (in practice only called when the stream is parallel)
    BinaryOperator<CompletableFuture<List<T>>> combineLists =
            (a, b) -> a.thenCombine(b, (l1, l2) -> { l1.addAll(l2); return l1; });

    return com.stream()
              .reduce(identity,
                      combineToList,
                      combineLists);
}
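Note that this version mutates the ArrayList held by the shared identity future, which is the same mutable-reduction concern raised under Solution 1: on a parallel stream, several segments would add to one list. A sketch of a variant that copies instead of mutating (CombineSequence, append and concat are hypothetical names):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

final class CombineSequence {

    // Every step builds a new list, so the shared identity value is never
    // modified and the reduction stays correct on a parallel stream.
    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com, boolean parallel) {
        CompletableFuture<List<T>> identity = CompletableFuture.completedFuture(Collections.emptyList());
        Stream<CompletableFuture<T>> s = parallel ? com.parallelStream() : com.stream();
        return s.reduce(identity,
                (acc, next) -> acc.thenCombine(next, CombineSequence::append),
                (a, b) -> a.thenCombine(b, CombineSequence::concat));
    }

    private static <T> List<T> append(List<T> list, T item) {
        List<T> copy = new ArrayList<>(list);
        copy.add(item);
        return copy;
    }

    private static <T> List<T> concat(List<T> a, List<T> b) {
        List<T> copy = new ArrayList<>(a);
        copy.addAll(b);
        return copy;
    }
}
```

Copying makes each step proportional to the list size, and the reduction still builds a chain of dependent stages like the question's code, so this is only reasonable for modest lists.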

If you don't mind using third-party libraries, cyclops-react (I am the author) has a set of utility methods for CompletableFutures (and Optionals, Streams, etc.):

List<CompletableFuture<String>> listOfFutures;

CompletableFuture<ListX<String>> sequence = CompletableFutures.sequence(listOfFutures);


Author: Jatin

http://jatinpuri.com

Updated on December 29, 2021

Comments

  • Jatin
    Jatin over 2 years

    I am trying to convert List<CompletableFuture<T>> to CompletableFuture<List<T>>. This is quite useful when you have many asynchronous tasks and need the results of all of them.

    If any of them fails, then the final future fails. This is how I have implemented it:

    public static <T> CompletableFuture<List<T>> sequence2(List<CompletableFuture<T>> com, ExecutorService exec) {
        if(com.isEmpty()){
            throw new IllegalArgumentException();
        }
        Stream<? extends CompletableFuture<T>> stream = com.stream();
        CompletableFuture<List<T>> init = CompletableFuture.completedFuture(new ArrayList<T>());
        return stream.reduce(init, (ls, fut) -> ls.thenComposeAsync(x -> fut.thenApplyAsync(y -> {
            x.add(y);
            return x;
        },exec),exec), (a, b) -> a.thenCombineAsync(b,(ls1,ls2)-> {
            ls1.addAll(ls2);
            return ls1;
        },exec));
    }
    

    To run it:

    ExecutorService executorService = Executors.newCachedThreadPool();
    Stream<CompletableFuture<Integer>> que = IntStream.range(0,100000).boxed().map(x -> CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep((long) (Math.random() * 10));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return x;
    }, executorService));
    CompletableFuture<List<Integer>> sequence = sequence2(que.collect(Collectors.toList()), executorService);
    

    If any of them fails, then it fails. It gives the expected output even with a million futures. The problem I have is this: if there are more than 5000 futures and any of them fails, I get a StackOverflowError:

    Exception in thread "pool-1-thread-2611" java.lang.StackOverflowError
        at java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210)
        at java.util.concurrent.CompletableFuture$ThenCompose.run(CompletableFuture.java:1487)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:193)
        at java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210)
        at java.util.concurrent.CompletableFuture$ThenCompose.run(CompletableFuture.java:1487)

    What am I doing wrong?

    Note: The future returned above fails as soon as any of the futures fails. The accepted answer should also take this into account.

    • fge
      fge about 9 years
      If I were you I'd implement a Collector instead...
    • Jatin
      Jatin about 9 years
      @fge That is actually a very good suggestion. I come from the Scala world, where we have a similar thing. A Collector might be a better fit here. But then the implementation, I suppose, might be similar.
  • Jatin
    Jatin about 9 years
    One thing I don't understand is that allOf's return type is CompletableFuture<Void>, yet we return CompletableFuture<List<T>> without any compiler warning. I was not aware of this behavior of Void.
  • Jatin
    Jatin about 9 years
    In my case it is perfectly fine to use ArrayList because the addition operation cannot be performed in parallel. We add an element only when the previous one has completed, and so on, so the additions are never performed in parallel.
  • Misha
    Misha about 9 years
    See the .thenApply part. After allOf(...) completes successfully, it collects all the resulting values into a list.
  • Jatin
    Jatin about 9 years
    I agree with most of the answer except the ArrayList part. It is completely safe to use it in my code. Otherwise, this is better code than mine (except that you iterate over the list multiple times, which is OK given the verbosity saved).
  • Misha
    Misha about 9 years
    @Jatin I think you might be right about that. I will rethink it in the morning when I'm more awake and modify my answer accordingly.
  • Misha
    Misha about 9 years
    One caveat is that using supplyAsync instead of allOf will consume a thread from the pool to await the completion of all tasks. If I'm not mistaken, allOf will operate within the threads assigned to the respective tasks. Not a big deal for most use cases, but worth noting.
  • Misha
    Misha about 9 years
    @Jatin You are right: with the current implementation of reduce, as long as the stream in the sequence2 method is kept sequential, ArrayList is safe. However, it is very undesirable to write stream constructs that break if the stream were made parallel. At the very least, if you rely on the stream being sequential, the third argument to reduce should be (a, b) -> {throw new IllegalStateException("Parallel not allowed");}
  • Jatin
    Jatin about 9 years
    A good point! Thanks. I think sequence2 was a bad idea. A collector is certainly better and so is your answer.
  • Holger
    Holger about 9 years
    @Misha: I did mention that it will steal a thread if the number of threads is limited and that it works here because an unlimited thread pool executor is used (and no async sub-jobs are spawned).
  • Jatin
    Jatin almost 9 years
    There is one flaw here. CompletableFuture.allOf only completes when all of the futures complete. Even if one of them fails, it still waits for the others to finish.
  • Jatin
    Jatin almost 9 years
    @Holger A problem with this answer is that if one of the later futures fails, it still waits for the one it is currently joined on to complete. Rather, as soon as something fails, the returned future should fail right then.
  • Jatin
    Jatin almost 9 years
    Actually, I am even all right with that, but not with the thread stealing.
  • Misha
    Misha almost 9 years
    That is exactly how your original solution (using thenCombine) would behave. If you want to short-circuit the computation and trigger exceptional completion immediately, it's easy to do. See updated answer.
  • Parv Bhasker
    Parv Bhasker over 8 years
    For completeness :-P I think this code needs an import static java.util.stream.Collectors.toList
  • Dirk Hillbrecht
    Dirk Hillbrecht over 8 years
    This does not compile: Type mismatch: cannot convert from CompletableFuture<Object> to CompletableFuture<List<T>>. JDK 8u66
  • Abhijit Sarkar
    Abhijit Sarkar over 8 years
    @Misha I don't understand the benefit of using allOf in your code. The individual tasks are invoked sequentially using join, so task i + 1 is not invoked until task i is complete. I didn't find anything in the documentation that allows invoking all the subtasks in parallel. The closest thing seems to be ForkJoinPool.invokeAll, which takes a collection of Callables.
  • Misha
    Misha over 8 years
    @AbhijitSarkar The tasks aren't invoked by join. The benefit of using allOf is that when allOf triggers, all the tasks have been completed and join just gets the results.
  • Misha
    Misha over 8 years
    @DirkHillbrecht are you using eclipse?
  • Abhijit Sarkar
    Abhijit Sarkar over 8 years
    @Misha Could you elaborate on when are the individual tasks invoked and how is the max concurrency controlled? If I submit hundreds of tasks using allOf, there's no way all of them could be invoked concurrently as the system just wouldn't have enough resources for that, besides completely defeating the purpose of thread pooling.
  • Misha
    Misha over 8 years
    @AbhijitSarkar That is out of the scope of this method. The assignment to thread pools is done when individual CompletableFutures were created. It's entirely possible that all the tasks were scheduled in a single thread and the whole thing will run sequentially. It's also possible that they were scheduled in a giant thread pool and everything will run concurrently. Either way, allOf will ensure that all tasks have completed before it triggers.
  • Dirk Hillbrecht
    Dirk Hillbrecht over 8 years
    @Misha, Yes, Eclipse Mars.1 using JDK 8u66.
  • Jatin
    Jatin over 7 years
    I like the answer. But it relies on javaslang.concurrent.Future :(
  • Mathias Dpunkt
    Mathias Dpunkt over 7 years
    That is true - but having worked with javaslang Future you do not want to go back to the java Future or CompletableFuture
  • Valery Silaev
    Valery Silaev almost 7 years
    Update: suggested code moved to separate library, github.com/vsilaev/tascalate-concurrent
  • cubuspl42
    cubuspl42 over 6 years
    @Misha If we provide this function with a list of completable futures backed by a thread (created with supplyAsync), and one of the futures fail, then what happens to the others? Are they abandoned as "zombie threads"?
  • charlie
    charlie about 6 years
    Wouldn't a direct CompletableFuture.supplyAsync(() -> com.parallelStream().map(CompletableFuture::join).collect(toList()), exec) be enough?
  • Didier L
    Didier L almost 6 years
    You should use thenCombine() instead of thenApply() in the accumulator, to avoid the join() call. Otherwise the calling thread will actually execute that, so the collection will only return after everything has completed. You can check this by adding a print before the futureList.join(): it only gets printed after all futures have printed “Succesfully loaded test data”.
  • Kai Stapel
    Kai Stapel almost 6 years
    @DidierL If I change thenApply() to thenCombine() then the final join() call to the CompletableFuture<List<V>> will not block anymore but return immediately with an empty result. So the future of list will not wait until all individual futures are complete. But that was the initial idea of the whole thing.
  • Didier L
    Didier L almost 6 years
    Yes, indeed, I forgot that a Collector relies on mutation. The problem with your code is that it is equivalent to CompletableFuture.completedFuture(listOfFutures.stream().map(CompletableFuture::join).collect(toList()));. The collection is actually returning a future that is already completed, so there is no point in returning a future any more.
  • Kai Stapel
    Kai Stapel almost 6 years
    You may be correct that this is functionally equivalent to my "complete example". However, the example is just for illustrating purposes on how to use the toFutureList() collector. What is not equivalent is listOfFutures.stream().map(CompletableFuture::join).collect(toList()) and listOfFutures.stream().collect(toFutureList()). The former gives you a complete result with all futures completed, while the latter gives you a future of a list of values that you can pass on, or map to other values without blocking.
  • Didier L
    Didier L almost 6 years
    That's where you are wrong: the latter does exactly the same. Your collector simply calls join() on all futures on the calling thread, and wraps the result in an already completed CompletableFuture. It is blocking. As I said previously, just add a print right after the stream collection and you will see that this print will only occur after all futures are completed.
  • Kai Stapel
    Kai Stapel almost 6 years
    Yes, you are right. I updated my answer accordingly. Thanks for pointing that out.
  • Simon Forsberg
    Simon Forsberg almost 5 years
    This answer is useless if the link stops working. Please embed the code in the answer.
  • NIGAGA
    NIGAGA over 4 years
    I know it's a bit of an old thread. I have tried to execute the mentioned solution in my Windows Eclipse environment. However, every time I execute it, it exits on the first occurrence of an exception. Is it because result.completeExceptionally(ex) gets executed and kills all the remaining threads? If yes, what can I do to ensure the remaining threads get executed as well?
  • Misha
    Misha over 4 years
    @NIGAGA The version of the code with result.completeExceptionally is specifically meant to complete after the first failure (see the description before the code listing). The first version with just allOf will wait for all tasks to complete (successfully or not) before completing the future. If this doesn't help you, please post a new question and include your code.
  • NIGAGA
    NIGAGA over 4 years
    @Misha Thanks I understand now.
  • Holger
    Holger over 4 years
    Recommended: Arrays of Wisdom of the Ancients. In short, use toArray(new CompletableFuture<?>[0]). It’s simpler and less error prone and more efficient on the most widespread JVM implementation…
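Misha's suggestion in the comments above, a reduce whose combiner throws so that a parallel stream fails loudly instead of silently sharing one ArrayList, could be sketched like this. It uses thenCombine instead of the question's thenComposeAsync/thenApplyAsync, and the names SequentialOnlyReduce and noParallel are hypothetical. It relies on the current OpenJDK behaviour that reduce only invokes the combiner when merging results of a parallel evaluation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BinaryOperator;

final class SequentialOnlyReduce {

    // Combiner that refuses to run: a sequential pipeline never calls it,
    // while a parallel one fails instead of sharing a mutable list.
    static <U> BinaryOperator<U> noParallel() {
        return (a, b) -> {
            throw new IllegalStateException("Parallel not allowed");
        };
    }

    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
        CompletableFuture<List<T>> init = CompletableFuture.completedFuture(new ArrayList<>());
        return com.stream().reduce(init,
                (ls, fut) -> ls.thenCombine(fut, (list, value) -> {
                    list.add(value);
                    return list;
                }),
                noParallel());
    }
}
```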