RXJS Combining multiple observables inside a pipe

17,744

Solution 1

This is what you can do:

sourceObservable$.pipe(
  // depends on your need here you can use mergeMap as well
  switchMap(ids => {
    const observables = ids.map(i => this.service.getSomeStuff(id));
    return forkJoin(observables);
  }),
  tap(joined => {
    // joined will be an array of values of the observables in the same
    // order as you pushed in forkJoin
    for (data of joined) {
      // do something
    }
  }),
  takeWhile(() => this.componentActive),
  catchError(error => {
    console.log(error);
    return throwError(error);
  })
)
.subscribe();

Solution 2

combineLatest(...observables)

will only emit after all observables have emitted and you will have an array of the results.

Solution 3

Instead of modifying your data at once using for, why don't you do it as the data is received? Something like this.

source$.pipe(
  mergeMap(ids => from(ids)),
  mergeMap(id => this.service.getSomeStuff(id)),
  tap(data => //do someting with the data);
  takeWhile(() => this.componentActive),
  catchError(error => {
    console.log(error);
    return throwError(error);
  }),
  toArray(), --> this create the array of all the values received.
)
.subscribe(data => //returns array of modified data);
Share:
17,744

Related videos on Youtube

BartKrul
Author by

BartKrul

Student Programmer

Updated on June 15, 2022

Comments

  • BartKrul
    BartKrul almost 2 years

    I have an API call that returns a certain amount of ids. Each of these ids are used to make a new api call. The results of these API calls need to be combined into a single object.

    At first I used a loop inside the .pipe(map) operator of the first api call. In this loop I did the second api calls, and in the .pipe(map) operator in each of these calls I would edit a variable in my angular component.

    This wasn't very pretty, and I was actually wondering if this is thread safe. I know javascript is single threaded, but it doesn't seem very safe to have multiple asynchronous processes messing with the same global variable.

    after that I just stored the observable returned by the second api call in an array by looping over the returned Ids by apiCall1, and used forkJoin to subscribe and handle each result accordingly (see example).

    This isn't very pretty however, and I was wondering if there's an operator I can use in my pipe for this?

    So instead of (pseudocode):

      .pipe(
          map(ids=> {
    
            let observables = []
            for (const id of ids) {
             observables.push(this.service.getSomeStuff(id));
            }
    
            forkJoin(...observables).subscribe(dataArray) => {
              for (data of dataArray) {
                //Do something
              }
            });
    
          }),
          takeWhile(() => this.componentActive),
          catchError(error => {
            console.log(error);
            return throwError(error);
          })
        )
        .subscribe();
    

    Is there an operator that makes it something like this:

      .pipe(
          map(ids=> {
    
            let observables = []
            for (const id of ids) {
             observables.push(this.service.getSomeStuff(id));
            }
    
          return observables
          }),
          forkJoin(dataArray => {
              for (data of dataArray) {
                //Do something
              }
            });
          takeWhile(() => this.componentActive),
          catchError(error => {
            console.log(error);
            return throwError(error);
          })
        )
        .subscribe();
    
  • Brandon
    Brandon almost 5 years
    it would be better to eliminate the tap and move that code into the call to subscribe.
  • user2216584
    user2216584 almost 5 years
    @Brandon Yes, that's also the same thing. It is just to demonstrate what can be done in the observable pipeline. Personally, In Angular app, I rarely subscribe. I heavily use async pipe to let angular take care of subscription management itself. Although in subscribe() method also its absolutely fine. BTW thanks for the suggestion..:)
  • Davy
    Davy almost 5 years
    I agree with Brandon, there are rarely good reasons why you should not use pure functions
  • user2216584
    user2216584 almost 5 years
    @Davy using tap operator callback is also a pure function. Again as I said - using subscribe() is also absolutely fine.