Convert from List<CompletableFuture> to CompletableFuture<List>
Solution 1
Use CompletableFuture.allOf(...):
static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
    return CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
            .thenApply(v -> com.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList())
            );
}
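A self-contained version of the sequence method above, with a small driver, might look like the sketch below (the class name SequenceDemo is made up for illustration). Because it is built on allOf, the returned future completes, normally or exceptionally, only once every input future has completed.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

final class SequenceDemo {

    // Wait for every future via allOf, then join each one.
    // By the time thenApply runs, every future is done, so join() does not block here.
    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
        return CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
                .thenApply(v -> com.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        List<CompletableFuture<Integer>> futures = Arrays.asList(
                CompletableFuture.supplyAsync(() -> 1),
                CompletableFuture.supplyAsync(() -> 2),
                CompletableFuture.supplyAsync(() -> 3));
        // Results keep the order of the input list, not the order of completion.
        System.out.println(sequence(futures).join()); // prints [1, 2, 3]
    }
}
```

If any input fails, allOf completes exceptionally with a CompletionException, thenApply is skipped, and join() on the result throws.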
A few comments on your implementation:
Your use of .thenComposeAsync, .thenApplyAsync and .thenCombineAsync is likely not doing what you expect. These ...Async methods run the function supplied to them in a separate thread. So, in your case, you are causing the addition of each new item to the list to run in the supplied executor. There is no need to push lightweight operations like this onto a cached thread pool executor. Do not use the thenXXXXAsync methods without a good reason.
Additionally, reduce should not be used to accumulate into mutable containers. Even though it might work correctly when the stream is sequential, it will fail if the stream is made parallel. To perform a mutable reduction, use .collect instead.
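As a small illustration of mutable reduction with collect (the class and method names are hypothetical), the three-argument form gives every parallel segment its own fresh container and merges the partial results, so no ArrayList is ever shared between threads:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

final class MutableReductionDemo {

    // supplier: fresh ArrayList per segment; accumulator: add; combiner: addAll.
    static List<Integer> squares(int n, boolean parallel) {
        Stream<Integer> s = IntStream.range(0, n).boxed();
        if (parallel) {
            s = s.parallel();
        }
        ArrayList<Integer> out = s.map(i -> i * i)
                .collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
        return out;
    }

    // The usual shorthand for the same kind of mutable reduction.
    static List<Integer> squaresWithCollector(int n) {
        return IntStream.range(0, n).boxed()
                .parallel()
                .map(i -> i * i)
                .collect(Collectors.toList());
    }
}
```

Both forms keep encounter order, so the sequential and parallel results are equal.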
If you want to complete the entire computation exceptionally immediately after the first failure, do the following in your sequence method:
CompletableFuture<List<T>> result = CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
        .thenApply(v -> com.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList())
        );

com.forEach(f -> f.whenComplete((t, ex) -> {
    if (ex != null) {
        result.completeExceptionally(ex);
    }
}));

return result;
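To see the fail-fast behaviour, the snippet above can be wrapped into a complete method and driven with one future that never completes and one that fails (FailFastSequence is a made-up name for this sketch). The failing future's whenComplete callback runs on the thread that completes it, so the combined future is already failed while the other input is still pending.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

final class FailFastSequence {

    // allOf-based sequence that additionally fails as soon as any input fails.
    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
        CompletableFuture<List<T>> result = CompletableFuture
                .allOf(com.toArray(new CompletableFuture<?>[0]))
                .thenApply(v -> com.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
        // The first failure completes `result`; later completeExceptionally calls are no-ops.
        com.forEach(f -> f.whenComplete((t, ex) -> {
            if (ex != null) {
                result.completeExceptionally(ex);
            }
        }));
        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> slow = new CompletableFuture<>(); // never completed here
        CompletableFuture<Integer> failed = new CompletableFuture<>();
        CompletableFuture<List<Integer>> all = sequence(Arrays.asList(slow, failed));

        failed.completeExceptionally(new IllegalStateException("boom"));
        // `all` has failed even though `slow` is still pending.
        System.out.println(all.isCompletedExceptionally() + " " + slow.isDone()); // prints "true false"
    }
}
```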
If, additionally, you want to cancel the remaining operations on the first failure, add exec.shutdownNow(); immediately after the result.completeExceptionally(ex); call. This, of course, assumes that exec exists only for this one computation. If it doesn't, you'll have to loop over the remaining Futures and cancel each one individually.
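A minimal sketch of the "cancel each remaining Future individually" option, assuming the inputs are plain CompletableFutures (FailFastCancelAll is a hypothetical name). Two caveats: CompletableFuture.cancel ignores its mayInterruptIfRunning flag, so a task that is already running keeps running; cancelling only completes the future with a CancellationException and skips its dependent stages (in OpenJDK, a supplyAsync task whose future is cancelled before it starts does not run its supplier). The check on the return value of completeExceptionally ensures that only the first failure runs the cancel loop; otherwise each cancellation would re-enter the loop, nesting one call per future.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

final class FailFastCancelAll {

    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
        CompletableFuture<List<T>> result = CompletableFuture
                .allOf(com.toArray(new CompletableFuture<?>[0]))
                .thenApply(v -> com.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
        com.forEach(f -> f.whenComplete((t, ex) -> {
            // completeExceptionally returns true only for the call that actually failed `result`.
            if (ex != null && result.completeExceptionally(ex)) {
                // cancel() is a no-op on futures that are already complete.
                // The boolean flag has no effect for CompletableFuture: nothing is interrupted.
                com.forEach(other -> other.cancel(true));
            }
        }));
        return result;
    }
}
```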
Solution 2
You can get Spotify's CompletableFutures library and use its allAsList method. I think it's inspired by Guava's Futures.allAsList method.
public static <T> CompletableFuture<List<T>> allAsList(
        List<? extends CompletionStage<? extends T>> stages)
And here is a simple implementation if you don't want to use a library:
public <T> CompletableFuture<List<T>> allAsList(final List<CompletableFuture<T>> futures) {
    return CompletableFuture.allOf(
            futures.toArray(new CompletableFuture<?>[0])
    ).thenApply(ignored ->
            futures.stream().map(CompletableFuture::join).collect(Collectors.toList())
    );
}
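If you want the simple implementation to accept the same wide parameter type as the allAsList signature quoted above, one option is to convert each stage with CompletionStage.toCompletableFuture() first. This is an illustrative sketch, not the Spotify library's code, and the class name AllAsList is hypothetical. Note that toCompletableFuture() may throw UnsupportedOperationException for CompletionStage implementations that choose not to support it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;

final class AllAsList {

    static <T> CompletableFuture<List<T>> allAsList(List<? extends CompletionStage<? extends T>> stages) {
        // Convert each stage once, so allOf and the later join() see the same futures.
        List<CompletableFuture<? extends T>> futures = new ArrayList<>();
        for (CompletionStage<? extends T> stage : stages) {
            futures.add(stage.toCompletableFuture());
        }
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> futures.stream()
                        .<T>map(CompletableFuture::join) // widen each element to T
                        .collect(Collectors.toList()));
    }
}
```

The wildcards let a List<CompletionStage<Integer>> be gathered into a CompletableFuture<List<Number>>.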
Solution 3
As Misha has pointed out, you are overusing …Async operations. Further, you are composing a complex chain of operations modelling a dependency that doesn’t reflect your program logic:
- you create a job x which depends on the first and second job of your list
- you create a job x+1 which depends on job x and the third job of your list
- you create a job x+2 which depends on job x+1 and the 4th job of your list
- …
- you create a job x+5000 which depends on job x+4999 and the last job of your list
Then, cancelling this recursively composed job (explicitly or due to an exception) might be performed recursively and might fail with a StackOverflowError. That’s implementation-dependent.
As already shown by Misha, there is a method, allOf, which allows you to model your original intention: to define one job that depends on all jobs of your list.
However, it’s worth noting that even that isn’t necessary. Since you are using an unbounded thread pool executor, you can simply post an asynchronous job collecting the results into a list and you are done. Waiting for the completion is implied by asking for the result of each job anyway.
ExecutorService executorService = Executors.newCachedThreadPool();
List<CompletableFuture<Integer>> que = IntStream.range(0, 100000)
        .mapToObj(x -> CompletableFuture.supplyAsync(() -> {
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos((long) (Math.random() * 10)));
            return x;
        }, executorService))
        .collect(Collectors.toList());
CompletableFuture<List<Integer>> sequence = CompletableFuture.supplyAsync(
        () -> que.stream().map(CompletableFuture::join).collect(Collectors.toList()),
        executorService);
Using methods for composing dependent operations is important when the number of threads is limited and the jobs may spawn additional asynchronous jobs, to avoid having waiting jobs steal threads from jobs that have to complete first; but neither is the case here.
In this specific case, one job simply iterating over this large number of prerequisite jobs and waiting where necessary may be more efficient than modelling this large number of dependencies and having each job notify the dependent job about its completion.
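A self-contained, scaled-down version of the snippet above (the class name CollectInOneJob and the run(n) wrapper are made up for illustration) shows the approach end to end, including shutting the executor down. It relies on the cached pool being unbounded: with a small fixed pool, the collecting job could occupy a thread that the tasks it waits for still need.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class CollectInOneJob {

    static List<Integer> run(int n) {
        // Unbounded pool: the collecting job below may block without starving the workers.
        ExecutorService executorService = Executors.newCachedThreadPool();
        try {
            List<CompletableFuture<Integer>> que = IntStream.range(0, n)
                    .mapToObj(x -> CompletableFuture.supplyAsync(() -> {
                        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos((long) (Math.random() * 10)));
                        return x;
                    }, executorService))
                    .collect(Collectors.toList());
            // One extra job waits for each result in turn; no dependency chain is built.
            CompletableFuture<List<Integer>> sequence = CompletableFuture.supplyAsync(
                    () -> que.stream().map(CompletableFuture::join).collect(Collectors.toList()),
                    executorService);
            return sequence.join();
        } finally {
            executorService.shutdown();
        }
    }
}
```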
Solution 4
To add to the accepted answer by @Misha, it can be further expanded into a collector:
public static <T> Collector<CompletableFuture<T>, ?, CompletableFuture<List<T>>> sequenceCollector() {
    return Collectors.collectingAndThen(Collectors.toList(), com -> sequence(com));
}
Now you can:
Stream<CompletableFuture<Integer>> stream = Stream.of(
        CompletableFuture.completedFuture(1),
        CompletableFuture.completedFuture(2),
        CompletableFuture.completedFuture(3)
);
CompletableFuture<List<Integer>> ans = stream.collect(sequenceCollector());
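Putting the collector and the allOf-based sequence into one class gives a runnable sketch (the class name SequenceCollector is made up). Because the finisher only wires up allOf, collect() returns before the futures complete; the test below checks this with a future that is still pending.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collector;
import java.util.stream.Collectors;

final class SequenceCollector {

    // The allOf-based sequence from the first answer.
    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
        return CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
                .thenApply(v -> com.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    // Gathers the futures into a list, then hands that list to sequence().
    // Nothing is joined on the calling thread.
    static <T> Collector<CompletableFuture<T>, ?, CompletableFuture<List<T>>> sequenceCollector() {
        return Collectors.collectingAndThen(Collectors.toList(), com -> sequence(com));
    }
}
```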
Solution 5
An example sequence operation using thenCombine on CompletableFuture:
public <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
    CompletableFuture<List<T>> identity = CompletableFuture.completedFuture(new ArrayList<T>());
    // Accumulator: wait for the list so far and the next value, then append the value.
    BiFunction<CompletableFuture<List<T>>, CompletableFuture<T>, CompletableFuture<List<T>>> combineToList =
            (acc, next) -> acc.thenCombine(next, (a, b) -> { a.add(b); return a; });
    // Combiner: only used for parallel streams. Like the question's code, this mutates
    // the lists (including the shared identity), so keep the stream sequential.
    BinaryOperator<CompletableFuture<List<T>>> combineLists =
            (a, b) -> a.thenCombine(b, (l1, l2) -> { l1.addAll(l2); return l1; });
    return com.stream()
            .reduce(identity, combineToList, combineLists);
}
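For comparison, the same thenCombine idea can be written as a plain loop, which avoids reduce and its combiner entirely (ThenCombineLoop is a hypothetical name). Since the chain is strictly linear, each intermediate list reaches exactly one later stage, so appending in place is safe here. This still builds one dependent stage per element, i.e. the long chain described in Solution 3, so whether very long lists can hit a StackOverflowError remains implementation-dependent.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class ThenCombineLoop {

    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
        CompletableFuture<List<T>> acc = CompletableFuture.completedFuture(new ArrayList<T>());
        for (CompletableFuture<T> next : com) {
            // Each stage waits for the list so far and the next value, then appends.
            acc = acc.thenCombine(next, (list, value) -> {
                list.add(value);
                return list;
            });
        }
        return acc;
    }
}
```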
If you don't mind using third-party libraries, cyclops-react (I am the author) has a set of utility methods for CompletableFutures (and Optionals, Streams, etc.):
List<CompletableFuture<String>> listOfFutures;
CompletableFuture<ListX<String>> sequence = CompletableFutures.sequence(listOfFutures);
Comments
Jatin (over 2 years):
I am trying to convert List<CompletableFuture<T>> to CompletableFuture<List<T>>. This is quite useful when you have many asynchronous tasks and you need the results of all of them. If any of them fails, then the final future fails. This is how I have implemented it:
public static <T> CompletableFuture<List<T>> sequence2(List<CompletableFuture<T>> com, ExecutorService exec) {
    if (com.isEmpty()) {
        throw new IllegalArgumentException();
    }
    Stream<? extends CompletableFuture<T>> stream = com.stream();
    CompletableFuture<List<T>> init = CompletableFuture.completedFuture(new ArrayList<T>());
    return stream.reduce(init,
            (ls, fut) -> ls.thenComposeAsync(x -> fut.thenApplyAsync(y -> {
                x.add(y);
                return x;
            }, exec), exec),
            (a, b) -> a.thenCombineAsync(b, (ls1, ls2) -> {
                ls1.addAll(ls2);
                return ls1;
            }, exec));
}
To run it:
ExecutorService executorService = Executors.newCachedThreadPool();
Stream<CompletableFuture<Integer>> que = IntStream.range(0, 100000).boxed()
        .map(x -> CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep((long) (Math.random() * 10));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return x;
        }, executorService));
CompletableFuture<List<Integer>> sequence = sequence2(que.collect(Collectors.toList()), executorService);
If any of them fails, then it fails. It gives the expected output even with a million futures. The problem I have is this: if there are more than 5000 futures and any of them fails, I get a StackOverflowError:
Exception in thread "pool-1-thread-2611" java.lang.StackOverflowError
    at java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210)
    at java.util.concurrent.CompletableFuture$ThenCompose.run(CompletableFuture.java:1487)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:193)
    at java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210)
    at java.util.concurrent.CompletableFuture$ThenCompose.run(CompletableFuture.java:1487)
What am I doing wrong?
Note: The future returned above fails as soon as any of the futures fails. The accepted answer should also take this into account.
fge (about 9 years): If I were you, I'd implement a Collector instead...
Jatin (about 9 years): @fge That is actually a very good suggestion. I am coming from the Scala world, where we have a similar thing. A Collector might be a better fit here, but I suppose the implementation might be similar.
Jatin (about 9 years): One thing that I don't understand is that the return type of allOf is CompletableFuture<Void>, yet we return CompletableFuture<List<T>> without any compiler warning. I was not aware of this behavior of Void.
Jatin (about 9 years): In my case it is perfectly fine to use ArrayList because the additions cannot be performed in parallel. An element is only added once the previous one has completed, and so on, so the additions are never performed in parallel.
Misha (about 9 years): See the .thenApply part. After allOf(...) completes successfully, it collects all the resulting values into a list.
Jatin (about 9 years): I agree with most of the answer except the ArrayList part. It is completely safe to use it in my code. Otherwise, this is better code than mine (except that you iterate over the list multiple times, which is OK given the reduced verbosity).
Misha (about 9 years): @Jatin I think you might be right about that. I will rethink it in the morning when I'm more awake and modify my answer accordingly.
Misha (about 9 years): One caveat is that using supplyAsync instead of allOf will consume a thread from the pool to await the completion of all tasks. If I'm not mistaken, allOf will operate within the threads assigned to the respective tasks. Not a big deal for most use cases, but worth noting.
Misha (about 9 years): @Jatin You are right: within the current implementation of reduce, so long as the stream in the sequence2 method is kept sequential, ArrayList is safe. However, it is very undesirable to write stream constructs that break if the stream were made parallel. At the very least, if you rely on the stream being sequential, the 3rd argument to reduce should be (a, b) -> {throw new IllegalStateException("Parallel not allowed");}
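A sketch of that safeguard (class and method names are hypothetical). It relies on the OpenJDK behaviour that a sequential reduce never calls the combiner, which is the "current implementation" caveat above; the Stream documentation does not promise it. On a parallel stream the shared identity list is corrupted or the combiner throws, so the misuse fails visibly instead of silently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

final class SequentialOnlyReduce {

    static List<Integer> collectSequentially(Stream<Integer> stream) {
        return stream.reduce(
                new ArrayList<Integer>(),
                (list, x) -> {
                    list.add(x); // mutates the identity: only safe while sequential
                    return list;
                },
                (a, b) -> {
                    // Only reached when parallel segments have to be merged.
                    throw new IllegalStateException("Parallel not allowed");
                });
    }
}
```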
Jatin (about 9 years): A good point! Thanks. I think sequence2 was a bad idea. A collector is certainly better, and so is your answer.
Holger (about 9 years): @Misha: I did mention that it will steal a thread if the number of threads is limited, and that it works here because an unlimited thread pool executor is used (and no async sub-jobs are spawned).
Jatin (almost 9 years): There is one flaw here. CompletableFuture.allOf only completes when all of them complete. Even if one of them fails, it still waits for the others to respond.
Jatin (almost 9 years): @Holger A problem with this answer is that if a later future fails, it still waits for the future it is currently joined on to complete. Rather, as soon as something fails, the returned future should fail right then.
Jatin (almost 9 years): Actually, I am fine with that, but not with the thread stealing.
Misha (almost 9 years): That is exactly how your original solution (using thenCombine) would behave. If you want to short-circuit the computation and trigger exceptional completion immediately, it's easy to do. See updated answer.
Parv Bhasker (over 8 years): For completeness :-P I think this code needs an import static java.util.stream.Collectors.toList
Dirk Hillbrecht (over 8 years): This does not compile: Type mismatch: cannot convert from CompletableFuture<Object> to CompletableFuture<List<T>>. JDK 8u66
Abhijit Sarkar (over 8 years): @Misha I don't understand the benefit of using allOf in your code. The individual tasks are invoked sequentially using join, so task i + 1 is not invoked until task i is complete. I didn't find anything in the documentation that allows invoking all the subtasks in parallel. The closest thing seems to be ForkJoinPool.invokeAll, which takes a bunch of Callables.
Misha (over 8 years): @AbhijitSarkar The tasks aren't invoked by join. The benefit of using allOf is that when allOf triggers, all the tasks have been completed and join just gets the results.
Misha (over 8 years): @DirkHillbrecht are you using Eclipse?
Abhijit Sarkar (over 8 years): @Misha Could you elaborate on when the individual tasks are invoked and how the maximum concurrency is controlled? If I submit hundreds of tasks using allOf, there's no way all of them could be invoked concurrently, as the system just wouldn't have enough resources for that, besides completely defeating the purpose of thread pooling.
Misha (over 8 years): @AbhijitSarkar That is out of the scope of this method. The assignment to thread pools is done when the individual CompletableFutures are created. It's entirely possible that all the tasks were scheduled in a single thread and the whole thing will run sequentially. It's also possible that they were scheduled in a giant thread pool and everything will run concurrently. Either way, allOf will ensure that all tasks have completed before it triggers.
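To illustrate that allOf does not control concurrency, the sketch below (hypothetical class and method names) submits jobs to a fixed pool and records how many run at the same time. The limit comes entirely from the executor passed to runAsync; allOf only waits.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class BoundedConcurrencyDemo {

    // Returns the highest number of jobs that were ever running at once.
    static int maxConcurrency(int tasks, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            List<CompletableFuture<Void>> futures = IntStream.range(0, tasks)
                    .mapToObj(i -> CompletableFuture.runAsync(() -> {
                        int now = running.incrementAndGet();
                        peak.accumulateAndGet(now, Math::max);
                        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(5)); // simulated work
                        running.decrementAndGet();
                    }, pool))
                    .collect(Collectors.toList());
            // allOf neither starts the jobs nor changes how many run at once.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();
            return peak.get();
        } finally {
            pool.shutdown();
        }
    }
}
```

With a pool of 4 threads the peak can never exceed 4, however many futures are passed to allOf.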
Dirk Hillbrecht (over 8 years): @Misha, yes, Eclipse Mars.1 using JDK 8u66.
Jatin (over 7 years): I like the answer, but it relies on javaslang.concurrent.Future :(
Mathias Dpunkt (over 7 years): That is true, but having worked with javaslang Future you do not want to go back to the Java Future or CompletableFuture.
Valery Silaev (almost 7 years): Update: suggested code moved to a separate library, github.com/vsilaev/tascalate-concurrent
cubuspl42 (over 6 years): @Misha If we provide this function with a list of completable futures backed by threads (created with supplyAsync), and one of the futures fails, then what happens to the others? Are they abandoned as "zombie threads"?
charlie (about 6 years): Wouldn't a direct CompletableFuture.supplyAsync(() -> com.parallelStream().map(CompletableFuture::join).collect(toList()), exec) be just enough?
Didier L (almost 6 years): You should use thenCombine() instead of thenApply() in the accumulator, to avoid the join() call. Otherwise the calling thread will actually execute that, so the collection will only return after everything has completed. You can check this by adding a print before the futureList.join(): it only gets printed after all futures have printed “Succesfully loaded test data”.
Kai Stapel (almost 6 years): @DidierL If I change thenApply() to thenCombine(), then the final join() call on the CompletableFuture<List<V>> will not block anymore but will return immediately with an empty result. So the future of the list will not wait until all individual futures are complete. But that was the initial idea of the whole thing.
Didier L (almost 6 years): Yes, indeed, I forgot that a Collector relies on mutation. The problem with your code is that it is equivalent to CompletableFuture.completedFuture(listOfFutures.stream().map(CompletableFuture::join).collect(toList()));. The collection is actually returning a future that is already completed, so there is no point in returning a future any more.
Kai Stapel (almost 6 years): You may be correct that this is functionally equivalent to my "complete example". However, the example is just for illustration purposes, to show how to use the toFutureList() collector. What is not equivalent is listOfFutures.stream().map(CompletableFuture::join).collect(toList()) and listOfFutures.stream().collect(toFutureList()). The former gives you a complete result with all futures completed, while the latter gives you a future of a list of values that you can pass on, or map to other values without blocking.
Didier L (almost 6 years): That's where you are wrong: the latter does exactly the same. Your collector simply calls join() on all futures on the calling thread, and wraps the result in an already completed CompletableFuture. It is blocking. As I said previously, just add a print right after the stream collection and you will see that this print will only occur after all futures are completed.
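The distinction Didier L is drawing can be shown with two hypothetical helpers: blocking mirrors the CompletableFuture.completedFuture(... join ...) expression from the comment above, and nonBlocking uses allOf as in the first answer. Only the second returns before its inputs complete.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

final class BlockingVsNonBlocking {

    // join() runs on the calling thread, so this method returns only after every
    // future has completed; the returned future is already done.
    static <T> CompletableFuture<List<T>> blocking(List<CompletableFuture<T>> futures) {
        List<T> values = futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
        return CompletableFuture.completedFuture(values);
    }

    // Returns immediately; the joins run later, once allOf has fired.
    static <T> CompletableFuture<List<T>> nonBlocking(List<CompletableFuture<T>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }
}
```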
Kai Stapel (almost 6 years): Yes, you are right. I updated my answer accordingly. Thanks for pointing that out.
Simon Forsberg (almost 5 years): This answer is useless if the link stops working. Please embed the code in the answer.
NIGAGA (over 4 years): I know it's a bit of an old thread. I have tried to run the mentioned solution in my Windows Eclipse environment. However, every time I run it, it exits on the first occurrence of an exception. Is it because result.completeExceptionally(ex) gets executed and kills all the remaining threads? If yes, what can I do to ensure the remaining threads get executed as well?
Misha (over 4 years): @NIGAGA The version of the code with result.completeExceptionally is specifically meant to complete after the first failure (see the description before the code listing). The first version with just allOf will wait for all tasks to complete (successfully or not) before completing the future. If this doesn't help you, please post a new question and include your code.
NIGAGA (over 4 years): @Misha Thanks, I understand now.
Holger (over 4 years): Recommended: Arrays of Wisdom of the Ancients. In short, use toArray(new CompletableFuture<?>[0]). It’s simpler, less error prone, and more efficient on the most widespread JVM implementation…