How to combine multiple rxjs BehaviourSubjects

13,711

You could use the zip-operator, which works similar to combineLatest or forkJoin, but triggers only when both streams have emitted: http://reactivex.io/documentation/operators/zip.html

The difference between zip and combineLatest is: Zip will only trigger "in parallel", whereas combineLatest will trigger with any update and emit the latest value of each stream. So, assuming the following 2 streams:

streamA => 1--2--3
streamB => 10-20-30

with zip:

  • "1, 10"
  • "2, 20"
  • "3, 30"

with combineLatest:

  • "1, 10"
  • "2, 10"
  • "2, 20"
  • "3, 20"
  • "3, 30"

Here is also a live-example:

const a = new Rx.Subject();
const b = new Rx.Subject();

Rx.Observable.zip(a,b)
  .subscribe(x => console.log("zip: " + x.join(", ")));
Rx.Observable.combineLatest(a,b)
  .subscribe(x => console.log("combineLatest: " + x.join(", ")));

a.next(1);
b.next(10);
a.next(2);
b.next(20);
a.next(3);
b.next(30);
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>

Also another sidenote: Never ever ever subscribe inside a subscribe. Do something like this instead:

this._http.get('a/Url/Here', { search: params })
            .map(response => <Cpm>response.json())
            .withLatestFrom(this.cpmSubject)
            .subscribe([_cpm, cpmList] => {
                if (! (cpmList.filter((cpm: Cpm) => cpm.id === _cpm.id).length > 0) ) {
                    cpmList.push(_cpm);
                    this.cpmSubject.next(cpmList);
                }
            });
Share:
13,711
cobolstinks
Author by

cobolstinks

Software developer. I work primarily with .Net and Typescript/Javascript web applications. I enjoy Angular. I used to work on developing web services in java and before that i had the displeasure of working in COBOL, hence my avatar name. :(

Updated on June 20, 2022

Comments

  • cobolstinks
    cobolstinks almost 2 years

    I'm building out an Angular2 app, and have two BehaviourSubjects that I want to logically combine into one subscription. I'm making two http requests and want to fire an event when both of them come back. I'm looking at forkJoin vs combineLatest. It seems that combineLatest will fire when either behvaviorSubjects are updated vs forkJoin will fire only after all behavoirSubjects are updated. Is this correct? There has to be a generally accepted pattern for this isn't there?

    EDIT
    Here is an example of one of my behaviorSubjects my angular2 component is subscribing to:

    export class CpmService {
    
        public cpmSubject: BehaviorSubject<Cpm[]>;
    
        constructor(private _http: Http) {
            this.cpmSubject = new BehaviorSubject<Cpm[]>(new Array<Cpm>());
        }
    
        getCpm(id: number): void {
            let params: URLSearchParams = new URLSearchParams();
            params.set('Id', id.toString());
    
            this._http.get('a/Url/Here', { search: params })
                .map(response => <Cpm>response.json())
                .subscribe(_cpm => {
                    this.cpmSubject.subscribe(cpmList => {
                        //double check we dont already have the cpm in the observable, if we dont have it, push it and call next to propigate new cpmlist everywheres
                        if (! (cpmList.filter((cpm: Cpm) => cpm.id === _cpm.id).length > 0) ) {
                            cpmList.push(_cpm);
                            this.cpmSubject.next(cpmList);
                        }
                    })
                });
        }
    }
    

    Here is a snippet of my component's subscription:

      this._cpmService.cpmSubject.subscribe(cpmList => {
          doSomeWork();
      });
    

    But instead of firing doSomeWork() on the single subscription I want to only fire doSomeWork() when the cpmSubject and fooSubject fire.

  • cobolstinks
    cobolstinks about 7 years
    What is the difference between zip and combineLatest?
  • cobolstinks
    cobolstinks about 7 years
    Thanks, for the detailed answer. I'm trying to try it out but I'm not finding a zip method on my Rx.Observable object.
  • olsn
    olsn about 7 years
    in case you have not imported the whole library, make sure to add it via import "rxjs/add/observable/zip"; - it does exist: github.com/ReactiveX/rxjs/blob/…