Java executors: how to be notified, without blocking, when a task completes?

Solution 1

Define a callback interface to receive whatever parameters you want to pass along in the completion notification. Then invoke it at the end of the task.

You could even write a general wrapper for Runnable tasks, and submit these to ExecutorService. Or, see below for a mechanism built into Java 8.

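/* Declared by you, not taken from a library; give complete() whatever parameters you need. */
interface Callback {
  void complete();
}

class CallbackTask implements Runnable {

  private final Runnable task;

  private final Callback callback;

  CallbackTask(Runnable task, Callback callback) {
    this.task = task;
    this.callback = callback;
  }

  @Override
  public void run() {
    task.run();
    /* Not reached if task.run() throws; wrap in try/finally if you need that. */
    callback.complete();
  }

}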
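
For the use case in the question (one queue, tasks processed strictly one at a time on a shared pool, no call to get()), the callback can simply submit the next task. The following is a minimal sketch under that assumption; SerialFeeder and its methods are hypothetical names, and the latch exists only so the demo can print its result.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical helper: feeds one queue to a shared pool, one task at a time. */
class SerialFeeder {
  private final Queue<Runnable> queue;
  private final ExecutorService pool;
  private final CountDownLatch drained = new CountDownLatch(1);

  SerialFeeder(Queue<Runnable> queue, ExecutorService pool) {
    this.queue = queue;
    this.pool = pool;
  }

  /** Submits the next task, if any; its completion callback calls this method again. */
  void submitNext() {
    Runnable next = queue.poll();
    if (next == null) {
      drained.countDown(); // queue is empty: signal the demo that we are done
      return;
    }
    pool.execute(new CallbackTask(next, this::submitNext));
  }

  /** Runs three tasks from one queue and returns the order in which they ran. */
  static List<String> demo() throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    List<String> log = new CopyOnWriteArrayList<>();
    Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
    for (String name : Arrays.asList("a", "b", "c")) {
      queue.add(() -> log.add(name));
    }
    SerialFeeder feeder = new SerialFeeder(queue, pool);
    feeder.submitNext();     // returns immediately; no thread waits on a Future
    feeder.drained.await();  // demo only, so that we can return the log
    pool.shutdown();
    return log;
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(demo()); // prints [a, b, c]
  }
}

/** Callback interface declared by the application, as described above. */
interface Callback {
  void complete();
}

/** Same wrapper as above: run the task, then notify the callback. */
class CallbackTask implements Runnable {
  private final Runnable task;
  private final Callback callback;

  CallbackTask(Runnable task, Callback callback) {
    this.task = task;
    this.callback = callback;
  }

  @Override
  public void run() {
    task.run();
    callback.complete();
  }
}
```

Each queue would get its own feeder while all feeders share one pool, so no thread is parked waiting for a result. Note that in this sketch a task that throws stops the chain, because the callback is never invoked.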

With CompletableFuture, Java 8 included a more elaborate means to compose pipelines where processes can be completed asynchronously and conditionally. Here's a contrived but complete example of notification.

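import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class GetTaskNotificationWithoutBlocking {

  public static void main(String... argv) throws Exception {
    ExampleService svc = new ExampleService();
    GetTaskNotificationWithoutBlocking listener = new GetTaskNotificationWithoutBlocking();
    /* Without an executor argument, supplyAsync() normally runs on the common pool, whose
       daemon threads do not keep the JVM alive once main() returns. Use a non-daemon pool. */
    ExecutorService pool = Executors.newSingleThreadExecutor();
    CompletableFuture<String> f = CompletableFuture.supplyAsync(svc::work, pool);
    f.thenAccept(listener::notify);
    pool.shutdown(); /* Already-submitted work still runs; no new tasks are accepted. */
    System.out.println("Exiting main()");
  }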

  void notify(String msg) {
    System.out.println("Received message: " + msg);
  }

}

class ExampleService {

  String work() {
    sleep(7000, TimeUnit.MILLISECONDS); /* Pretend to be busy... */
    char[] str = new char[5];
    ThreadLocalRandom current = ThreadLocalRandom.current();
    for (int idx = 0; idx < str.length; ++idx)
      str[idx] = (char) ('A' + current.nextInt(26));
    String msg = new String(str);
    System.out.println("Generated message: " + msg);
    return msg;
  }

  public static void sleep(long average, TimeUnit unit) {
    String name = Thread.currentThread().getName();
    long timeout = Math.min(exponential(average), Math.multiplyExact(10, average));
    System.out.printf("%s sleeping %d %s...%n", name, timeout, unit);
    try {
      unit.sleep(timeout);
      System.out.println(name + " awoke.");
    } catch (InterruptedException abort) {
      Thread.currentThread().interrupt();
      System.out.println(name + " interrupted.");
    }
  }

  public static long exponential(long avg) {
    return (long) (avg * -Math.log(1 - ThreadLocalRandom.current().nextDouble()));
  }

}

Solution 2

In Java 8 you can use CompletableFuture. Here's an example I had in my code where I'm using it to fetch users from my user service, map them to my view objects and then update my view or show an error dialog (this is a GUI application):

    CompletableFuture.supplyAsync(
            userService::listUsers
    ).thenApply(
            this::mapUsersToUserViews
    ).thenAccept(
            this::updateView
    ).exceptionally(
            throwable -> { showErrorDialogFor(throwable); return null; }
    );

It executes asynchronously. I'm using two private methods: mapUsersToUserViews and updateView.
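
To try the same pipeline shape outside a GUI, here is a minimal runnable sketch. Everything in it is a stand-in: the supplier plays the role of userService::listUsers, upper-casing replaces mapUsersToUserViews, and a list replaces the view and the error dialog. The join() call is there only so the demo can return its result.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Supplier;
import java.util.stream.Collectors;

class PipelineSketch {

  /** Runs the pipeline against a hypothetical user source and returns what the "view" received. */
  static List<String> run(Supplier<List<String>> listUsers) {
    List<String> view = new CopyOnWriteArrayList<>();
    CompletableFuture<Void> done = CompletableFuture
        .supplyAsync(listUsers)
        // stand-in for mapUsersToUserViews
        .thenApply(users -> users.stream().map(String::toUpperCase).collect(Collectors.toList()))
        // stand-in for updateView
        .thenAccept(view::addAll)
        // stand-in for showErrorDialogFor; reached if any earlier stage failed
        .exceptionally(t -> {
          // Failures from earlier stages usually arrive wrapped in CompletionException.
          Throwable cause = (t instanceof CompletionException && t.getCause() != null)
              ? t.getCause() : t;
          view.add("error: " + cause.getMessage());
          return null;
        });
    done.join(); // demo only; a real caller would return without waiting
    return view;
  }

  public static void main(String[] args) {
    System.out.println(run(() -> Arrays.asList("ann", "bob")));
    // prints [ANN, BOB]
    System.out.println(run(() -> { throw new IllegalStateException("service down"); }));
    // prints [error: service down]
  }
}
```

Because exceptionally is attached at the end of the chain, a failure in any earlier stage skips the remaining stages and lands in the error handler.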

Solution 3

Use Guava's listenable future API and add a callback. An example from the Guava documentation:

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFuture<Explosion> explosion = service.submit(new Callable<Explosion>() {
  public Explosion call() {
    return pushBigRedButton();
  }
});
Futures.addCallback(explosion, new FutureCallback<Explosion>() {
  // we want this handler to run immediately after we push the big red button!
  public void onSuccess(Explosion explosion) {
    walkAwayFrom(explosion);
  }
  public void onFailure(Throwable thrown) {
    battleArchNemesis(); // escaped the explosion!
  }
}, service); // newer Guava releases require an executor here; the two-argument overload was removed

Solution 4

You could extend the FutureTask class and override its done() method, then hand the FutureTask object to the ExecutorService. The done() method is invoked as soon as the FutureTask completes.
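
A minimal sketch of this approach follows. NotifyingFutureTask and its listener parameter are hypothetical names introduced for illustration. The task is passed to execute() rather than submit(), since submit() would wrap it in a second FutureTask.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

class NotifyingFutureTaskDemo {

  /** Runs one task and returns the value that was delivered to the listener. */
  static String demo() throws InterruptedException {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    AtomicReference<String> received = new AtomicReference<>();
    CountDownLatch notified = new CountDownLatch(1); // demo only
    NotifyingFutureTask<String> task = new NotifyingFutureTask<>(
        () -> "result",
        value -> { received.set(value); notified.countDown(); });
    pool.execute(task); // a FutureTask is a Runnable, so execute() accepts it directly
    notified.await();   // demo only, so that we can return the value
    pool.shutdown();
    return received.get();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(demo()); // prints result
  }
}

/** Hypothetical subclass: passes the result to a listener when the task finishes. */
class NotifyingFutureTask<V> extends FutureTask<V> {
  private final Consumer<V> listener;

  NotifyingFutureTask(Callable<V> work, Consumer<V> listener) {
    super(work);
    this.listener = listener;
  }

  @Override
  protected void done() {
    if (isCancelled()) {
      return; // done() is also called on cancellation; nothing to deliver
    }
    try {
      listener.accept(get()); // does not block: the task has already completed
    } catch (ExecutionException e) {
      // The work itself failed; this sketch ignores that, a real listener would be told.
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

The listener runs on the pool thread that executed the task, so it should be short or hand its work off elsewhere.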

Solution 5

ThreadPoolExecutor also has beforeExecute and afterExecute hook methods that you can override and make use of. Here is the description from ThreadPoolExecutor's Javadocs.

Hook methods

This class provides protected overridable beforeExecute(java.lang.Thread, java.lang.Runnable) and afterExecute(java.lang.Runnable, java.lang.Throwable) methods that are called before and after execution of each task. These can be used to manipulate the execution environment; for example, reinitializing ThreadLocals, gathering statistics, or adding log entries. Additionally, method terminated() can be overridden to perform any special processing that needs to be done once the Executor has fully terminated. If hook or callback methods throw exceptions, internal worker threads may in turn fail and abruptly terminate.
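
As an illustration of the afterExecute hook, the sketch below counts completed tasks in an anonymous ThreadPoolExecutor subclass. The class name, pool size and counter are assumptions made for the example; a real hook might instead submit the next task or notify a listener.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class AfterExecuteDemo {

  /** Runs five no-op tasks and returns how many times afterExecute was called. */
  static int demo() throws InterruptedException {
    AtomicInteger completed = new AtomicInteger();
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()) {
      @Override
      protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // Called on the worker thread after each task. For tasks started with execute(),
        // t is the uncaught exception, if any; for submit(), failures stay inside the Future.
        completed.incrementAndGet();
      }
    };
    for (int i = 0; i < 5; i++) {
      pool.execute(() -> { });
    }
    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.SECONDS); // demo only
    return completed.get();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(demo()); // prints 5
  }
}
```

This hook applies to every task the pool runs, so it suits pool-wide concerns better than per-task callbacks.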

Author: Scott Cumming

Updated on January 29, 2020

Comments

  • Scott Cumming
    Scott Cumming about 4 years

    Say I have a queue full of tasks which I need to submit to an executor service. I want them processed one at a time. The simplest way I can think of is to:

    1. Take a task from the queue
    2. Submit it to the executor
    3. Call .get on the returned Future and block until a result is available
    4. Take another task from the queue...

    However, I am trying to avoid blocking completely. If I have 10,000 such queues, which need their tasks processed one at a time, I'll run out of stack space because most of them will be holding on to blocked threads.

    What I would like is to submit a task and provide a call-back which is called when the task is complete. I'll use that call-back notification as a flag to send the next task. (functionaljava and jetlang apparently use such non-blocking algorithms, but I can't understand their code)

    How can I do that using JDK's java.util.concurrent, short of writing my own executor service?

    (the queue which feeds me these tasks may itself block, but that is an issue to be tackled later)

  • Scott Cumming
    Scott Cumming almost 15 years
    Three answers in the blink of an eye! I like the CallbackTask, such a simple and straightforward solution. It looks obvious in retrospect. Thanks. Regarding others' comments about SingleThreadedExecutor: I may have thousands of queues which may have thousands of tasks. Each of them needs to process its tasks one at a time, but different queues can operate in parallel. That's why I am using a single global thread pool. I'm new to executors so please tell me if I am mistaken.
  • Pierre-Henri
    Pierre-Henri over 11 years
    Good pattern. I would however use Guava's listenable future API, which provides a very good implementation of it.
  • Bear
    Bear over 9 years
    Using SingleThreadExecutor, what is the best way to know that all the threads have completed? I saw an example that uses a while (!executor.isTerminated()) loop, but this doesn't seem very elegant. I implemented a callback feature for each worker and increment a count, which works.
  • takecare
    takecare almost 9 years
    Doesn't this defeat the purpose of using Future?
  • brady
    brady almost 9 years
    @takecare It's a non-blocking alternative to waiting on get(), if that's what you mean.
  • Zelphir Kaltstahl
    Zelphir Kaltstahl about 8 years
    @erickson Could you specify, which Callback import it is? That would help a lot. There are so many, it's difficult to find.
  • brady
    brady about 8 years
    @Zelphir It was a Callback interface that you declare; not from a library. Nowadays I'd probably just use Runnable, Consumer, or BiConsumer, depending on what I need to pass back from the task to the listener.
  • Bhargav
    Bhargav over 7 years
    But this callback is executed on the run() thread, not the thread in which the Runnable object was created, how do you get the callback to run on the thread in which the callback was created?
  • brady
    brady over 7 years
    @Bhargav This is typical of callbacks—an external entity "calls back" to the controlling entity. Do you want the thread that created the task to block until the task has finished? Then what purpose is there in running the task on a second thread? If you allow the thread to continue, it will need to repeatedly check some shared state (probably in a loop, but depends on your program) until it notices an update (boolean flag, new item in queue, etc.) made by the true callback as described in this answer. It can then perform some additional work.
  • Gary Gauh
    Gary Gauh almost 7 years
    then add the FutureTask object to the ExecutorService, could you please tell me how to do this ?
  • lin
    lin over 6 years
    @GaryGauh See this for more info. You can extend FutureTask; we may call the subclass MyFutureTask. Then use the ExecutorService to submit a MyFutureTask. Its run method will execute, and when it has finished, your done method will be called. The confusing part is that two FutureTasks are involved, and MyFutureTask is in fact treated as a normal Runnable.
  • CRoemheld
    CRoemheld about 6 years
    @erickson The first solution with a callback is just beautiful. I needed this since I'm using an extended ThreadPoolExecutor class which also supports priorities for tasks. Since CompletableFuture uses the Supplier<V> interface as parameter instead of the Runnable, V or Callable<V> ones, I was wondering how I could achieve this. I used a Consumer<V> interface, though I extended it for additional methods. Absolutely works like a charm.
  • kaiser
    kaiser about 6 years
    This seems pretty nice.
  • user1767316
    user1767316 almost 4 years
    How would one use a CompletableFuture with an executor (to limit the number of concurrent/parallel instances)? Would this be a hint: cf. submitting-futuretasks-to-an-executor-why-does-it-work?
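
On that last question: the async factory methods of CompletableFuture have overloads that take an Executor, so a fixed-size pool is one way to cap parallelism. The following is a sketch under assumed names; the pool size of 2, the task count of 6 and the 50 ms pause are arbitrary choices for the demo.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedAsyncDemo {

  /** Starts six async tasks on a two-thread pool and returns the peak number running at once. */
  static int demo() {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    AtomicInteger running = new AtomicInteger();
    AtomicInteger maxRunning = new AtomicInteger();
    List<CompletableFuture<Void>> futures = new ArrayList<>();
    for (int i = 0; i < 6; i++) {
      // The second argument makes the task run on our pool instead of the common pool.
      futures.add(CompletableFuture.runAsync(() -> {
        int now = running.incrementAndGet();
        maxRunning.accumulateAndGet(now, Math::max);
        pause(50);
        running.decrementAndGet();
      }, pool));
    }
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); // demo only
    pool.shutdown();
    return maxRunning.get();
  }

  private static void pause(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) {
    System.out.println("peak concurrency: " + demo());
  }
}
```

With a two-thread pool the peak never exceeds 2, however many futures are created; the remaining tasks wait in the pool's queue.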