Split Rx Observable into multiple streams and process individually

27,897

Solution 1

You don't have to collapse Observables from groupBy. You can instead subscribe to them.

Something like this:

// RxJava 1.x: uses rx.Observable and rx.functions.Action1
String[] inputs = {"a", "b", "c", "a", "b", "b", "b", "a"};

// One handler per group key
Action1<String> a = s -> System.out.print("-a-");

Action1<String> b = s -> System.out.print("-b-");

Action1<String> c = s -> System.out.print("-c-");

Observable
    .from(inputs)
    .groupBy(s -> s)
    .subscribe((g) -> {
        if ("a".equals(g.getKey())) {
            g.subscribe(a);
        }

        if ("b".equals(g.getKey())) {
            g.subscribe(b);
        }

        if ("c".equals(g.getKey())) {
            g.subscribe(c);
        }
    });

The if statements look kind of ugly, but at least you can handle each stream separately. Maybe there is a way to avoid them.
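One possible way around the if statements is to look the handler up by key, as suggested in the comments. Below is a minimal plain-Java sketch of that lookup only, without the Rx dependency; the class name HandlerDispatch and the use of java.util.function.Consumer in place of Action1 are assumptions made for illustration. In the groupBy version the lookup would presumably happen once per group, inside the outer subscribe, rather than once per element as in this loop.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical helper: models key-to-handler dispatch without RxJava.
final class HandlerDispatch {

    // Routes each input to the handler registered under its key and
    // returns everything the handlers wrote.
    static String dispatch(String[] inputs) {
        StringBuilder out = new StringBuilder();

        Map<String, Consumer<String>> handlers = new HashMap<>();
        handlers.put("a", s -> out.append("-a-"));
        handlers.put("b", s -> out.append("-b-"));
        handlers.put("c", s -> out.append("-c-"));

        for (String s : inputs) {
            Consumer<String> handler = handlers.get(s);
            if (handler != null) { // keys without a handler are ignored
                handler.accept(s);
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        String[] inputs = {"a", "b", "c", "a", "b", "b", "b", "a"};
        System.out.println(dispatch(inputs)); // -a--b--c--a--b--b--b--a-
    }
}
```

Note that a key with no registered handler yields null from the map, so the Rx version would need the same null check before subscribing.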

Solution 2

Easy as pie: just use filter.

An example in Scala:

import rx.lang.scala.Observable

val o: Observable[String] = Observable.just("a", "b", "c", "a", "b", "b", "b", "a")
val hotO: Observable[String] = o.share
val aSource: Observable[String] = hotO.filter(x ⇒ x == "a")
val bSource: Observable[String] = hotO.filter(x ⇒ x == "b")
val cSource: Observable[String] = hotO.filter(x ⇒ x == "c")

aSource.subscribe(o ⇒ println("A: " + o), println, () ⇒ println("A Completed"))

bSource.subscribe(o ⇒ println("B: " + o), println, () ⇒ println("B Completed"))

cSource.subscribe(o ⇒ println("C: " + o), println, () ⇒ println("C Completed"))

You just need to make sure that the source observable is hot. The easiest way is to share it.
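The reason the source has to be shared is that every filtered branch should be fed from the same run of the source instead of each branch restarting it. Here is a rough plain-Java model of that idea: register all branches first, then traverse the source once. ConnectableModel and its methods are hypothetical stand-ins written for this sketch and are not RxJava classes; the comments below the answers suggest publish plus connect as the Rx way to get this behaviour.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical stand-in for a connectable (multicast) source; NOT an RxJava class.
final class ConnectableModel<T> {
    private final List<T> items;
    private final List<Consumer<T>> subscribers = new ArrayList<>();
    private int passes = 0; // number of traversals of the underlying source

    ConnectableModel(List<T> items) {
        this.items = items;
    }

    // Registers a branch that only receives items accepted by the predicate.
    void subscribeFiltered(Predicate<T> keep, Consumer<T> onNext) {
        subscribers.add(item -> {
            if (keep.test(item)) {
                onNext.accept(item);
            }
        });
    }

    // Traverses the source once and offers each item to every registered branch.
    void connect() {
        passes++;
        for (T item : items) {
            for (Consumer<T> subscriber : subscribers) {
                subscriber.accept(item);
            }
        }
    }

    static String demo() {
        ConnectableModel<String> source =
            new ConnectableModel<>(Arrays.asList("a", "b", "c", "a", "b", "b", "b", "a"));
        int[] counts = new int[3]; // items seen by the a, b and c branches
        source.subscribeFiltered("a"::equals, s -> counts[0]++);
        source.subscribeFiltered("b"::equals, s -> counts[1]++);
        source.subscribeFiltered("c"::equals, s -> counts[2]++);
        source.connect(); // nothing is delivered until every branch is registered
        return "a=" + counts[0] + " b=" + counts[1] + " c=" + counts[2]
            + " passes=" + source.passes;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // a=3 b=4 c=1 passes=1
    }
}
```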

Solution 3

In RxJava there is a special version of the publish operator that takes a function.

// Kotlin sketch; x, y and the two transformers are placeholders
ObservableTransformer {
  it.publish { shared ->
    Observable.merge(
        shared.ofType(x).compose(transformerherex),
        shared.ofType(y).compose(transformerherey)
    )
  }
}

This splits the event stream by type. Then you can process each type separately by composing it with a different transformer. All of them share a single subscription.

Solution 4

I have been thinking about this, and Tomáš's solution is OK, but the issue is that it converts the stream to a hot observable.

You can use share in combination with defer to get a cold observable that still splits into separate streams.

For example (Java):

var originalObservable = ...; // some source
var coldObservable = Observable.defer(() -> {
    var shared = originalObservable.share();
    var aSource = shared.filter(x -> x.equals("a"));
    var bSource = shared.filter(x -> x.equals("b"));
    var cSource = shared.filter(x -> x.equals("c"));
    // some logic for sources
    return shared;
});

Author by

Brandon Bil

Updated on June 16, 2020

Comments

  • Brandon Bil
    Brandon Bil almost 4 years

    Here is a picture of what I am attempting to accomplish.

    --a-b-c-a--bbb--a

    split into

    --a-----a-------a --> a stream

    ----b------bbb--- --> b stream

    ------c---------- --> c stream

    Then, be able to

    a.subscribe()
    b.subscribe()
    c.subscribe()
    

    So far, everything I have found has split the stream using a groupBy(), but then collapsed everything back into a single stream and process them all in the same function. What I want to do is process each derived stream in a different way.

    The way I'm doing it right now is doing a bunch of filters. Is there a better way to do this?

  • Brandon Bil
    Brandon Bil about 9 years
    Yeah, I would like to avoid having those ifs if possible. However, if it works out, it'll look a bit cleaner since it's all in one place, rather than doing filters on the original stream. Thanks!
  • ihuk
    ihuk about 9 years
    Cool! I'll update my answer if I figure out how to get rid of if statements.
  • tofi9
    tofi9 about 9 years
    You could use a Dictionary<groupKey,action>, and your group subscribe method would have to resolve the action from the dictionary and call it.
  • tofi9
    tofi9 about 9 years
    Something like Dictionary<String, Action1<String>> handlers = new Hashtable<>(); handlers.put("a", s -> System.out.println("-a-"));.... and .subscribe((g) -> { Action1<String> action = handlers.get(g.getKey()); g.subscribe(action); });
  • Tomáš Dvořák
    Tomáš Dvořák about 9 years
    Brandon Bill, I had to reread your question to figure out that you were originally using filter. Out of curiosity, what makes you prefer this groupBy solution to the filter one? To me filter seems simpler, easier to understand, probably better performing. What advantages do you see in groupBy?
  • Brandon Bil
    Brandon Bil almost 9 years
    I noticed some interesting things while debugging (note this is using RxJS): once a "group" had been associated with a handler, later elements from that group did not go through the big subscribe function shown above. Instead, they were forwarded directly to the Action that the group was subscribed to. With a filter, by contrast, the comparisons get made for every single value of the source observable. Therefore, with N splits, up to N comparisons will be made for every element. With groups, comparisons are only made when a new group is found.
  • bruce_ricard
    bruce_ricard over 8 years
    What if you want your initial observable to be cold?
  • Krzysztof Skrzynecki
    Krzysztof Skrzynecki almost 8 years
    @double_squeeze just use publish instead of share and invoke connect when all of the subscribers are subscribed.
  • Yaroslav Stavnichiy
    Yaroslav Stavnichiy about 6 years
    There is no point in making it hot using share. The provided code actually subscribes three times, once for each subscriber, the same as it would without share. The right way to do it is described in Krzysztof Skrzynecki's comment: use publish instead of share and invoke connect when all of the subscribers are subscribed.
  • Tomáš Dvořák
    Tomáš Dvořák about 6 years
    I do agree that the publish+connect method is cleaner and I would use it whenever feasible (but sometimes, you just don't know the subscribers in advance and don't care that you miss some items). However, I don't think your statement about three subscriptions to the original cold o is correct. If you can prove otherwise, please share() :) your proof with us.
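The comparison-count argument made in the comments above (N filters test every element N times, while grouping only resolves a handler when a new key shows up) can be checked outside Rx. The following is a small plain-Java sketch under the assumption that grouping is backed by a hash set or map, as it is here; the class and method names are made up for illustration. Grouping still pays one hash lookup per element, so this counts handler resolutions, not total work.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper: counts the work done by N filters versus key-based grouping.
final class ComparisonCount {

    // Filter style: every element is tested against each of the N keys.
    // Returns {comparisons, matches}.
    static int[] filterStats(String[] inputs, String[] keys) {
        int comparisons = 0;
        int matches = 0;
        for (String s : inputs) {
            for (String key : keys) {
                comparisons++; // one equals() test per branch per element
                if (key.equals(s)) {
                    matches++; // the branch for this key would receive s
                }
            }
        }
        return new int[] {comparisons, matches};
    }

    // Group style: a handler is resolved only the first time a key is seen.
    static int handlerResolutions(String[] inputs) {
        Set<String> seenKeys = new HashSet<>();
        int resolutions = 0;
        for (String s : inputs) {
            if (seenKeys.add(s)) { // add() returns true only for a new key
                resolutions++;
            }
        }
        return resolutions;
    }

    public static void main(String[] args) {
        String[] inputs = {"a", "b", "c", "a", "b", "b", "b", "a"};
        String[] keys = {"a", "b", "c"};
        int[] stats = filterStats(inputs, keys);
        System.out.println("filter comparisons: " + stats[0]);                    // 24
        System.out.println("handler resolutions: " + handlerResolutions(inputs)); // 3
    }
}
```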