Split Rx Observable into multiple streams and process individually
Solution 1
You don't have to collapse Observables
from groupBy
. You can instead subscribe to them.
Something like this:
String[] inputs= {"a", "b", "c", "a", "b", "b", "b", "a"};
Action1<String> a = s -> System.out.print("-a-");
Action1<String> b = s -> System.out.print("-b-");
Action1<String> c = s -> System.out.print("-c-");
Observable
.from(inputs)
.groupBy(s -> s)
.subscribe((g) -> {
if ("a".equals(g.getKey())) {
g.subscribe(a);
}
if ("b".equals(g.getKey())) {
g.subscribe(b);
}
if ("c".equals(g.getKey())) {
g.subscribe(c);
}
});
If statements look kinda ugly but at least you can handle each stream separately. Maybe there is a way of avoiding them.
Solution 2
Easy as pie, just use filter
An example in scala
import rx.lang.scala.Observable
val o: Observable[String] = Observable.just("a", "b", "c", "a", "b", "b", "b", "a")
val hotO: Observable[String] = o.share
val aSource: Observable[String] = hotO.filter(x ⇒ x == "a")
val bSource: Observable[String] = hotO.filter(x ⇒ x == "b")
val cSource: Observable[String] = hotO.filter(x ⇒ x == "c")
aSource.subscribe(o ⇒ println("A: " + o), println, () ⇒ println("A Completed"))
bSource.subscribe(o ⇒ println("B: " + o), println, () ⇒ println("B Completed"))
cSource.subscribe(o ⇒ println("C: " + o), println, () ⇒ println("C Completed"))
You just need to make sure that the source observable is hot. The easiest way is to share
it.
Solution 3
In RxJava there is a special version of publish operator that takes a function.
ObservableTransformer {
it.publish { shared ->
Observable.merge(
shared.ofType(x).compose(transformerherex),
shared.ofType(y).compose(transformerherey)
)
}
}
This splits the event stream by type. Then you can process them separately by composing with different transformers. All of them share single subscription.
Solution 4
I have been thinking about this and Tomas solution is OK, but the issue is that it converts the stream to a hot observable.
You can use share
in combination with defer
in order to get a cold observable with other streams.
For example (Java):
var originalObservable = ...; // some source
var coldObservable = Observable.defer(() -> {
var shared - originalObservable.share();
var aSource = shared.filter(x -> x.equals("a"));
var bSource = shared.filter(x -> x.equals("b"));
var cSource = shared.filter(x -> x.equals("c"));
// some logic for sources
return shared;
});
Brandon Bil
Updated on June 16, 2020Comments
-
Brandon Bil almost 4 years
Here is a picture of what I am attempting to accomplish.
--a-b-c-a--bbb--a
split into
--a-----a-------a --> a stream
----b------bbb--- --> b stream
------c---------- --> c stream
Then, be able to
a.subscribe() b.subscribe() c.subscribe()
So far, everything I have found has split the stream using a groupBy(), but then collapsed everything back into a single stream and process them all in the same function. What I want to do is process each derived stream in a different way.
The way I'm doing it right now is doing a bunch of filters. Is there a better way to do this?
-
Brandon Bil about 9 yearsYeah, I would like to avoid having those ifs possibly. However, if it works out, then it'll look a bit cleaner since its all in one place, rather than doing filters on the original stream. Thanks!
-
ihuk about 9 yearsCool! I'll update my answer if I figure out how to get rid of
if
statements. -
tofi9 about 9 yearsYou could use a
Dictionary<groupKey,action>
, and your groupsubscribe
method would have to resolve the action from the dictionary and call it. -
tofi9 about 9 yearsSomething like
Dictionary<String, Action1<String>> handlers = new Hashtable<>(); handlers.put("a", s -> System.out.println("-a-"));....
and.subscribe((g) -> { Action1<String> action = handlers.get(g.getKey()); g.subscribe(action); });
-
Tomáš Dvořák about 9 yearsBrandon Bill, I had to reread your question to figure out that you were originally using
filter
. Out of curiosity, what makes you prefer thisgroupBy
solution to thefilter
one? To mefilter
seems simpler, easier to understand, probably better performing. What advantages do you see ingroupBy
? -
Brandon Bil almost 9 yearsI noticed some interesting things while debugging (note this is using Rx JS), where once a "group" had been associated with a handler, another instance of an element from the group would not go through the big subscribe function shown above. Instead it would forward directly to the Action that the group was subscribed to. Whereas, with a filter the comparisons get made for every single value of the source observable. Therefore, with N splits, a possible N comparisons will be made for every element. With groups, comparisons are only made when a new group is found.
-
bruce_ricard over 8 yearsWhat if you want or initial observable to be cold?
-
Krzysztof Skrzynecki almost 8 years@double_squeeze just use
publish
instead ofshare
and invokeconnect
when all of subsribers are subscribed. -
Yaroslav Stavnichiy about 6 yearsThere is no point in making it hot using
share
. Provided code actually subscribes three times - for each subscriber, same as it would withoutshare
. The right way to do it is described in Krzysztof Skyrzynecki's comment: usepublish
instead ofshare
and invokeconnect
when all of subsribers are subscribed. -
Tomáš Dvořák about 6 yearsI do agree that the
publish
+connect
method is cleaner and I would use it whenever feasible (but sometimes, you just don't know the subscribers in advance and don't care that you miss some items). However, I don't think your statement about three subscriptions to the original coldo
is correct. If you can prove otherwise, pleaseshare()
:) your proof with us.