How to combine 3 or more CompletionStages?


Solution 1

The only way to combine multiple stages that scales well with a growing number of stages is to use CompletableFuture. If your CompletionStages aren’t CompletableFutures, you can still convert them using .toCompletableFuture():

CompletableFuture<A> aCompletionStage = getA().toCompletableFuture();
CompletableFuture<B> bCompletionStage = getB().toCompletableFuture();
CompletableFuture<C> cCompletionStage = getC().toCompletableFuture();
CompletableFuture<D> dCompletionStage = getD().toCompletableFuture();

CompletionStage<Combined> combinedDataCompletionStage = CompletableFuture.allOf(
    aCompletionStage, bCompletionStage, cCompletionStage, dCompletionStage)
    .thenApply(ignoredVoid -> combine(
        aCompletionStage.join(), bCompletionStage.join(),
        cCompletionStage.join(), dCompletionStage.join()) );

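This contains more boilerplate than combining two stages via thenCombine, but the boilerplate doesn’t grow when adding more stages to it. Because allOf completes only after every stage has completed, the join() calls inside thenApply return immediately without blocking.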
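For readers who want to try the allOf approach, here is a minimal runnable sketch. The result types (String, Integer, Boolean, Double), the bodies of getA() to getD(), and the combine method are stand-ins invented for illustration; they are not part of the original question.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class AllOfExample {
    // Hypothetical stand-ins for getA() ... getD() from the answer.
    static CompletionStage<String> getA() { return CompletableFuture.supplyAsync(() -> "a"); }
    static CompletionStage<Integer> getB() { return CompletableFuture.supplyAsync(() -> 1); }
    static CompletionStage<Boolean> getC() { return CompletableFuture.supplyAsync(() -> true); }
    static CompletionStage<Double> getD() { return CompletableFuture.supplyAsync(() -> 2.5); }

    // Hypothetical combine(): concatenates the four values into one string.
    static String combine(String a, int b, boolean c, double d) {
        return a + "-" + b + "-" + c + "-" + d;
    }

    static CompletionStage<String> combineAll() {
        CompletableFuture<String> a = getA().toCompletableFuture();
        CompletableFuture<Integer> b = getB().toCompletableFuture();
        CompletableFuture<Boolean> c = getC().toCompletableFuture();
        CompletableFuture<Double> d = getD().toCompletableFuture();

        // allOf completes once all four futures have completed, so the
        // join() calls in the callback do not block.
        return CompletableFuture.allOf(a, b, c, d)
            .thenApply(ignoredVoid -> combine(a.join(), b.join(), c.join(), d.join()));
    }

    public static void main(String[] args) {
        System.out.println(combineAll().toCompletableFuture().join()); // prints a-1-true-2.5
    }
}
```

If any of the four stages completes exceptionally, the future returned by allOf does too, so the thenApply callback is skipped and the failure propagates to the combined stage.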


Note that even with your original thenCombine approach, you don’t need a Triple; a Pair is sufficient:

CompletionStage<Combined> combinedDataCompletionStage =
    aCompletionStage.thenCombine(bCompletionStage, (Pair::of)).thenCombine(
        cCompletionStage.thenCombine(dCompletionStage, Pair::of),
        (ab, cd) -> combine(ab.getLeft(), ab.getRight(), cd.getLeft(), cd.getRight()));

Still, it doesn’t scale well if you want to combine more stages.


An in-between solution (regarding complexity) might be:

CompletionStage<Combined> combinedDataCompletionStage = aCompletionStage.thenCompose(
    a -> bCompletionStage.thenCompose(b -> cCompletionStage.thenCompose(
        c -> dCompletionStage.thenApply(d -> combine(a, b, c, d)))));

That’s simpler in its structure, but it still doesn’t scale well with more stages.

Solution 2

The third approach in Holger's answer (the nested thenCompose chain) can be made a little shorter by using thenCombine for the last two stages:

CompletionStage<Combined> combinedDataCompletionStage = aCompletionStage.thenCompose(
    a -> bCompletionStage.thenCompose(
        b -> cCompletionStage.thenCombine(dCompletionStage,
            (c, d) -> combine(a, b, c, d))));

Solution 3

You asked about "3 or more". If you have them in a List as CompletableFutures (see the other answers), you could use this handy method:

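// toArray(IntFunction) requires Java 11+; on older versions use toArray(new CompletableFuture[0]).
private static <T> CompletableFuture<List<T>> join(List<CompletableFuture<T>> executionPromises) {
    CompletableFuture<Void> joinedPromise =
        CompletableFuture.allOf(executionPromises.toArray(CompletableFuture[]::new));
    // Once allOf has completed, every join() below returns immediately.
    return joinedPromise.thenApply(ignoredVoid ->
        executionPromises.stream().map(CompletableFuture::join).collect(Collectors.toList()));
}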

It converts your "list of futures" to a "future for a list of the results".
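A short usage sketch may make this concrete. The helper is repeated here under the hypothetical name allAsList (to avoid confusion with CompletableFuture.join()), and the three Integer futures are invented sample data.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class JoinListExample {
    // Same idea as the helper above; the name allAsList is an assumption of this sketch.
    static <T> CompletableFuture<List<T>> allAsList(List<CompletableFuture<T>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(ignoredVoid -> futures.stream()
                .map(CompletableFuture::join)   // non-blocking: all futures are done here
                .collect(Collectors.toList()));
    }

    // Builds three sample futures and waits for the combined list.
    static List<Integer> demo() {
        List<CompletableFuture<Integer>> futures = Arrays.asList(
            CompletableFuture.supplyAsync(() -> 1),
            CompletableFuture.supplyAsync(() -> 2),
            CompletableFuture.supplyAsync(() -> 3));
        return allAsList(futures).join();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [1, 2, 3]
    }
}
```

The results come back in the order of the input list, not in completion order, because the stream walks the original list.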

Solution 4

Any number of CompletionStages can be combined (reduced) pairwise with a stream:

CompletionStage<A> futA = getA();
CompletionStage<B> futB = getB(); 
CompletionStage<C> futC = getC(); 

Stream.of(futA, futB, futC)
    .reduce((f1, f2) -> f1.thenCombine(f2, (d1, d2) -> combine(d1, d2)));

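The implementation of the combine method is responsible for merging the data values (A, B and C), which could be tricky if A, B and C are disparate: combine then has to accept, and return, a type common to all of them. Note that reduce without an identity value returns an Optional, which is empty only if the stream is empty.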
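The reduction is easiest to get right when every stage produces the same type. Here is a minimal sketch under that assumption; the Integer values and the summing combine method are invented for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Stream;

public class ReduceExample {
    // Hypothetical combine(): merges two values of the shared type.
    static Integer combine(Integer x, Integer y) {
        return x + y;
    }

    static CompletionStage<Integer> sumAll() {
        CompletionStage<Integer> futA = CompletableFuture.supplyAsync(() -> 1);
        CompletionStage<Integer> futB = CompletableFuture.supplyAsync(() -> 2);
        CompletionStage<Integer> futC = CompletableFuture.supplyAsync(() -> 3);

        // Pairwise reduction: ((A combine B) combine C).
        // reduce returns an Optional because the stream could be empty.
        return Stream.of(futA, futB, futC)
            .reduce((f1, f2) -> f1.thenCombine(f2, (d1, d2) -> combine(d1, d2)))
            .orElseThrow(IllegalStateException::new);
    }

    public static void main(String[] args) {
        System.out.println(sumAll().toCompletableFuture().join()); // prints 6
    }
}
```

Since the stages are combined pairwise from left to right, combine should behave sensibly when applied to an already-combined value and a fresh one.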

Solution 5

I think you should use an intermediary object, but one of your own rather than generic tuples such as Pair and Triple:

public CompletableFuture<R> method() {
    CompletableFuture<A> aFuture = getAFuture();
    CompletableFuture<B> bFuture = getBFuture();
    CompletableFuture<C> cFuture = getCFuture();
    CompletableFuture<D> dFuture = getDFuture();

    return CompletableFuture.completedFuture(new WellNamedResultHolder())
            .thenCombineAsync(aFuture, WellNamedResultHolder::withAResult)
            .thenCombineAsync(bFuture, WellNamedResultHolder::withBResult)
            .thenCombineAsync(cFuture, WellNamedResultHolder::withCResult)
            .thenCombineAsync(dFuture, WellNamedResultHolder::withDResult)
            .thenApplyAsync(this::combineAllTheResults);
}

private static class WellNamedResultHolder {
    private A aResult;
    private B bResult;
    private C cResult;
    private D dResult;

    // Getters

    public WellNamedResultHolder withAResult(final A aResult) {
        this.aResult = aResult;
        return this;
    }

    public WellNamedResultHolder withBResult(final B bResult) {
        this.bResult = bResult;
        return this;
    }

    public WellNamedResultHolder withCResult(final C cResult) {
        this.cResult = cResult;
        return this;
    }

    public WellNamedResultHolder withDResult(final D dResult) {
        this.dResult = dResult;
        return this;
    }
}

The actual form of the result holder can change to match your own needs, which gives you greater flexibility. You also get to be in charge of what happens as these futures complete. Although there is more boilerplate (which Lombok can tidy up), you get code that is more descriptive of what is happening.

Author: Vladimir Korenev

Functional programming adept.

Updated on February 12, 2021

Comments

  • Vladimir Korenev over 3 years

    If I have 2 CompletionStages, I can combine them with the thenCombine method:

    CompletionStage<A> aCompletionStage = getA();
    CompletionStage<B> bCompletionStage = getB();
    CompletionStage<Combined> combinedCompletionStage =
        aCompletionStage.thenCombine(bCompletionStage, (aData, bData) -> combine(aData, bData));
    

    If I have 3 or more CompletionStages, I can make a chain of thenCombine methods, but I have to use temporary objects to pass results. For example, here is a solution using Pair and Triple from the org.apache.commons.lang3.tuple package:

    CompletionStage<A> aCompletionStage = getA();
    CompletionStage<B> bCompletionStage = getB();
    CompletionStage<C> cCompletionStage = getC();
    CompletionStage<D> dCompletionStage = getD();
    
    CompletionStage<Combined> combinedDataCompletionStage =
            aCompletionStage.thenCombine(bCompletionStage, (Pair::of))
                    .thenCombine(cCompletionStage, (ab, c) ->
                            Triple.of(ab.getLeft(), ab.getRight(), c))
                    .thenCombine(dCompletionStage, (abc, d) ->
                            combine(abc.getLeft(), abc.getMiddle(), abc.getRight(), d));
    

    Is there a better way to combine results from multiple CompletionStages?

  • Arboreal Shark about 4 years
    That's not Java syntax dude