Wrapping an asynchronous computation into a synchronous (blocking) computation

Solution 1

Using your own Future implementation:

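import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BazComputationFuture implements Future<Baz>, BazComputationSink {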

    private volatile Baz result = null;
    private volatile boolean cancelled = false;
    private final CountDownLatch countDownLatch;

    public BazComputationFuture() {
        countDownLatch = new CountDownLatch(1);
    }

    @Override
    public boolean cancel(final boolean mayInterruptIfRunning) {
        if (isDone()) {
            return false;
        } else {
            // Set the flag before releasing the latch so woken waiters see it.
            cancelled = true;
            countDownLatch.countDown();
            return true;
        }
    }

    @Override
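    public Baz get() throws InterruptedException, ExecutionException {
        countDownLatch.await();
        if (cancelled) {
            throw new CancellationException();
        }
        return result;
    }

    @Override
    public Baz get(final long timeout, final TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        // await returns false when the timeout elapses before the latch opens.
        if (!countDownLatch.await(timeout, unit)) {
            throw new TimeoutException();
        }
        if (cancelled) {
            throw new CancellationException();
        }
        return result;
    }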

    @Override
    public boolean isCancelled() {
        return cancelled;
    }

    @Override
    public boolean isDone() {
        return countDownLatch.getCount() == 0;
    }

    public void onBazResult(final Baz result) {
        this.result = result;
        countDownLatch.countDown();
    }

}

public Future<Baz> doSomething(Foo fooArg, Bar barArg) {
    BazComputationFuture future = new BazComputationFuture();
    doSomethingAsync(fooArg, barArg, future);
    return future;
}

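public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) {
    try {
        return doSomething(fooArg, barArg).get();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();  // preserve the interrupt status
        throw new IllegalStateException("Interrupted while waiting for Baz", e);
    } catch (ExecutionException e) {
        throw new IllegalStateException(e.getCause());
    }
}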

The solution creates a CountDownLatch internally, which is counted down to zero once the callback is received. If the user calls get(), the latch blocks the calling thread until the computation completes and onBazResult is invoked. If the callback occurs before get() is called, the latch is already at zero, so get() returns immediately with the result.
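To see the ordering guarantee described above in isolation, here is a minimal sketch. LatchResultHolder and LatchOrderingDemo are hypothetical names made up for this illustration, and a String stands in for Baz; it is not the class from the answer, just the same latch idea reduced to the essentials.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the latch-based future, reduced to a String result.
class LatchResultHolder {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile String result;

    // Plays the role of onBazResult(): publish the value, then open the latch.
    void onResult(String value) {
        result = value;
        latch.countDown();
    }

    // Blocks until onResult() has been called; returns at once if it already was.
    String get() throws InterruptedException {
        latch.await();
        return result;
    }
}

class LatchOrderingDemo {
    // Case 1: the callback fires before anyone calls get().
    static String callbackFirst() throws InterruptedException {
        LatchResultHolder holder = new LatchResultHolder();
        holder.onResult("early");
        return holder.get();
    }

    // Case 2: get() is called first and blocks until another thread delivers.
    static String getFirst() throws InterruptedException {
        LatchResultHolder holder = new LatchResultHolder();
        Thread worker = new Thread(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            holder.onResult("late");
        });
        worker.start();
        return holder.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(callbackFirst()); // prints "early"
        System.out.println(getFirst());      // prints "late"
    }
}
```

Either way the caller sees the result; the only difference is whether await() has to block.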

Solution 2

Well, there is the simple solution of doing something like:

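public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) {
  final AtomicReference<Baz> notifier = new AtomicReference<>();
  doSomethingAsync(fooArg, barArg, new BazComputationSink() {
    public void onBazResult(Baz result) {
      synchronized (notifier) {
        notifier.set(result);
        notifier.notify();
      }
    }
  });
  synchronized (notifier) {
    try {
      // Loop to guard against spurious wakeups.
      while (notifier.get() == null)
        notifier.wait();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();  // restore the interrupt flag
      throw new IllegalStateException("Interrupted while waiting for result", e);
    }
  }
  return notifier.get();
}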

Of course, this assumes that your Baz result will never be null…
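If a null result is possible, one way around that assumption is to track completion separately from the value. The following is only a sketch: NullSafeBlocker, Box and callAndBlock are hypothetical names, and the Consumer-based parameter stands in for doSomethingAsync and its sink.

```java
import java.util.function.Consumer;

class NullSafeBlocker {
    // Hypothetical holder: 'done' records completion separately from the value,
    // so a null result is distinguishable from "not finished yet".
    private static final class Box<T> {
        T value;
        boolean done;
    }

    // asyncCall stands in for doSomethingAsync: it is handed a callback
    // that it must eventually invoke with the result.
    static <T> T callAndBlock(Consumer<Consumer<T>> asyncCall) throws InterruptedException {
        final Box<T> box = new Box<>();
        asyncCall.accept(result -> {
            synchronized (box) {
                box.value = result;
                box.done = true;
                box.notifyAll();
            }
        });
        synchronized (box) {
            while (!box.done) {   // loop guards against spurious wakeups
                box.wait();
            }
            return box.value;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // The callback delivers null from another thread; the caller still wakes up.
        String r = NullSafeBlocker.<String>callAndBlock(
                cb -> new Thread(() -> cb.accept(null)).start());
        System.out.println(r); // prints "null"
    }
}
```

Because the flag is read and written only while holding the monitor on box, it does not need to be volatile.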

Solution 3

The Google Guava library has an easy-to-use SettableFuture that makes this problem very simple (around 10 lines of code).

public class ImplementingThing {

public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) {
    try {
        return doSomething(fooArg, barArg).get();
    } catch (Exception e) {
        // Keep the original exception as the cause so the failure stays diagnosable.
        throw new RuntimeException("Oh dear", e);
    }
}

public Future<Baz> doSomething(Foo fooArg, Bar barArg) {
    final SettableFuture<Baz> future = SettableFuture.create();
    doSomethingAsync(fooArg, barArg, new BazComputationSink() {
        @Override
        public void onBazResult(Baz result) {
            future.set(result);
        }
    });
    return future;
}

// Everything below here is just mock stuff to make the example work,
// so you can copy it into your IDE and see it run.

public static class Baz {}
public static class Foo {}
public static class Bar {}

public static interface BazComputationSink {
    public void onBazResult(Baz result);
}

public void doSomethingAsync(Foo fooArg, Bar barArg, final BazComputationSink sink) {
    new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                Thread.sleep(4000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            Baz baz = new Baz();
            sink.onBazResult(baz);
        }
    }).start();
}

public static void main(String[] args) {
    System.err.println("Starting Main");
    System.err.println((new ImplementingThing()).doSomethingAndBlock(null, null));
    System.err.println("Ending Main");
}

}

Solution 4

This is dead simple with RxJava 2.x:

try {
    Baz baz = Single.create((SingleEmitter<Baz> emitter) ->
            doSomethingAsync(fooArg, barArg, result -> emitter.onSuccess(result)))
            .toFuture().get();
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}

Or without Lambda notation:

Baz baz = Single.create(new SingleOnSubscribe<Baz>() {
                @Override
                public void subscribe(SingleEmitter<Baz> emitter) {
                    doSomethingAsync(fooArg, barArg, new BazComputationSink() {
                        @Override
                        public void onBazResult(Baz result) {
                            emitter.onSuccess(result);
                        }
                    });
                }
            }).toFuture().get();

Even simpler:

Baz baz = Single.create((SingleEmitter<Baz> emitter) ->
                doSomethingAsync(fooArg, barArg, result -> emitter.onSuccess(result)))
                .blockingGet();

Kotlin Version:

val baz = Single.create<Baz> { emitter -> 
    doSomethingAsync(fooArg, barArg) { result -> emitter.onSuccess(result) } 
}.blockingGet()

Solution 5

A very simple example, just to understand CountDownLatch without any extra code.

A java.util.concurrent.CountDownLatch is a concurrency construct that allows one or more threads to wait for a given set of operations to complete.

A CountDownLatch is initialized with a given count. This count is decremented by calls to the countDown() method. Threads waiting for this count to reach zero can call one of the await() methods. Calling await() blocks the thread until the count reaches zero.

Below is a simple example. After the Decrementer has called countDown() 3 times on the CountDownLatch, the waiting Waiter is released from the await() call.

You can also pass a timeout to await().

CountDownLatch latch = new CountDownLatch(3);

Waiter      waiter      = new Waiter(latch);
Decrementer decrementer = new Decrementer(latch);

new Thread(waiter)     .start();
new Thread(decrementer).start();

Thread.sleep(4000);

//--------------

public class Waiter implements Runnable{

    CountDownLatch latch = null;

    public Waiter(CountDownLatch latch) {
        this.latch = latch;
    }

    public void run() {
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Waiter Released");
    }
}

//--------------

public class Decrementer implements Runnable {

    CountDownLatch latch = null;

    public Decrementer(CountDownLatch latch) {
        this.latch = latch;
    }

    public void run() {

        try {
            Thread.sleep(1000);
            this.latch.countDown();

            Thread.sleep(1000);
            this.latch.countDown();

            Thread.sleep(1000);
            this.latch.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
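
The timed form of await() mentioned above returns a boolean instead of blocking indefinitely. A small sketch follows; TimedAwaitDemo and waitFor are hypothetical names used only for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class TimedAwaitDemo {
    // Returns true if the latch reached zero within the timeout,
    // false if the timeout elapsed first.
    static boolean waitFor(CountDownLatch latch, long timeoutMillis) throws InterruptedException {
        return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch neverOpened = new CountDownLatch(1);
        System.out.println(waitFor(neverOpened, 200)); // prints "false" after about 200 ms

        CountDownLatch opened = new CountDownLatch(1);
        opened.countDown();
        System.out.println(waitFor(opened, 200));      // prints "true" immediately
    }
}
```

A caller should check the return value, since a false result means the awaited operations may not have completed.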

If you don't want to use a CountDownLatch, or your requirement resembles Facebook's like/unlike functionality (while one method is running, the other must not be called), you can declare a flag:

private volatile boolean isInprocessOfLikeOrUnLike = false;

Then check the flag at the beginning of each method: if it is false, proceed with the call; otherwise return. The details depend on your implementation.
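
One caveat: checking a volatile flag and then setting it are two separate steps, so two threads can both see false and both proceed. A possible variation, swapping in java.util.concurrent.atomic.AtomicBoolean for the volatile field, is sketched below. LikeGuard and tryRun are hypothetical names, not part of the original answer.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class LikeGuard {
    private final AtomicBoolean inProgress = new AtomicBoolean(false);

    // Runs the action only if no other guarded call is in flight.
    // Returns false without running it when another call holds the flag.
    boolean tryRun(Runnable action) {
        // compareAndSet checks and sets the flag in one atomic step.
        if (!inProgress.compareAndSet(false, true)) {
            return false;
        }
        try {
            action.run();
            return true;
        } finally {
            inProgress.set(false); // always release, even if the action throws
        }
    }
}
```

Both the like and the unlike method would go through the same LikeGuard instance, so whichever starts first wins and the other returns immediately.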

Author: Jason S

Updated on July 09, 2022

Comments

  • Jason S

    I have an object with a method I would like to expose to library clients (especially scripting clients) as something like:

    interface MyNiceInterface
    {
        public Baz doSomethingAndBlock(Foo fooArg, Bar barArg);
        public Future<Baz> doSomething(Foo fooArg, Bar barArg);
        // doSomethingAndBlock is the straightforward way;
        // doSomething has more control but deals with
        // a Future and that might be too much hassle for
        // scripting clients
    }
    

    but the primitive "stuff" I have available is a set of event-driven classes:

    interface BazComputationSink
    {
        public void onBazResult(Baz result);
    }
    
    class ImplementingThing
    {
        public void doSomethingAsync(Foo fooArg, Bar barArg, BazComputationSink sink);
    }
    

    where ImplementingThing takes inputs, does some arcane stuff like enqueueing things on a task queue, and then later, when a result occurs, sink.onBazResult() gets called on a thread that may or may not be the thread on which ImplementingThing.doSomethingAsync() was called.

    Is there a way I can use the event-driven functions I have, along with concurrency primitives, to implement MyNiceInterface so scripting clients can happily wait on a blocking thread?

    edit: can I use FutureTask for this?