In which thread do CompletableFuture's completion handlers execute?
Solution 1
The policies as specified in the CompletableFuture
docs could help you understand better:
Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.
All async methods without an explicit Executor argument are performed using the
ForkJoinPool.commonPool()
(unless it does not support a parallelism level of at least two, in which case, a new Thread is created to run each task). To simplify monitoring, debugging, and tracking, all generated asynchronous tasks are instances of the marker interfaceCompletableFuture.AsynchronousCompletionTask
.
Update: I would also advice on reading this answer by @Mike as an interesting analysis further into the details of the documentation.
Solution 2
As @nullpointer points out, the documentation tells you what you need to know. However, the relevant text is surprisingly vague, and some of the comments (and answers) posted here seem to rely on assumptions that aren't supported by the documentation. Thus, I think it's worthwhile to pick it apart. Specifically, we should read this paragraph very carefully:
Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.
Sounds straightforward enough, but it's light on details. It seemingly deliberately avoids describing when a dependent completion may be invoked on the completing thread versus during a call to a completion method like thenApply
. As written, the paragraph above is practically begging us to fill in the gaps with assumptions. That's dangerous, especially when the topic concerns concurrent and asynchronous programming, where many of the expectations we've developed as programmers get turned on their head. Let's take a careful look at what the documentation doesn't say.
The documentation does not claim that dependent completions registered before a call to complete()
will run on the completing thread. Moreover, while it states that a dependent completion might be invoked when calling a completion method like thenApply
, it does not state that a completion will be invoked on the thread that registers it (note the words "any other").
These are potentially important points for anyone using CompletableFuture
to schedule and compose tasks. Consider this sequence of events:
- Thread A registers a dependent completion via
f.thenApply(c1)
. - Some time later, Thread B calls
f.complete()
. - Around the same time, Thread C registers another dependent completion via
f.thenApply(c2)
.
Conceptually, complete()
does two things: it publishes the result of the future, and then it attempts to invoke dependent completions. Now, what happens if Thread C runs after the result value is posted, but before Thread B gets around to invoking c1
? Depending on the implementation, Thread C may see that f
has completed, and it may then invoke c1
and c2
. Alternatively, Thread C may invoke c2
while leaving Thread B to invoke c1
. The documentation does not rule out either possibility. With that in mind, here are assumptions that are not supported by the documentation:
- That a dependent completion
c
registered onf
prior to completion will be invoked during the call tof.complete()
; - That
c
will have run to completion by the timef.complete()
returns; - That dependent completions will be invoked in any particular order (e.g., order of registration);
- That dependent completions registered before
f
completes will be invoked before completions registered afterf
completes.
Consider another example:
- Thread A calls
f.complete()
; - Some time later, Thread B registers a completion via
f.thenApply(c1)
; - Around the same time, Thread C registers a separate completion via
f.thenApply(c2)
.
If it is known that f
has already run to completion, one might be tempted to assume that c1
will be invoked during f.thenApply(c1)
and that c2
will be invoked during f.thenApply(c2)
. One might further assume that c1
will have run to completion by the time f.thenApply(c1)
returns. However, the documentation does not support these assumptions. It may be possible that one of the threads calling thenApply
ends up invoking both c1
and c2
, while the other thread invokes neither.
A careful analysis of the JDK code could determine how the hypothetical scenarios above might play out. But even that is risky, because you may end up relying on an implementation detail that is (1) not portable, or (2) subject to change. Your best bet is not to assume anything that's not spelled out in the javadocs or the original JSR spec.
tldr: Be careful what you assume, and when you write documentation, be as clear and deliberate as possible. While brevity is a wonderful thing, be wary of the human tendency to fill in the gaps.
Solution 3
From the Javadoc:
Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.
More concretely:
fn
will run during the call tocomplete()
in the context of whichever thread has calledcomplete()
.If
complete()
has already finished by the timethenApply()
is called,fn
will be run in the context of the thread callingthenApply()
.
Solution 4
When it comes to threading the API documentation is lacking. It takes a bit of inference to understand how threading and futures work. Start with one assumption: the non-Async
methods of CompletableFuture
do not spawn new threads on their own. Work will proceed under existing threads.
thenApply
will run in the original CompletableFuture
's thread. That's either the thread that calls complete()
, or the one that calls thenApply()
if the future is already completed. If you want control over the thread—a good idea if fn
is a slow operation—then you should use thenApplyAsync
.
Solution 5
I know this question is old, but I want to use source code to explain this question.
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
return uniAcceptStage(null, action);
}
private CompletableFuture<Void> uniAcceptStage(Executor e,
Consumer<? super T> f) {
if (f == null) throw new NullPointerException();
Object r;
if ((r = result) != null)
return uniAcceptNow(r, e, f);
CompletableFuture<Void> d = newIncompleteFuture();
unipush(new UniAccept<T>(e, d, this, f));
return d;
}
This is the source code from java 16, and we can see, if we trigger thenAccept, we will pass a null executor service reference into our function. From the 2nd function uniAcceptStage() 2nd if condition. If result is not null, it will trigger uniAcceptNow()
if (e != null) {
e.execute(new UniAccept<T>(null, d, this, f));
} else {
@SuppressWarnings("unchecked") T t = (T) r;
f.accept(t);
d.result = NIL;
}
if executor service is null, we will use lambda function f.accept(t) to execute it. If we are triggering this thenApply/thenAccept from main thread, it will use main thread as executing thread.
But if we cannot get previous result from last completablefuture, we will push our current UniAccept/Apply into stack by using uniPush function. And UniAccept class has tryFire() which will be triggered from our postComplete() function
final void postComplete() {
/*
* On each step, variable f holds current dependents to pop
* and run. It is extended along only one path at a time,
* pushing others to avoid unbounded recursion.
*/
CompletableFuture<?> f = this; Completion h;
while ((h = f.stack) != null ||
(f != this && (h = (f = this).stack) != null)) {
CompletableFuture<?> d; Completion t;
if (STACK.compareAndSet(f, h, t = h.next)) {
if (t != null) {
if (f != this) {
pushStack(h);
continue;
}
NEXT.compareAndSet(h, t, null); // try to detach
}
f = (d = h.tryFire(NESTED)) == null ? this : d;
}
}
}
Related videos on Youtube
St.Antario
Updated on June 17, 2022Comments
-
St.Antario almost 2 years
I have a question about CompletableFuture method:
public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn)
The thing is the JavaDoc says just this:
Returns a new CompletionStage that, when this stage completes normally, is executed with this stage's result as the argument to the supplied function. See the CompletionStage documentation for rules covering exceptional completion.
What about threading? In which thread is this going to be executed? What if the future is completed by a thread pool?
-
St.Antario over 6 yearsNot quite clear original thread. What if the the future is completed by a standalone thread pool? For instance, we execute some computation in the pool and when it's finished just call
CompletableFuture::complete
. -
Boris the Spider over 6 yearsAlso note the corner case where the
CompletableFuture
completes before thethenApply
call returns - in this case, because theCompletableFuture
is completed; it will execute on the current thread. -
Boris the Spider over 6 yearsInteresting analysis - really digs into the intricacies of implementation promises in the realm of concurrent programming.
-
Mike Strobel over 6 years@BoristheSpider Thanks! I found the documentation to be carefully worded yet curiously vague, which sent me into "paranoid analysis" mode. I was also amused that the accepted answer made no attempt to decipher or otherwise expand upon the quoted Javadocs, which was probably wise :).
-
Holger over 6 yearsIt seems that in the past, I should have asked myself what “completion method” actually means when I read that documentation. “A careful analysis of the JDK code” leads to the conclusion that most of the surprising scenarios you describe, are indeed possible. So the risk to rely on implementation details is rather low. The fact that two independent actions have no ordering, hence, are not executed in the order they were registered, has been discussed here already, though that didn’t even require the more surprising scenarios you describe.
-
Mike Strobel over 6 years@Holger I rather dislike the way they use 'completion' to describe a task that runs upon completion of its antecedent. Because that word appears rather often when discussing futures ("completion of", "having run to completion", etc.), it's easy to gloss over or misinterpret it in contexts like that javadoc excerpt. I would have preferred they use 'continuation' instead.
-
Holger over 6 yearsYeah, when I read it the first time, I thought that “a completion method” meant either of
complete
,completeExceptionally
,cancel
, orobtrude…
as these complete rather than chain or define or continuation… -
Mike Strobel over 6 years@Holger I think it generally refers to those, and concurrent calls to
complete
andcancel
, for example, imply a similar race to the one I described betweencomplete
andthenApply
. It's possible that "completion method" in the quoted excerpt was not meant to directly include chaining methods likethenApply
. However, those chaining methods lead totryFire
being called, and the code comments explicitly identifytryFire
as a "completion method", so they would still be transitively covered. The more I look at this, the more I think the documentation is just terrible. -
Gili about 6 yearsI don't fully understand your answer. I appreciate you highlighting what the specification does not guarantee, but can you please clarify the other side of the coin as well? Specifically, can you provide examples of which threads are allowed to execute non-async completions? I am having a hard time imagining what these can be. Thank you.
-
1283822 almost 6 yearsThat is the point of the answer. The doc outlined a few choices the implementation can freely choose from wherever it sees fit: any thread that registers the thenApply, or thread that completes the completionStage. You can't assume what the implementation does.
-
1283822 almost 6 yearsIf you really need to specify which thread to run the thenApply, then use thenApplyAsync. It is the same thing except you can control which executor to use.
-
phant0m over 5 years
or by any other caller of a completion method
does this even include callers of such methods on other, unrelated CompletableFuture objects? -
Holger about 5 years@phant0m no, it does not apply to entirely unrelated futures.
-
Yulin over 4 yearsFor methods like
thenApply
,thenRun
what's expained in the doc is clear engough. But what aboutallOf
, for something likefa = CompletableFuture.allOf(f0, f1, f2); fa.thenRun(someRunnable)
, assumef0
,f1
,f2
are completed in thread A, thread B, thread C respectively, then which thread willsomeRunnable
be executed in? Again, what aboutthenCompose(Function<? super T,? extends CompletionStage<U>> fn)
in case likef0.thenCompose(x -> someNewCompletionStageProducer).thenRun(someRunnable)
,someRunnable
will be executed in thread off0
or the future returned byfn
? @Naman