Merge results of array of Observables

28,023

Solution 1

I think that what you want is combineLatest. It wont emit any value till all inner observables emit at least one value. After that it will emit the latest from all each time there is a new emit from one of the inner observables.

Here is some reading material: https://www.learnrxjs.io/operators/combination/combinelatest.html

Here is an example:

function getTransactionByID(transactionId) {
  let count = 0;
  return Rx.Observable.of(transactionId)
    .delay(Math.random() * 4000)
    .map(x => `${x}: ${count++} `)
    .repeat();
}

function getTransactionsByIDs(transactionsIDs){
  return Rx.Observable.combineLatest(transactionsIDs.map(transactionID => getTransactionByID(transactionID)));
}

const transactionsIDs = [1,2,3];
getTransactionsByIDs(transactionsIDs)
  .take(10)
  .subscribe(x => { console.log(x); });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.min.js"></script>

The take(10) is just the keep the example from going on forever.

Solution 2

It's not entirely clear what behavior you want, but you most likely want either forkJoin, zip, or combineLatest.

forkJoin

When all observables complete, emit an array containing the last emitted value from each.

zip

Subscribe to all inner observables, waiting for each to emit a value. Once this occurs, all values with the corresponding index will be emitted. This will continue until at least one inner observable completes.

combineLatest

When any observable emits a value, emit the latest value from each.


Example:

getTransactionsByIDs(transactionsIDs) {
  const transactions = transactionIDs.map(transactionID => this.getTransactionByID(transactionID));
  return Observable.forkJoin(...transactions); // change to zip or combineLatest if needed
}


this.transactionsService.getTransactionsByIDs(['1', '2', '3'])
  .subscribe(([first, second, third]) => {
    console.log({ first, second, third });
  });
Share:
28,023
Gili Yaniv
Author by

Gili Yaniv

Updated on July 09, 2022

Comments

  • Gili Yaniv
    Gili Yaniv almost 2 years

    I need to fetch IDs from my state as separate Observables, combine all of them into one Observable and subscribe() to it. I need that my subscriber will eventually get an array of resolved Observable results.

    Also I need to keep this subscription open, so if one of my inner Observables changes my subscriber will be notified.

    This is my code:

    getTransactionsByIDs(transactionsIDs){
    return Observable.of(transactionIDs
      .map(transactionID => this.getTransactionByID(transactionID)));
    }
    
    this.transactionsService.getTransactionsByIDs(transactionsIDs)
    .subscribe(transactions=>{
     ....
    })
    

    The result of the above code in the subscriber function is an array of unresolved stores.

    How can I resolve each store and combine all of them together?

    I've also tried to use Observable.from() on the transactionsIDs to transform each ID into an Observable and then resolve it. It works fine, but then my subscriber getting notified on every ID separately. If there's a way to batch all of the Observable.from() results (while keeping subscription open), please let me know.

    This is how my Observable.from() looks like:

    getTransactionsByIDs(transactionsIDs){
    return transactionIDs
      .mergeMap(transactionID => this.getTransactionByID(transactionID));
    }
    
    this.transactionsService.getTransactionsByIDs(Observable.from(transactionsIDs))
    .subscribe(transactions=>{
     ....
    })
    
  • O-9
    O-9 over 3 years
    important notice for combineLatest (from docs): -- will not emit an initial value until each observable emits at least one value
  • ttugates
    ttugates over 2 years
    Make sure the array you are spreading into any of the 3 operators has <= 6 items. I am actually searching for a solution that handles up to 15 or so without rolling my own grouping of observables.