Java executors: how to be notified, without blocking, when a task completes?
Solution 1
Define a callback interface to receive whatever parameters you want to pass along in the completion notification. Then invoke it at the end of the task.
You could even write a general wrapper for Runnable tasks and submit these to ExecutorService. Or, see below for a mechanism built into Java 8.
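// Callback is an interface you declare yourself; it is not from a library.
interface Callback {
    void complete();
}

class CallbackTask implements Runnable {
    private final Runnable task;
    private final Callback callback;

    CallbackTask(Runnable task, Callback callback) {
        this.task = task;
        this.callback = callback;
    }

    @Override
    public void run() {
        task.run();
        callback.complete(); // notify the listener once the wrapped task has finished
    }
}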
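As a minimal sketch of how this wrapper could drive the one-at-a-time processing described in the question: each task is wrapped so that its completion callback submits the next task from the same queue, and no thread waits on `get()`. `SerialQueueRunner`, `whenDrained`, and `demo()` are hypothetical names introduced for illustration, and `Callback` is assumed to be the self-declared single-method interface from the answer.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Assumed shape of the self-declared callback interface.
interface Callback {
    void complete();
}

class CallbackTask implements Runnable {
    private final Runnable task;
    private final Callback callback;

    CallbackTask(Runnable task, Callback callback) {
        this.task = task;
        this.callback = callback;
    }

    @Override
    public void run() {
        task.run();
        callback.complete();
    }
}

// Hypothetical helper: runs the tasks of one queue strictly in order on a shared pool.
class SerialQueueRunner {
    private final Queue<Runnable> tasks;
    private final ExecutorService pool;
    private final Runnable whenDrained;

    SerialQueueRunner(Queue<Runnable> tasks, ExecutorService pool, Runnable whenDrained) {
        this.tasks = tasks;
        this.pool = pool;
        this.whenDrained = whenDrained;
    }

    // Submits the next task, if any. The callback re-enters this method on completion.
    void submitNext() {
        Runnable next = tasks.poll();
        if (next == null) {
            whenDrained.run();
            return;
        }
        pool.execute(new CallbackTask(next, this::submitNext));
    }

    // Demo: queues three tasks and returns the order in which they ran.
    static List<Integer> demo() {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        for (int i = 1; i <= 3; i++) {
            final int id = i;
            queue.add(() -> seen.add(id));
        }
        CountDownLatch drained = new CountDownLatch(1);
        new SerialQueueRunner(queue, pool, drained::countDown).submitNext();
        try {
            drained.await(5, TimeUnit.SECONDS); // only the demo waits, to collect the result
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdown();
        return seen;
    }
}
```

Many such runners can share one pool, since a queue occupies a worker thread only while one of its tasks is actually running. Note that in this sketch a task that throws breaks the chain, because the callback is invoked only after `task.run()` returns normally.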
With CompletableFuture, Java 8 included a more elaborate means to compose pipelines where processes can be completed asynchronously and conditionally. Here's a contrived but complete example of notification.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class GetTaskNotificationWithoutBlocking {

    public static void main(String... argv) throws Exception {
        ExampleService svc = new ExampleService();
        GetTaskNotificationWithoutBlocking listener = new GetTaskNotificationWithoutBlocking();
        CompletableFuture<String> f = CompletableFuture.supplyAsync(svc::work);
        f.thenAccept(listener::notify);
        System.out.println("Exiting main()");
    }

    void notify(String msg) {
        System.out.println("Received message: " + msg);
    }

}

class ExampleService {

    String work() {
        sleep(7000, TimeUnit.MILLISECONDS); /* Pretend to be busy... */
        char[] str = new char[5];
        ThreadLocalRandom current = ThreadLocalRandom.current();
        for (int idx = 0; idx < str.length; ++idx)
            str[idx] = (char) ('A' + current.nextInt(26));
        String msg = new String(str);
        System.out.println("Generated message: " + msg);
        return msg;
    }

    public static void sleep(long average, TimeUnit unit) {
        String name = Thread.currentThread().getName();
        long timeout = Math.min(exponential(average), Math.multiplyExact(10, average));
        System.out.printf("%s sleeping %d %s...%n", name, timeout, unit);
        try {
            unit.sleep(timeout);
            System.out.println(name + " awoke.");
        } catch (InterruptedException abort) {
            Thread.currentThread().interrupt();
            System.out.println(name + " interrupted.");
        }
    }

    public static long exponential(long avg) {
        return (long) (avg * -Math.log(1 - ThreadLocalRandom.current().nextDouble()));
    }

}
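A variation that may be useful when the work should run on your own thread pool rather than the default one: `supplyAsync` has an overload that takes an `Executor`. The sketch below is only an illustration; `ExecutorBackedNotification` and `demo()` are hypothetical names, and the pool size of two is an arbitrary choice to cap concurrency.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: runs the task on a caller-supplied pool.
class ExecutorBackedNotification {

    static String demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2); // at most two tasks at once
        try {
            CompletableFuture<String> received = new CompletableFuture<>();
            CompletableFuture
                    .supplyAsync(() -> "HELLO", pool)   // the task runs on our pool
                    .thenAccept(received::complete);    // non-blocking notification
            // Only the demo blocks here, so that it can return what the listener saw.
            return received.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

Using a dedicated pool also means the worker threads are ordinary non-daemon threads by default, so the JVM stays alive until the pool is shut down. With the common pool, the worker threads are typically daemon threads and the program may exit before a long task delivers its notification.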
Solution 2
In Java 8 you can use CompletableFuture. Here's an example I had in my code where I'm using it to fetch users from my user service, map them to my view objects and then update my view or show an error dialog (this is a GUI application):
CompletableFuture.supplyAsync(
        userService::listUsers
).thenApply(
        this::mapUsersToUserViews
).thenAccept(
        this::updateView
).exceptionally(
        throwable -> { showErrorDialogFor(throwable); return null; }
);
It executes asynchronously. I'm using two private methods: mapUsersToUserViews and updateView.
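For readers without the surrounding GUI code, here is a self-contained sketch of the same pipeline shape. Everything in it is a stand-in: `PipelineSketch`, `render`, and `listUsers` are hypothetical, the "view" is just a string buffer, and the final `join()` exists only so the demo can return a result.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Hypothetical stand-in for the user service, the mapping step, and the view.
class PipelineSketch {

    // Runs the pipeline and returns what the "view" would end up showing.
    static String render(boolean fail) {
        StringBuffer view = new StringBuffer();
        CompletableFuture<Void> done = CompletableFuture
                .supplyAsync(() -> listUsers(fail))
                .thenApply(users -> users.stream()
                        .map(String::toUpperCase)
                        .collect(Collectors.toList()))
                .thenAccept(views -> view.append(views))
                .exceptionally(t -> {
                    // A failure in an earlier stage usually arrives wrapped in a CompletionException.
                    Throwable cause = t.getCause() != null ? t.getCause() : t;
                    view.append("error: ").append(cause.getMessage());
                    return null;
                });
        done.join(); // demo only: wait so the result can be returned
        return view.toString();
    }

    static List<String> listUsers(boolean fail) {
        if (fail) {
            throw new IllegalStateException("service down");
        }
        return Arrays.asList("ann", "bob");
    }
}
```

When a stage fails, the later `thenApply` and `thenAccept` stages are skipped and only the `exceptionally` handler runs, which is why a single handler at the end of the chain is enough.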
Solution 3
Use Guava's listenable future API and add a callback. From the website:
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFuture<Explosion> explosion = service.submit(new Callable<Explosion>() {
    public Explosion call() {
        return pushBigRedButton();
    }
});
Futures.addCallback(explosion, new FutureCallback<Explosion>() {
    // we want this handler to run immediately after we push the big red button!
    public void onSuccess(Explosion explosion) {
        walkAwayFrom(explosion);
    }
    public void onFailure(Throwable thrown) {
        battleArchNemesis(); // escaped the explosion!
    }
});
Solution 4
You could extend the FutureTask class and override its done() method, then submit the FutureTask object to the ExecutorService. The done() method will be invoked as soon as the FutureTask completes.
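A minimal sketch of this approach follows. `NotifyingFutureTask` and its `onSuccess` consumer are hypothetical names chosen for illustration, and failures are simply printed here. The task is handed to the pool with `execute`, since a `FutureTask` is itself a `Runnable`.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical FutureTask subclass that reports its result from done().
class NotifyingFutureTask<V> extends FutureTask<V> {
    private final Consumer<V> onSuccess;

    NotifyingFutureTask(Callable<V> work, Consumer<V> onSuccess) {
        super(work);
        this.onSuccess = onSuccess;
    }

    @Override
    protected void done() {
        if (isCancelled()) {
            return; // done() is also invoked on cancellation
        }
        try {
            onSuccess.accept(get()); // does not block: the task has already completed
        } catch (Exception e) {
            System.err.println("Task failed: " + e);
        }
    }
}

class NotifyingFutureTaskDemo {

    // Runs one task and returns the value delivered to the callback.
    static int demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicInteger received = new AtomicInteger(-1);
        CountDownLatch notified = new CountDownLatch(1);
        pool.execute(new NotifyingFutureTask<Integer>(() -> 6 * 7, value -> {
            received.set(value);
            notified.countDown();
        }));
        try {
            notified.await(5, TimeUnit.SECONDS); // demo only: wait for the notification
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdown();
        return received.get();
    }
}
```

The callback runs on the worker thread that executed the task, so it should be short or hand its work off elsewhere.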
Solution 5
ThreadPoolExecutor also has beforeExecute and afterExecute hook methods that you can override and make use of. Here is the description from ThreadPoolExecutor's Javadocs.
Hook methods
This class provides protected overridable beforeExecute(java.lang.Thread, java.lang.Runnable) and afterExecute(java.lang.Runnable, java.lang.Throwable) methods that are called before and after execution of each task. These can be used to manipulate the execution environment; for example, reinitializing ThreadLocals, gathering statistics, or adding log entries. Additionally, method terminated() can be overridden to perform any special processing that needs to be done once the Executor has fully terminated. If hook or callback methods throw exceptions, internal worker threads may in turn fail and abruptly terminate.
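As a rough illustration of the hook, the sketch below overrides `afterExecute` to count finished tasks. `CountingExecutor`, its `completed` field, and `demo()` are hypothetical names; a real callback would go where the counter is incremented.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical executor that is notified after every task via afterExecute.
class CountingExecutor extends ThreadPoolExecutor {
    final AtomicInteger completed = new AtomicInteger();

    CountingExecutor(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void afterExecute(Runnable task, Throwable thrown) {
        super.afterExecute(task, thrown);
        completed.incrementAndGet(); // runs on the worker thread after each task
    }

    // Demo: runs five empty tasks and returns how many completions were observed.
    static int demo() {
        CountingExecutor pool = new CountingExecutor(2);
        for (int i = 0; i < 5; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool.completed.get();
    }
}
```

Because the hook applies to every task the executor runs, this fits pool-wide concerns such as statistics or logging better than per-task callbacks.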
Scott Cumming
Updated on January 29, 2020
Comments
-
Scott Cumming about 4 years
Say I have a queue full of tasks which I need to submit to an executor service. I want them processed one at a time. The simplest way I can think of is to:
- Take a task from the queue
- Submit it to the executor
- Call .get on the returned Future and block until a result is available
- Take another task from the queue...
However, I am trying to avoid blocking completely. If I have 10,000 such queues, which need their tasks processed one at a time, I'll run out of stack space because most of them will be holding on to blocked threads.
What I would like is to submit a task and provide a call-back which is called when the task is complete. I'll use that call-back notification as a flag to send the next task. (functionaljava and jetlang apparently use such non-blocking algorithms, but I can't understand their code)
How can I do that using JDK's java.util.concurrent, short of writing my own executor service?
(the queue which feeds me these tasks may itself block, but that is an issue to be tackled later)
-
Scott Cumming almost 15 years
Three answers in the blink of an eye! I like the CallbackTask, such a simple and straightforward solution. It looks obvious in retrospect. Thanks. Regarding others' comments about SingleThreadedExecutor: I may have thousands of queues which may have thousands of tasks. Each of them needs to process its tasks one at a time, but different queues can operate in parallel. That's why I am using a single global thread pool. I'm new to executors, so please tell me if I am mistaken.
-
Pierre-Henri over 11 years
Good pattern. I would however use Guava's listenable future API, which provides a very good implementation of it.
-
Bear over 9 years
Using SingleThreadExecutor, what is the best way to know that all the threads have completed? I saw an example that uses a while !executor.isTerminated loop, but this doesn't seem very elegant. I implemented a callback feature for each worker and increment a count, which works.
-
takecare almost 9 years
Doesn't this defeat the purpose of using Future?
-
brady almost 9 years
@takecare It's a non-blocking alternative to waiting on get(), if that's what you mean.
-
Zelphir Kaltstahl about 8 years
@erickson Could you specify which Callback import it is? That would help a lot. There are so many, it's difficult to find.
-
brady about 8 years
@Zelphir It was a Callback interface that you declare; not from a library. Nowadays I'd probably just use Runnable, Consumer, or BiConsumer, depending on what I need to pass back from the task to the listener.
-
Bhargav over 7 years
But this callback is executed on the thread that runs run(), not on the thread in which the Runnable object was created. How do you get the callback to run on the thread in which the callback was created?
-
brady over 7 years
@Bhargav This is typical of callbacks: an external entity "calls back" to the controlling entity. Do you want the thread that created the task to block until the task has finished? Then what purpose is there in running the task on a second thread? If you allow the thread to continue, it will need to repeatedly check some shared state (probably in a loop, but that depends on your program) until it notices an update (boolean flag, new item in queue, etc.) made by the true callback as described in this answer. It can then perform some additional work.
-
Gary Gauh almost 7 years
"then add the FutureTask object to the ExecutorService": could you please tell me how to do this?
-
lin over 6 years
@GaryGauh see this for more info. You can extend FutureTask; call the subclass MyFutureTask. Then use ExecutorService to submit MyFutureTask. The run method of MyFutureTask will run, and when MyFutureTask finishes, your done method will be called. The confusing part is that two FutureTasks are involved, and in fact MyFutureTask is treated as a normal Runnable.
-
CRoemheld about 6 years
@erickson The first solution with a callback is just beautiful. I needed this since I'm using an extended ThreadPoolExecutor class which also supports priorities for tasks. Since CompletableFuture uses the Supplier<V> interface as parameter instead of the Runnable, V or Callable<V> ones, I was wondering how I could achieve this. I used a Consumer<V> interface, though I extended it for additional methods. Absolutely works like a charm.
-
kaiser about 6 years
This seems pretty nice.
-
user1767316 almost 4 years
How would one use a CompletableFuture with an executor (to limit the number of concurrent/parallel instances)? Would this be a hint: cf. submitting-futuretasks-to-an-executor-why-does-it-work?