How to cancel Java 8 completable future?


Solution 1

Apparently, it's intentional. The Javadoc for the method CompletableFuture::cancel states:

[Parameters:] mayInterruptIfRunning - this value has no effect in this implementation because interrupts are not used to control processing.

Interestingly, the method ForkJoinTask::cancel uses almost the same wording for the parameter mayInterruptIfRunning.

I have a guess on this issue:

  • interruption is intended to be used with blocking operations, like sleep, wait or I/O operations,
  • but neither CompletableFuture nor ForkJoinTask are intended to be used with blocking operations.

Instead of blocking, a CompletableFuture should create a new CompletionStage, and CPU-bound tasks are a prerequisite for the fork-join model. So using interruption with either of them would defeat their purpose. It would also add complexity that is not needed when they are used as intended.

Solution 2

When you call CompletableFuture#cancel, you only stop the downstream part of the chain. The upstream part, i.e. whatever will eventually call complete(...) or completeExceptionally(...), doesn't get any signal that the result is no longer needed.

What are those 'upstream' and 'downstream' things?

Let's consider the following code:

CompletableFuture
        .supplyAsync(() -> "hello")               //1
        .thenApply(s -> s + " world!")            //2
        .thenAccept(s -> System.out.println(s));  //3

Here, the data flows from top to bottom: it is created by the supplier, modified by the function, and consumed by println. The part above a particular step is called upstream, and the part below it is called downstream. For example, steps 1 and 2 are upstream of step 3.

Here's what happens behind the scenes. This is not precise; rather, it's a convenient mental model of what's going on.

  1. Supplier (step 1) is being executed (inside the JVM's common ForkJoinPool).
  2. The result of the supplier is then being passed by complete(...) to the next CompletableFuture downstream.
  3. Upon receiving the result, that CompletableFuture invokes next step - a function (step 2) which takes in previous step result and returns something that will be passed further, to the downstream CompletableFuture's complete(...).
  4. Upon receiving the step 2 result, the step 3 CompletableFuture invokes the consumer, System.out.println(s). After the consumer has finished, the downstream CompletableFuture will receive its value, (Void) null.

As we can see, each CompletableFuture in this chain has to know which downstream futures are waiting for the value to be passed to their complete(...) (or completeExceptionally(...)). But a CompletableFuture doesn't have to know anything about its upstream (or upstreams, since there might be several).

Thus, calling cancel() upon step 3 doesn't abort steps 1 and 2, because there's no link from step 3 to step 2.
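To make this concrete, here is a minimal sketch of the chain above with the last stage cancelled while step 1 is still running. The class name DownstreamCancelDemo, the gate future and the consumerRan flag are illustrative names only, and the gate exists purely to hold step 1 back long enough for the cancellation to happen first.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

public class DownstreamCancelDemo {

    /**
     * Returns true if step 1 still produced its value although step 3
     * had already been cancelled, and the consumer of step 3 never ran.
     */
    public static boolean upstreamRunsAfterDownstreamCancel() {
        // Hypothetical helper: keeps step 1 busy until we have cancelled step 3.
        CompletableFuture<Void> gate = new CompletableFuture<>();
        AtomicBoolean consumerRan = new AtomicBoolean(false);

        CompletableFuture<String> step1 = CompletableFuture.supplyAsync(() -> {
            gate.join();                                  // step 1 is "running"
            return "hello";
        });
        CompletableFuture<Void> step3 = step1
                .thenApply(s -> s + " world!")            // step 2
                .thenAccept(s -> consumerRan.set(true));  // step 3

        step3.cancel(true);   // completes only the last stage, with a CancellationException
        gate.complete(null);  // let step 1 carry on

        String upstreamResult = step1.join();             // step 1 finishes normally
        return "hello".equals(upstreamResult)
                && step3.isCancelled()
                && !step1.isCancelled()
                && !consumerRan.get();
    }
}
```

Step 1 completes with its normal value; only the already-cancelled last stage ignores the result when it arrives.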

The assumption is that if you're using CompletableFuture, your steps are small enough that there's no harm in a couple of extra steps being executed.

If you want cancellation to be propagated upstream, you have two options:

  • Implement this yourself: create a dedicated CompletableFuture (named, say, cancelled) which is checked after every step (something like step.applyToEither(cancelled, Function.identity()))
  • Use a reactive stack like RxJava 2, ProjectReactor/Flux or Akka Streams
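The first option might look roughly like the sketch below. The names CancelTokenDemo, gate and step2Ran are made up for illustration, and the gate only keeps step 1 busy so that the cancellation arrives first. Note that a step which is already running is not stopped; only the steps after the next checkpoint are skipped.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

public class CancelTokenDemo {

    /** Returns true if step 2 was skipped after the shared token was cancelled. */
    public static boolean laterStepsAreSkipped() {
        // The dedicated "cancelled" future acts as a cancellation token.
        CompletableFuture<String> cancelled = new CompletableFuture<>();
        // Hypothetical helper: keeps step 1 running until we have cancelled.
        CompletableFuture<Void> gate = new CompletableFuture<>();
        AtomicBoolean step2Ran = new AtomicBoolean(false);

        CompletableFuture<String> pipeline = CompletableFuture
                .supplyAsync(() -> { gate.join(); return "hello"; })            // step 1
                .applyToEither(cancelled, Function.identity())                  // checkpoint
                .thenApply(s -> { step2Ran.set(true); return s + " world!"; })  // step 2
                .applyToEither(cancelled, Function.identity());                 // checkpoint

        cancelled.cancel(false);  // request cancellation while step 1 is still running
        gate.complete(null);      // step 1 finishes, but nobody uses its result

        try {
            pipeline.join();
            return false;         // not expected: the pipeline should fail
        } catch (CompletionException | CancellationException e) {
            return !step2Ran.get();
        }
    }
}
```

Each checkpoint completes as soon as either its step or the token completes, so cancelling the token fails every later stage without running its function.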

Solution 3

If you actually want to be able to interrupt a running task, then you have to use a plain Future (e.g. as returned by ExecutorService.submit(Callable<T>)), not CompletableFuture. As pointed out in the answer by nosid, CompletableFuture ignores the mayInterruptIfRunning argument: cancel(true) marks the future as cancelled but never interrupts the thread running the task.
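For comparison, here is a small sketch of the plain Future behaviour. InterruptibleFutureDemo and the latch names are illustrative only; the worker blocks on a latch that is never counted down, so it can only leave the await() call by being interrupted.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class InterruptibleFutureDemo {

    /** Returns true if cancel(true) interrupted the blocked worker thread. */
    public static boolean cancelInterruptsWorker() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch interrupted = new CountDownLatch(1);
        try {
            Future<?> future = executor.submit(() -> {
                started.countDown();
                try {
                    new CountDownLatch(1).await();  // blocks until interrupted
                } catch (InterruptedException e) {
                    interrupted.countDown();        // cancel(true) got through
                }
            });

            if (!started.await(5, TimeUnit.SECONDS)) {
                return false;                       // the task never started
            }
            future.cancel(true);                    // interrupts the running worker
            return future.isCancelled() && interrupted.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }
}
```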

My suspicion is that the JDK team did not implement interruption because:

  1. Interruption was always hacky, difficult for people to understand, and difficult to work with. The Java I/O system is not even interruptible, despite calls to InputStream.read() being blocking calls! (And the JDK team have no plans to make the standard I/O system interruptible again, like it was in the very early Java days.)
  2. The JDK team have been trying very hard to phase out old broken APIs from the early Java days, such as Object.finalize(), Object.wait(), Thread.stop(), etc. I believe Thread.interrupt() is considered to be in the category of things that must be eventually deprecated and replaced. Therefore, newer APIs (like ForkJoinPool and CompletableFuture) are already not supporting it.
  3. CompletableFuture was designed for building DAG-structured pipelines of operations, similar to the Java Stream API. It's very difficult to succinctly describe how interruption of one node of a dataflow DAG should affect execution in the rest of the DAG. (Should all concurrent tasks be canceled immediately, when any node is interrupted?)
  4. I suspect the JDK team just didn't want to deal with getting interruption right, given the levels of internal complexity that the JDK and libraries have reached these days. (The internals of the lambda system -- ugh.)

One very hacky way around this would be to have each task publish a reference to the thread executing it (Thread.currentThread()) in an externally visible AtomicReference, so that the thread can be interrupted directly from another thread when needed. Alternatively, if you start all the tasks on your own ExecutorService, with your own thread pool, you can manually interrupt any or all of its threads, even though CompletableFuture refuses to trigger interruption via cancel(true). (Note, though, that CompletableFuture lambdas cannot throw checked exceptions, so if you have an interruptible wait in a CompletableFuture, you'll have to rethrow the InterruptedException as an unchecked exception.)
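A rough sketch of the thread-reference idea, assuming a dedicated single-thread executor so that the interrupt cannot hit an unrelated pool task. ManualInterruptDemo and the variable names are illustrative only. In real code there is a window in which the task may finish just before interrupt() is called, which is part of why this is hacky.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class ManualInterruptDemo {

    /** Returns true if the task was interrupted through the published Thread reference. */
    public static boolean interruptViaThreadReference() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicReference<Thread> worker = new AtomicReference<>();
        CountDownLatch started = new CountDownLatch(1);
        try {
            CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
                worker.set(Thread.currentThread());  // publish the executing thread
                started.countDown();
                try {
                    new CountDownLatch(1).await();   // stands in for any interruptible wait
                    return "finished";
                } catch (InterruptedException e) {
                    // Lambdas here cannot throw checked exceptions, so wrap it.
                    throw new IllegalStateException("interrupted", e);
                } finally {
                    worker.set(null);                // the thread is no longer ours
                }
            }, executor);

            if (!started.await(5, TimeUnit.SECONDS)) {
                return false;
            }
            Thread thread = worker.get();
            if (thread != null) {
                thread.interrupt();                  // what cancel(true) will not do for us
            }
            future.join();
            return false;                            // not expected: the task should fail
        } catch (CompletionException e) {
            return e.getCause() instanceof IllegalStateException;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }
}
```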

More simply, you could just declare a flag such as AtomicReference<Boolean> cancel = new AtomicReference<>(false) (or an AtomicBoolean) in an external scope, and periodically check this flag from inside each CompletableFuture task's lambda.
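A minimal sketch of the flag approach follows. It uses an AtomicBoolean rather than the AtomicReference<Boolean> mentioned above, which is a substitution made here for simplicity. CancelFlagDemo and the started future are illustrative names, and the busy loop stands in for real units of work.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

public class CancelFlagDemo {

    /** Runs a looping task until the shared flag is set, then returns the task's result. */
    public static String runUntilCancelled() {
        AtomicBoolean cancel = new AtomicBoolean(false);
        // Hypothetical helper: tells us the task has actually begun.
        CompletableFuture<Void> started = new CompletableFuture<>();

        CompletableFuture<String> task = CompletableFuture.supplyAsync(() -> {
            started.complete(null);
            while (!cancel.get()) {   // checkpoint: poll the flag between units of work
                Thread.yield();       // placeholder for one small unit of real work
            }
            return "stopped";         // graceful exit once cancellation was requested
        });

        started.join();               // wait until the task is running
        cancel.set(true);             // request cancellation
        return task.join();
    }
}
```

The task decides for itself where it is safe to stop, so no interruption is involved at all.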

You could also try setting up a DAG of Future instances rather than a DAG of CompletableFuture instances. That way you can specify exactly how exceptions and interruption/cancellation in any one task should affect the other currently running tasks. I show how to do this in my example code in my question here, and it works well, but it's a lot of boilerplate.

Solution 4

You need an alternative implementation of CompletionStage to accomplish true thread interruption. I've just released a small library that serves exactly this purpose - https://github.com/vsilaev/tascalate-concurrent

Solution 5

The call to await() will still block even after Future.cancel(..) is called. As mentioned by others, CompletableFuture does not use interrupts to cancel the task.

According to the javadoc of CompletableFuture.cancel(..):

mayInterruptIfRunning - this value has no effect in this implementation because interrupts are not used to control processing.

Even if the implementation did trigger an interrupt, the task would still need to be in an interruptible blocking operation, or to check the status itself via Thread.interrupted(), for the interrupt to have any effect.

Instead of interrupting the Thread, which might not always be easy to do, you can add checkpoints to your operation at which the current task can terminate gracefully. For example, check the cancel status on each iteration of a loop over the elements being processed, or before each step of the operation, and throw a CancellationException yourself.

The tricky part is to get a reference of the CompletableFuture within the task in order to call Future.isCancelled(). Here is an example of how it can be done:

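import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public abstract class CancelableTask<T> {

    // volatile so that the worker thread sees the future assigned in start()
    private volatile CompletableFuture<T> task;

    private T run() {
        try {
            return compute();
        } catch (Throwable e) {
            task.completeExceptionally(e);
        }
        // ignored by completeAsync: the future is already completed exceptionally
        return null;
    }

    protected abstract T compute() throws Exception;

    // Subclasses poll this from compute() to stop cooperatively.
    protected boolean isCancelled() {
        Future<T> future = task;
        return future != null && future.isCancelled();
    }

    public Future<T> start() {
        synchronized (this) {
            if (task != null) throw new IllegalStateException("Task already started.");
            task = new CompletableFuture<>();
        }
        // completeAsync requires Java 9 or later
        return task.completeAsync(this::run);
    }
}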

Edit: Here is the improved CancelableTask version as a static factory:

public static <T> CompletableFuture<T> supplyAsync(Function<Future<T>, T> operation) {
    CompletableFuture<T> future = new CompletableFuture<>();
    return future.completeAsync(() -> operation.apply(future));
}

Here is the test method:

@Test
void testFuture() throws InterruptedException {
    CountDownLatch started = new CountDownLatch(1);
    CountDownLatch done = new CountDownLatch(1);

    AtomicInteger counter = new AtomicInteger();
    Future<Object> future = supplyAsync(task -> {
        started.countDown();
        while (!task.isCancelled()) {
            System.out.println("Count: " + counter.getAndIncrement());
        }
        System.out.println("Task cancelled");
        done.countDown();
        return null;
    });

    // wait until the task is started
    assertTrue(started.await(5, TimeUnit.SECONDS));
    future.cancel(true);
    System.out.println("Cancel called");

    assertTrue(future.isCancelled());

    assertTrue(future.isDone());
    assertTrue(done.await(5, TimeUnit.SECONDS));
}

If you really want to use interrupts in addition to the CompletableFuture, then you can pass a custom Executor to CompletableFuture.completeAsync(..) where you create your own Thread, override cancel(..) in the CompletableFuture and interrupt your Thread.
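One possible shape for this, as a sketch only: a CompletableFuture subclass (InterruptingFuture, a made-up name) whose factory method hands completeAsync(..) an Executor that starts one dedicated thread per task, and whose cancel(..) interrupts that thread. It assumes Java 9 or later for completeAsync, and the start and demo methods are illustrative helpers.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class InterruptingFuture<T> extends CompletableFuture<T> {

    // The dedicated thread running the task; set before the thread is started.
    private volatile Thread runner;

    /** Hypothetical factory: runs body on its own thread and remembers that thread. */
    public static <T> InterruptingFuture<T> start(Supplier<T> body) {
        InterruptingFuture<T> future = new InterruptingFuture<>();
        future.completeAsync(body, command -> {   // custom Executor: one thread per task
            Thread thread = new Thread(command, "interrupting-future");
            thread.setDaemon(true);
            future.runner = thread;
            thread.start();
        });
        return future;
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        boolean cancelled = super.cancel(mayInterruptIfRunning);
        Thread thread = runner;
        if (cancelled && mayInterruptIfRunning && thread != null) {
            thread.interrupt();                   // the step the base class leaves out
        }
        return cancelled;
    }

    /** Returns true if cancel(true) interrupted a task blocked in await(). */
    public static boolean demo() {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch interrupted = new CountDownLatch(1);
        InterruptingFuture<String> future = start(() -> {
            started.countDown();
            try {
                new CountDownLatch(1).await();    // blocks until interrupted
            } catch (InterruptedException e) {
                interrupted.countDown();
            }
            return "unused";                      // ignored: the future is already cancelled
        });
        try {
            if (!started.await(5, TimeUnit.SECONDS)) {
                return false;
            }
            future.cancel(true);
            return future.isCancelled() && interrupted.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Stages derived from such a future (via thenApply and so on) are ordinary CompletableFuture instances, so cancelling them would still not interrupt anything.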

Author: Lukas

Updated on March 24, 2021

Comments

  • Lukas
    Lukas about 3 years

    I am playing with Java 8 completable futures. I have the following code:

    CountDownLatch waitLatch = new CountDownLatch(1);
    
    CompletableFuture<?> future = CompletableFuture.runAsync(() -> {
        try {
            System.out.println("Wait");
            waitLatch.await(); //cancel should interrupt
            System.out.println("Done");
        } catch (InterruptedException e) {
            System.out.println("Interrupted");
            throw new RuntimeException(e);
        }
    });
    
    sleep(10); //give it some time to start (ugly, but works)
    future.cancel(true);
    System.out.println("Cancel called");
    
    assertTrue(future.isCancelled());
    
    assertTrue(future.isDone());
    sleep(100); //give it some time to finish
    

    Using runAsync, I schedule the execution of code that waits on a latch. Next, I cancel the future, expecting an InterruptedException to be thrown inside. But it seems that the thread remains blocked on the await call, and the InterruptedException is never thrown even though the future is canceled (the assertions pass). Equivalent code using ExecutorService works as expected. Is it a bug in CompletableFuture or in my example?

  • Ivan Hristov
    Ivan Hristov over 9 years
    Why do you think that CompletableFuture and ForkJoinTask are not intended to be used with blocking operations?
  • Kirill Gamazkov
    Kirill Gamazkov almost 7 years
    The whole point of CompletableFuture and other reactive stuff is: don't waste threads by making them wait for the result of some long operation. Instead, provide a callback to be called when the result is ready. The reactive approach requires far fewer threads.
  • Kirill Gamazkov
    Kirill Gamazkov about 5 years
    My previous comment is not precisely correct. CompletableFuture allows for uniform syntax for both blocking and non-blocking code. That may be useful during migration of codebase from blocking style to non-blocking