How to compose Observables to avoid the given nested and dependent callbacks?

15,075

Solution 1

I'm the original author of the referenced blog post about callbacks and Java Futures. Here is an example of using flatMap, zip and merge to do service composition asynchronously.

It fetches a User object, then concurrently fetches Social and PersonalizedCatalog data, then for each Video from the PersonalizedCatalog concurrently fetches a Bookmark, Rating and Metadata, zips those together, and merges all of the responses into a progressive stream output as Server-Sent Events.

return getUser(userId).flatMap(user -> {
    Observable<Map<String, Object>> catalog = getPersonalizedCatalog(user)
            .flatMap(catalogList -> catalogList.videos().<Map<String, Object>> flatMap(
                    video -> {
                        Observable<Bookmark> bookmark = getBookmark(video);
                        Observable<Rating> rating = getRatings(video);
                        Observable<VideoMetadata> metadata = getVideoMetadata(video);
                        return Observable.zip(bookmark, rating, metadata, (b, r, m) -> combineVideoData(video, b, r, m));
                    }));

    Observable<Map<String, Object>> social = getSocial(user).map(s -> {
        return s.getDataAsMap();
    });

    return Observable.merge(catalog, social);
}).flatMap(data -> {
    String json = SimpleJson.mapToJson(data);
    return response.writeStringAndFlush("data: " + json + "\n");
});

This example can be seen in context of a functioning application at https://github.com/Netflix/ReactiveLab/blob/952362b89a4d4115ae0eecf0e73f273ecb27ba98/reactive-lab-gateway/src/main/java/io/reactivex/lab/gateway/routes/RouteForDeviceHome.java#L33

Since I can't possibly provide all of the information here you can also find an explanation in presentation form (with link to video) at https://speakerdeck.com/benjchristensen/reactive-streams-with-rx-at-javaone-2014?slide=32.

Solution 2

According to your code. Suppose that the remote call are done using Observable.

 Observable<Integer>  callRemoveServiceA()  { /* async call */  }

/* .... */

Observable<Integer>  callRemoveServiceE(Integer f2) { /* async call */  }

What you want :

  • call serviceA then call serviceB with the result of serviceA
  • call serviceC then call serviceD and serviceE with the result of serviceC
  • with the result of serviceE and serviceD, build a new value
  • display the new value with the result of serviceB

With RxJava, you'll achieve this with this code :

Observable<Integer> f3 = callRemoveServiceA() // call serviceA
            // call serviceB with the result of serviceA
            .flatMap((f1) -> callRemoveServiceB(f1)); 


Observable<Integer> f4Andf5 = callRemoveServiceC() // call serviceC
                    // call serviceD and serviceE then build a new value
                    .flatMap((f2) -> callRemoveServiceD(f2).zipWith(callRemoveServiceE(f2), (f4, f5) -> f4 * f5));

// compute the string to display from f3, and the f4, f5 pair
f3.zipWith(f4Andf5, (childF3, childF4Andf5) -> childF3 + " => " + childF4Andf5)
            // display the value
            .subscribe(System.out::println);

the important part here is the use of flapMap and zip (or zipWith)

You can get more info on flapMap here : When do you use map vs flatMap in RxJava?

Share:
15,075

Related videos on Youtube

Aravind Yarram
Author by

Aravind Yarram

Winner of Hackathon @ Kafka Summit 2016. 2nd Place, Neo4j Graph Gist Winter Challenge 2014. Data informed over data driven. Datasets over algorithms. Delivery over ceremonies. Metrics over anecdotes. Hypothesis over intuition. Evidence over experience. Trade-offs over trends. https://www.linkedin.com/in/aravindyarram

Updated on June 14, 2022

Comments

  • Aravind Yarram
    Aravind Yarram almost 2 years

    In this blog, he gives this (copy/pasted the following code) example for the callback hell. However, there is no mention of how the issue can be eliminated by using Reactive Extensions.

    So here F3 depends upon F1 completion and F4 and F5 depend upon F2 completion.

    1. Wondering what would be the functional equivalent in Rx.
    2. How to represent in Rx that F1, F2, F3, F4 and F5 should all be pulled asynchronously?

    NOTE: I am currently trying to wrap my head around Rx so I didn't try solving this example before asking this question.

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicReference;
    
    public class CallbackB {
    
        /**
         * Demonstration of nested callbacks which then need to composes their responses together.
         * <p>
         * Various different approaches for composition can be done but eventually they end up relying upon
         * synchronization techniques such as the CountDownLatch used here or converge on callback design
         * changes similar to <a href="https://github.com/Netflix/RxJava">Rx</a>.
         */
        public static void run() throws Exception {
            final ExecutorService executor = new ThreadPoolExecutor(4, 4, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>());
            /* the following are used to synchronize and compose the asynchronous callbacks */
            final CountDownLatch latch = new CountDownLatch(3);
            final AtomicReference<String> f3Value = new AtomicReference<String>();
            final AtomicReference<Integer> f4Value = new AtomicReference<Integer>();
            final AtomicReference<Integer> f5Value = new AtomicReference<Integer>();
    
            try {
                // get f3 with dependent result from f1
                executor.execute(new CallToRemoteServiceA(new Callback<String>() {
    
                    @Override
                    public void call(String f1) {
                        executor.execute(new CallToRemoteServiceC(new Callback<String>() {
    
                            @Override
                            public void call(String f3) {
                                // we have f1 and f3 now need to compose with others
                                System.out.println("intermediate callback: " + f3 + " => " + ("f4 * f5"));
                                // set to thread-safe variable accessible by external scope 
                                f3Value.set(f3);
                                latch.countDown();
                            }
    
                        }, f1));
                    }
    
                }));
    
                // get f4/f5 after dependency f2 completes 
                executor.execute(new CallToRemoteServiceB(new Callback<Integer>() {
    
                    @Override
                    public void call(Integer f2) {
                        executor.execute(new CallToRemoteServiceD(new Callback<Integer>() {
    
                            @Override
                            public void call(Integer f4) {
                                // we have f2 and f4 now need to compose with others
                                System.out.println("intermediate callback: f3" + " => " + (f4 + " * f5"));
                                // set to thread-safe variable accessible by external scope 
                                f4Value.set(f4);
                                latch.countDown();
                            }
    
                        }, f2));
                        executor.execute(new CallToRemoteServiceE(new Callback<Integer>() {
    
                            @Override
                            public void call(Integer f5) {
                                // we have f2 and f5 now need to compose with others
                                System.out.println("intermediate callback: f3" + " => " + ("f4 * " + f5));
                                // set to thread-safe variable accessible by external scope 
                                f5Value.set(f5);
                                latch.countDown();
                            }
    
                        }, f2));
                    }
    
                }));
    
                /* we must wait for all callbacks to complete */
                latch.await();
                System.out.println(f3Value.get() + " => " + (f4Value.get() * f5Value.get()));
            } finally {
                executor.shutdownNow();
            }
        }
    
        public static void main(String[] args) {
            try {
                run();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        private static final class CallToRemoteServiceA implements Runnable {
    
            private final Callback<String> callback;
    
            private CallToRemoteServiceA(Callback<String> callback) {
                this.callback = callback;
            }
    
            @Override
            public void run() {
                // simulate fetching data from remote service
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                callback.call("responseA");
            }
        }
    
        private static final class CallToRemoteServiceB implements Runnable {
    
            private final Callback<Integer> callback;
    
            private CallToRemoteServiceB(Callback<Integer> callback) {
                this.callback = callback;
            }
    
            @Override
            public void run() {
                // simulate fetching data from remote service
                try {
                    Thread.sleep(40);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                callback.call(100);
            }
        }
    
        private static final class CallToRemoteServiceC implements Runnable {
    
            private final Callback<String> callback;
            private final String dependencyFromA;
    
            private CallToRemoteServiceC(Callback<String> callback, String dependencyFromA) {
                this.callback = callback;
                this.dependencyFromA = dependencyFromA;
            }
    
            @Override
            public void run() {
                // simulate fetching data from remote service
                try {
                    Thread.sleep(60);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                callback.call("responseB_" + dependencyFromA);
            }
        }
    
        private static final class CallToRemoteServiceD implements Runnable {
    
            private final Callback<Integer> callback;
            private final Integer dependencyFromB;
    
            private CallToRemoteServiceD(Callback<Integer> callback, Integer dependencyFromB) {
                this.callback = callback;
                this.dependencyFromB = dependencyFromB;
            }
    
            @Override
            public void run() {
                // simulate fetching data from remote service
                try {
                    Thread.sleep(140);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                callback.call(40 + dependencyFromB);
            }
        }
    
        private static final class CallToRemoteServiceE implements Runnable {
    
            private final Callback<Integer> callback;
            private final Integer dependencyFromB;
    
            private CallToRemoteServiceE(Callback<Integer> callback, Integer dependencyFromB) {
                this.callback = callback;
                this.dependencyFromB = dependencyFromB;
            }
    
            @Override
            public void run() {
                // simulate fetching data from remote service
                try {
                    Thread.sleep(55);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                callback.call(5000 + dependencyFromB);
            }
        }
    
        private static interface Callback<T> {
            public void call(T value);
        }
    }
    
    • Nir Alfasi
      Nir Alfasi about 9 years
      I'm not an Rx master, but from a session that he gave: you can avoid the callback hell by composing Observables. If @benjchristensen is around - he might be able to provide more details.
    • Aravind Yarram
      Aravind Yarram about 9 years
      @alfasin I learnt enough of RxJava to know that it should be solved by composing observables. The question here is HOW this can be composed :-)
    • Nir Alfasi
      Nir Alfasi about 9 years
      In that case - I would change the question from "how to avoid callback hell?" to "how to compose observables? " ;)
    • Aravind Yarram
      Aravind Yarram about 9 years
      @alfasin Re-worded the question...do you have an answer?
    • pt2121
      pt2121 about 9 years
      @Pangea Shouldn't it be how to compose observables?
    • Aravind Yarram
      Aravind Yarram about 9 years
      @EntryLevelDev yes. changed it. thank you.
    • Nir Alfasi
      Nir Alfasi about 9 years
      Sorry that I had to bail out, but I see that you got your answer from the master himself - and I could definitely NOT top that! :)
  • Aravind Yarram
    Aravind Yarram about 9 years
    RWith this can I assume that, once subscribed, f4Andf5 can execute sooner than f3. Provided service A and B take longer? I ask because with futures, both service A and B should return before others can run.
  • dwursteisen
    dwursteisen about 9 years
    Yes, if each call to remote service are async.
  • Aravind Yarram
    Aravind Yarram about 9 years
    Thank you. I presume the getBookmark(...) etc implementations internally use subscribeOn(Schedulers.io()) right? And also, the zip operation runs on the calling thread.
  • benjchristensen
    benjchristensen about 9 years
    Yes they can use subscribeOn with the IO scheduler to make blocking IO async. At Netflix we typically use Hystrix to take care of that for us and provide bulkheading, timeouts, fallbacks, metrics, etc. If the IO is nonblocking such as via Netty then subscribeOn is unnecessary.