How to compose Observables to avoid the given nested and dependent callbacks?
Solution 1
I'm the original author of the referenced blog post about callbacks and Java Futures. Here is an example of using flatMap, zip and merge to do service composition asynchronously.
It fetches a User object, then concurrently fetches Social and PersonalizedCatalog data, then for each Video from the PersonalizedCatalog concurrently fetches a Bookmark, Rating and Metadata, zips those together, and merges all of the responses into a progressive stream output as Server-Sent Events.
return getUser(userId).flatMap(user -> {
Observable<Map<String, Object>> catalog = getPersonalizedCatalog(user)
.flatMap(catalogList -> catalogList.videos().<Map<String, Object>> flatMap(
video -> {
Observable<Bookmark> bookmark = getBookmark(video);
Observable<Rating> rating = getRatings(video);
Observable<VideoMetadata> metadata = getVideoMetadata(video);
return Observable.zip(bookmark, rating, metadata, (b, r, m) -> combineVideoData(video, b, r, m));
}));
Observable<Map<String, Object>> social = getSocial(user).map(s -> {
return s.getDataAsMap();
});
return Observable.merge(catalog, social);
}).flatMap(data -> {
String json = SimpleJson.mapToJson(data);
return response.writeStringAndFlush("data: " + json + "\n");
});
This example can be seen in context of a functioning application at https://github.com/Netflix/ReactiveLab/blob/952362b89a4d4115ae0eecf0e73f273ecb27ba98/reactive-lab-gateway/src/main/java/io/reactivex/lab/gateway/routes/RouteForDeviceHome.java#L33
Since I can't possibly provide all of the information here you can also find an explanation in presentation form (with link to video) at https://speakerdeck.com/benjchristensen/reactive-streams-with-rx-at-javaone-2014?slide=32.
Solution 2
According to your code. Suppose that the remote call are done using Observable
.
Observable<Integer> callRemoveServiceA() { /* async call */ }
/* .... */
Observable<Integer> callRemoveServiceE(Integer f2) { /* async call */ }
What you want :
- call
serviceA
then callserviceB
with the result ofserviceA
- call
serviceC
then callserviceD
andserviceE
with the result ofserviceC
- with the result of
serviceE
andserviceD
, build a new value - display the new value with the result of
serviceB
With RxJava, you'll achieve this with this code :
Observable<Integer> f3 = callRemoveServiceA() // call serviceA
// call serviceB with the result of serviceA
.flatMap((f1) -> callRemoveServiceB(f1));
Observable<Integer> f4Andf5 = callRemoveServiceC() // call serviceC
// call serviceD and serviceE then build a new value
.flatMap((f2) -> callRemoveServiceD(f2).zipWith(callRemoveServiceE(f2), (f4, f5) -> f4 * f5));
// compute the string to display from f3, and the f4, f5 pair
f3.zipWith(f4Andf5, (childF3, childF4Andf5) -> childF3 + " => " + childF4Andf5)
// display the value
.subscribe(System.out::println);
the important part here is the use of flapMap
and zip
(or zipWith
)
-
flapMap
will transform a value into anotherObservable
. ThisObservable
here will be your new asynchronos call. ( http://reactivex.io/documentation/operators/flatmap.html ) -
zip
will compose a new value from two differentObservable
. So you can build a new value with the result of two (or more)Obsevable
( http://reactivex.io/documentation/operators/zip.html )
You can get more info on flapMap here : When do you use map vs flatMap in RxJava?
Related videos on Youtube
Aravind Yarram
Winner of Hackathon @ Kafka Summit 2016. 2nd Place, Neo4j Graph Gist Winter Challenge 2014. Data informed over data driven. Datasets over algorithms. Delivery over ceremonies. Metrics over anecdotes. Hypothesis over intuition. Evidence over experience. Trade-offs over trends. https://www.linkedin.com/in/aravindyarram
Updated on June 14, 2022Comments
-
Aravind Yarram almost 2 years
In this blog, he gives this (copy/pasted the following code) example for the callback hell. However, there is no mention of how the issue can be eliminated by using Reactive Extensions.
So here F3 depends upon F1 completion and F4 and F5 depend upon F2 completion.
- Wondering what would be the functional equivalent in Rx.
- How to represent in Rx that F1, F2, F3, F4 and F5 should all be pulled asynchronously?
NOTE: I am currently trying to wrap my head around Rx so I didn't try solving this example before asking this question.
import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; public class CallbackB { /** * Demonstration of nested callbacks which then need to composes their responses together. * <p> * Various different approaches for composition can be done but eventually they end up relying upon * synchronization techniques such as the CountDownLatch used here or converge on callback design * changes similar to <a href="https://github.com/Netflix/RxJava">Rx</a>. */ public static void run() throws Exception { final ExecutorService executor = new ThreadPoolExecutor(4, 4, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>()); /* the following are used to synchronize and compose the asynchronous callbacks */ final CountDownLatch latch = new CountDownLatch(3); final AtomicReference<String> f3Value = new AtomicReference<String>(); final AtomicReference<Integer> f4Value = new AtomicReference<Integer>(); final AtomicReference<Integer> f5Value = new AtomicReference<Integer>(); try { // get f3 with dependent result from f1 executor.execute(new CallToRemoteServiceA(new Callback<String>() { @Override public void call(String f1) { executor.execute(new CallToRemoteServiceC(new Callback<String>() { @Override public void call(String f3) { // we have f1 and f3 now need to compose with others System.out.println("intermediate callback: " + f3 + " => " + ("f4 * f5")); // set to thread-safe variable accessible by external scope f3Value.set(f3); latch.countDown(); } }, f1)); } })); // get f4/f5 after dependency f2 completes executor.execute(new CallToRemoteServiceB(new Callback<Integer>() { @Override public void call(Integer f2) { executor.execute(new CallToRemoteServiceD(new Callback<Integer>() { @Override public void call(Integer f4) { // we have f2 and f4 now need to compose with others System.out.println("intermediate callback: f3" + " => " + (f4 + " * f5")); // set to thread-safe variable accessible by external scope f4Value.set(f4); latch.countDown(); } }, f2)); executor.execute(new CallToRemoteServiceE(new Callback<Integer>() { @Override public void call(Integer f5) { // we have f2 and f5 now need to compose with others System.out.println("intermediate callback: f3" + " => " + ("f4 * " + f5)); // set to thread-safe variable accessible by external scope f5Value.set(f5); latch.countDown(); } }, f2)); } })); /* we must wait for all callbacks to complete */ latch.await(); System.out.println(f3Value.get() + " => " + (f4Value.get() * f5Value.get())); } finally { executor.shutdownNow(); } } public static void main(String[] args) { try { run(); } catch (Exception e) { e.printStackTrace(); } } private static final class CallToRemoteServiceA implements Runnable { private final Callback<String> callback; private CallToRemoteServiceA(Callback<String> callback) { this.callback = callback; } @Override public void run() { // simulate fetching data from remote service try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } callback.call("responseA"); } } private static final class CallToRemoteServiceB implements Runnable { private final Callback<Integer> callback; private CallToRemoteServiceB(Callback<Integer> callback) { this.callback = callback; } @Override public void run() { // simulate fetching data from remote service try { Thread.sleep(40); } catch (InterruptedException e) { e.printStackTrace(); } callback.call(100); } } private static final class CallToRemoteServiceC implements Runnable { private final Callback<String> callback; private final String dependencyFromA; private CallToRemoteServiceC(Callback<String> callback, String dependencyFromA) { this.callback = callback; this.dependencyFromA = dependencyFromA; } @Override public void run() { // simulate fetching data from remote service try { Thread.sleep(60); } catch (InterruptedException e) { e.printStackTrace(); } callback.call("responseB_" + dependencyFromA); } } private static final class CallToRemoteServiceD implements Runnable { private final Callback<Integer> callback; private final Integer dependencyFromB; private CallToRemoteServiceD(Callback<Integer> callback, Integer dependencyFromB) { this.callback = callback; this.dependencyFromB = dependencyFromB; } @Override public void run() { // simulate fetching data from remote service try { Thread.sleep(140); } catch (InterruptedException e) { e.printStackTrace(); } callback.call(40 + dependencyFromB); } } private static final class CallToRemoteServiceE implements Runnable { private final Callback<Integer> callback; private final Integer dependencyFromB; private CallToRemoteServiceE(Callback<Integer> callback, Integer dependencyFromB) { this.callback = callback; this.dependencyFromB = dependencyFromB; } @Override public void run() { // simulate fetching data from remote service try { Thread.sleep(55); } catch (InterruptedException e) { e.printStackTrace(); } callback.call(5000 + dependencyFromB); } } private static interface Callback<T> { public void call(T value); } }
-
Nir Alfasi about 9 yearsI'm not an Rx master, but from a session that he gave: you can avoid the callback hell by composing Observables. If @benjchristensen is around - he might be able to provide more details.
-
Aravind Yarram about 9 years@alfasin I learnt enough of RxJava to know that it should be solved by composing observables. The question here is HOW this can be composed :-)
-
Nir Alfasi about 9 yearsIn that case - I would change the question from "how to avoid callback hell?" to "how to compose observables? " ;)
-
Aravind Yarram about 9 years@alfasin Re-worded the question...do you have an answer?
-
pt2121 about 9 years@Pangea Shouldn't it be how to compose observables?
-
Aravind Yarram about 9 years@EntryLevelDev yes. changed it. thank you.
-
Nir Alfasi about 9 yearsSorry that I had to bail out, but I see that you got your answer from the master himself - and I could definitely NOT top that! :)
-
Aravind Yarram about 9 yearsRWith this can I assume that, once subscribed, f4Andf5 can execute sooner than f3. Provided service A and B take longer? I ask because with futures, both service A and B should return before others can run.
-
dwursteisen about 9 yearsYes, if each call to remote service are async.
-
Aravind Yarram about 9 yearsThank you. I presume the getBookmark(...) etc implementations internally use subscribeOn(Schedulers.io()) right? And also, the zip operation runs on the calling thread.
-
benjchristensen about 9 yearsYes they can use subscribeOn with the IO scheduler to make blocking IO async. At Netflix we typically use Hystrix to take care of that for us and provide bulkheading, timeouts, fallbacks, metrics, etc. If the IO is nonblocking such as via Netty then subscribeOn is unnecessary.