Rxjs - How can I extract multiple values inside an array and feed them back to the observable stream synchronously


Solution 1

The xxxMap() family of operators all deal with higher-order Observables: they let you create Observables inside the main Observable and inline the resulting values into the primary stream. You can therefore read the type signature as Observable<Observable<T>> => Observable<T>.

Given a stream in which every emission x is an Observable that emits four values:

input:  --x----------x 
flatMap   a-a-a-a-|  b-b-b-b-|
result: --a-a-a-a----b-b-b-b-|
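
To make the Observable<Observable<T>> => Observable<T> reading concrete, here is a minimal sketch in plain JavaScript. It is a toy model, not how RxJS is implemented: a "stream" is just a function that pushes values to a callback, everything runs synchronously, and completion and errors are ignored. The names streamOf, mapStream, flatten and flatMapSketch are made up for this illustration.

```javascript
// Toy stream: a function that pushes each value to a `next` callback.
// Hypothetical helper, not part of RxJS.
const streamOf = (...values) => (next) => values.forEach((v) => next(v));

// Stream<A> => Stream<B>: apply `project` to every value.
const mapStream = (outer, project) => (next) => outer((x) => next(project(x)));

// Stream<Stream<T>> => Stream<T>: subscribe to each inner stream
// and forward its values to the outer subscriber.
const flatten = (outer) => (next) => outer((inner) => inner(next));

// flatMap = map to inner streams, then flatten them into one stream.
const flatMapSketch = (outer, project) => flatten(mapStream(outer, project));

const input = streamOf('a', 'b');
const result = [];
flatMapSketch(input, (x) => streamOf(x, x, x, x))((v) => result.push(v));

console.log(result.join('-')); // a-a-a-a-b-b-b-b
```

The subscriber only ever sees plain values, never the inner streams themselves, which is the point of the marble diagram above.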

Typecasting xxxMap(myFnc) return values

The xxxMap() operators all accept a result of type Observable, Promise or Array. Whatever you return is converted to an Observable if needed.

Rx.Observable.of('')
  .flatMap(() => [1,2,3,4])
  .subscribe(val => console.log('array value: ' + val));

Rx.Observable.of('')
  .flatMap(() => Promise.resolve(1))
  .subscribe(val => console.log('promise value: ' + val));

Rx.Observable.of('')
  .flatMap(() => Promise.resolve([1,2,3,4]))
  .subscribe(val => console.log('promise array value: ' + val));

Rx.Observable.of('')
  .flatMap(() => Rx.Observable.from([1,2,3,4]))
  .subscribe(val => console.log('Observable value: ' + val));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.3/Rx.js"></script>

In your case you can simply flatMap each object and return its alternatives array:

Rx.Observable.of({ error: null, alternatives: ['result1', 'result2', 'result3'] })
  .flatMap(val => val.alternatives)
  .subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.3/Rx.js"></script>

Difference between all xxxMap operators

What does mergeMap do

flatMap, better known as mergeMap, merges the emissions of every inner stream into the main stream as soon as they occur, so values from different inner streams can interleave.

concatMap

concatMap waits for the current inner stream to complete before it subscribes to the next one, so the inner streams are concatenated in order.

switchMap

switchMap, by contrast, abandons the previous inner stream as soon as a new emission arrives and switches to emitting values from the new stream.
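
The three strategies described above can be compared with a small timeline model. This is a sketch in plain JavaScript, not RxJS itself: the simulate function, the strategy names and the timings are all assumptions chosen for illustration. Each outer emission starts an inner stream that emits its label at fixed offsets, and the function returns the order in which a subscriber would see the values.

```javascript
// Toy timeline model (hypothetical, not RxJS). Each outer emission
// { time, label } starts an inner stream that emits `label` at the given
// offsets and completes `duration` ticks after it starts.
function simulate(strategy, outer, offsets, duration) {
  const events = [];
  let previousEnd = 0;
  outer.forEach((emission, i) => {
    // concat queues the inner stream behind the previous one;
    // merge and switch start it as soon as the outer value arrives.
    const start = strategy === 'concat'
      ? Math.max(emission.time, previousEnd)
      : emission.time;
    // switch drops the rest of this inner stream once the next outer value arrives.
    const next = outer[i + 1];
    const cutoff = strategy === 'switch' && next ? next.time : Infinity;
    offsets.forEach((offset) => {
      const time = start + offset;
      if (time < cutoff) events.push({ time, value: emission.label });
    });
    previousEnd = start + duration;
  });
  // Order the surviving values by the time they are emitted.
  return events.sort((a, b) => a.time - b.time).map((e) => e.value).join('');
}

const outer = [{ time: 0, label: 'a' }, { time: 3, label: 'b' }];
const offsets = [0, 2, 4, 6];

console.log(simulate('merge', outer, offsets, 8));  // aabababb
console.log(simulate('concat', outer, offsets, 8)); // aaaabbbb
console.log(simulate('switch', outer, offsets, 8)); // aabbbb
```

With merge the two inner streams interleave, with concat the second waits for the first to finish, and with switch the first is cut off when the second starts. When the inner streams are synchronous arrays, as in the question, nothing can overlap, so mergeMap and concatMap produce the same output.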

Solution 2

First pluck "alternatives", then use from to iterate over the array. Because from creates a new Observable, you need to flatten it, which is what flatMap does. Each alternative is then emitted back into the stream.

Try it:

var data = {
  error: null,
  alternatives: [{
    result: 1
  }, {
    result: 2
  }, {
    result: 3
  }]
}

var input$ = Rx.Observable.of(data);
input$
  .pluck('alternatives')
  .flatMap(alternatives => Rx.Observable.from(alternatives))
  .subscribe(alternative => console.log(alternative));
<script src="https://npmcdn.com/@reactivex/[email protected]/dist/global/Rx.umd.js"></script>

Solution 3

The flatMap() and concatMap() operators are both good choices. You can turn the alternatives property into another Observable, whose items are then merged into the stream.

const Observable = Rx.Observable;

Observable.of({ error: null, alternatives: ['result1', 'result2', 'result3'] })
  .concatMap(val => {
    return Observable.from(val['alternatives']);
  })
  .subscribe(val => console.log(val));

This prints to console:

result1
result2
result3

See live demo: https://jsbin.com/foqutab/2/edit?js,console

Author by

lngs

Updated on July 06, 2022

Comments

  • lngs
    lngs almost 2 years

    I have created a Rx.Observable from a stream of events:

    Rx.Observable.fromEvent(recognizeStream, 'data')
    

    In which every data event looks like this:

    { error: null, alternatives: [result1, result2, result3] }
    

    I want to pluck every value inside the array of alternatives and merge those into the stream. What operators do I have to look at?

    As far as I know the flatMap and concatMap could do the job but I don't get the idea from their example.

    Can somebody explain which operator I should use and provide me with an example?

  • Mark van Straten
    Mark van Straten about 7 years
    The pluck operator is a bit redundant if you still need to use flatMap anyway.
  • lngs
    lngs about 7 years
    What is the purpose of adding return Observable.from(val['alternatives']) inside concatMap rather than return val.alternatives? I tried it and was still able to subscribe to the observable.
  • martin
    martin about 7 years
    Returning Observable.from is universal and works with both RxJS 4 and RxJS 5. Returning just an array works only in RxJS 5.
  • The Dembinski
    The Dembinski almost 7 years
    Your youtube link is broken :(
  • Mark van Straten
    Mark van Straten almost 7 years
    Sorry to hear that. I have updated my answer to clarify a bit about higher-order observables instead of linking to the missing movie.
  • NicBright
    NicBright over 6 years
    Am I right when guessing that concatMap(func) is the same as mergeMap(func, 1) (i.e. concurrency limited to 1)? ... Ah yes I'm right it's even mentioned in the docs :-), see reactivex.io/rxjs/class/es6/…
  • JJWesterkamp
    JJWesterkamp over 6 years
    A really nice write-up, way better than the docs. You saved my day :)