Merge results of array of Observables
Solution 1
I think what you want is combineLatest. It won't emit any value until every inner observable has emitted at least one value. After that, it emits the latest value from each of them every time any inner observable emits.
Here is some reading material: https://www.learnrxjs.io/operators/combination/combinelatest.html
Here is an example:
function getTransactionByID(transactionId) {
  let count = 0;
  return Rx.Observable.of(transactionId)
    .delay(Math.random() * 4000)
    .map(x => `${x}: ${count++} `)
    .repeat();
}

function getTransactionsByIDs(transactionsIDs) {
  return Rx.Observable.combineLatest(transactionsIDs.map(transactionID => getTransactionByID(transactionID)));
}

const transactionsIDs = [1, 2, 3];
getTransactionsByIDs(transactionsIDs)
  .take(10)
  .subscribe(x => { console.log(x); });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.min.js"></script>
The take(10) is just there to keep the example from running forever.
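To make the "nothing until everyone has emitted once" rule concrete, here is a minimal plain-JavaScript sketch of that behavior. It is a model for illustration only, not how RxJS implements combineLatest; createCombiner and push are hypothetical names, and the values are pushed by hand instead of coming from real observables.

```javascript
// Minimal model of combineLatest semantics; NOT the RxJS implementation.
// `createCombiner` and `push` are hypothetical names used only for illustration.
function createCombiner(sourceCount, onEmit) {
  const latest = new Array(sourceCount);            // latest value seen per source
  const seen = new Array(sourceCount).fill(false);  // has source i emitted yet?
  let remaining = sourceCount;                      // sources that are still silent

  // Call push(index, value) whenever source `index` produces a value.
  return function push(index, value) {
    latest[index] = value;
    if (!seen[index]) {
      seen[index] = true;
      remaining--;
    }
    // Emit only once every source has produced at least one value.
    if (remaining === 0) {
      onEmit(latest.slice()); // copy, so later pushes do not mutate earlier results
    }
  };
}

const emitted = [];
const push = createCombiner(3, values => emitted.push(values));

push(0, "1: 0"); // nothing emitted yet: sources 1 and 2 are still silent
push(1, "2: 0"); // still nothing
push(2, "3: 0"); // all three have emitted, so the first combined array goes out
push(0, "1: 1"); // any later value re-emits the full array of latest values

console.log(emitted);
```

The subscriber in the example above sees the same pattern: one array per change, always containing the latest value of every transaction.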
Solution 2
It's not entirely clear what behavior you want, but you most likely want either forkJoin, zip, or combineLatest.
forkJoin
When all observables complete, emit an array containing the last emitted value from each.
zip
Subscribe to all inner observables, waiting for each to emit a value. Once this occurs, all values with the corresponding index will be emitted. This will continue until at least one inner observable completes.
combineLatest
When any observable emits a value, emit the latest value from each.
Example:
getTransactionsByIDs(transactionsIDs) {
  // Build one inner observable per ID.
  const transactions = transactionsIDs.map(transactionID => this.getTransactionByID(transactionID));
  return Observable.forkJoin(...transactions); // change to zip or combineLatest if needed
}

this.transactionsService.getTransactionsByIDs(['1', '2', '3'])
  .subscribe(([first, second, third]) => {
    console.log({ first, second, third });
  });
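The difference between forkJoin and zip is easiest to see on sources that have already completed. The following is a rough plain-JavaScript model, assuming each source is represented simply by the array of values it emitted before completing; forkJoinModel and zipModel are hypothetical helpers, not RxJS functions, and timing is ignored entirely.

```javascript
// Plain-JavaScript model of forkJoin and zip over sources that have already
// completed; each source is represented by the array of values it emitted.
// `forkJoinModel` and `zipModel` are hypothetical helpers, not RxJS functions.

// forkJoin: a single result made of the last value of every source.
function forkJoinModel(sources) {
  // A source that completed without emitting has no last value to report;
  // this sketch models that case as "no result" (null).
  if (sources.some(values => values.length === 0)) return null;
  return sources.map(values => values[values.length - 1]);
}

// zip: pair values by index until the shortest source runs out.
function zipModel(sources) {
  if (sources.length === 0) return [];
  const length = Math.min(...sources.map(values => values.length));
  const rows = [];
  for (let i = 0; i < length; i++) {
    rows.push(sources.map(values => values[i]));
  }
  return rows;
}

const completedSources = [
  ["a1", "a2", "a3"],
  ["b1"],
  ["c1", "c2"],
];

console.log(forkJoinModel(completedSources)); // [ 'a3', 'b1', 'c2' ]
console.log(zipModel(completedSources));      // [ [ 'a1', 'b1', 'c1' ] ]
```

Because forkJoin only emits once every source has completed, it cannot notify the subscriber about later changes; if the subscription has to stay open and react to updates, combineLatest is the closer fit of the three.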
Gili Yaniv
Updated on July 09, 2022
Comments
-
Gili Yaniv almost 2 years
I need to fetch IDs from my state as separate Observables, combine all of them into one Observable, and subscribe() to it. My subscriber should eventually receive an array of the resolved Observable results.
I also need to keep this subscription open, so that my subscriber is notified if one of the inner Observables changes.
This is my code:
getTransactionsByIDs(transactionsIDs) {
  return Observable.of(transactionsIDs.map(transactionID => this.getTransactionByID(transactionID)));
}

this.transactionsService.getTransactionsByIDs(transactionsIDs)
  .subscribe(transactions => { .... })
The result of the above code in the subscriber function is an array of unresolved stores.
How can I resolve each store and combine all of them together?
I've also tried using Observable.from() on the transactionsIDs to transform each ID into an Observable and then resolve it. It works fine, but then my subscriber is notified for every ID separately. If there's a way to batch all of the Observable.from() results (while keeping the subscription open), please let me know.
This is what my Observable.from() code looks like:
getTransactionsByIDs(transactionsIDs) {
  return transactionsIDs.mergeMap(transactionID => this.getTransactionByID(transactionID));
}

this.transactionsService.getTransactionsByIDs(Observable.from(transactionsIDs))
  .subscribe(transactions => { .... })
-
O-9 over 3 years
Important notice for combineLatest (from the docs): it will not emit an initial value until each observable emits at least one value.
-
ttugates over 2 years
Make sure the array you are spreading into any of the 3 operators has <= 6 items. I am actually searching for a solution that handles up to 15 or so without rolling my own grouping of observables.