RxJS: concat three promises, distinguish results

Solution 1

If these are promises we are talking about, have a look at the forkJoin operator. See https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/forkjoin.md and http://xgrommx.github.io/rx-book/content/observable/observable_methods/forkjoin.html

You could then write something like:

Rx.Observable.forkJoin(p1, p2, p3, function (r1, r2, r3) {
  /* your combining code here; r1, r2, r3 are the values resolved by p1, p2, p3 */
})

In short, forkJoin has semantics similar to RSVP.all, if you know the RSVP promise library.
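
To make the "all results at once, bound to positions" idea concrete, here is a minimal sketch in plain JavaScript. It uses the native Promise.all as a stand-in for the same semantics; it is not RxJS code, and `delayed` and `loadAll` are hypothetical names introduced only for this illustration.

```javascript
// Hypothetical helper: a promise that resolves with `value` after `ms` milliseconds.
function delayed(ms, value) {
  return new Promise(function (resolve) {
    setTimeout(function () { resolve(value); }, ms);
  });
}

// Analogy only (native Promise.all, not RxJS): the results arrive together,
// in argument order, regardless of which promise resolved first.
function loadAll() {
  var p1 = delayed(30, 'first list');
  var p2 = delayed(10, 'second list');
  var p3 = delayed(20, 'third list');
  return Promise.all([p1, p2, p3]).then(function (results) {
    // Each result is tied to a position, much like the selector arguments of forkJoin.
    return { first: results[0], second: results[1], third: results[2] };
  });
}
```

Because each value is tied to a position, there is no need to inspect the data to find out which request produced it.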

Solution 2

I ran some experiments this morning, before checking whether I had an answer... :-)

You can see them at http://plnkr.co/edit/3Xczzw

I found out that concat of observables will run each observable in turn, in the given order.

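  // `reference` is the start time (a Date.now() value); showObject and showHTML are display helpers not shown here.
  var o1 = Rx.Observable.interval(1500).take(1).map(function (i) { return { n: i, id: 'First', ts: Date.now() - reference }});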
  var o2 = Rx.Observable.interval(1000).take(2).map(function (i) { return { n: i, id: 'Second', ts: Date.now() - reference }});
  var o3 = Rx.Observable.interval(2000).take(1).map(function (i) { return { n: i, id: 'Third', ts: Date.now() - reference }});
  // Alternative
  var oa = Rx.Observable.timer(1200).map(function (i) { return { n: i, id: 'Alternative', ts: Date.now() - reference }});

  Rx.Observable.concat(o1, o2, o3, oa).subscribe(
      function onNext(v) { v.timestamp = Date.now() - reference; showObject(v); },
      function onError(e) { var ts = Date.now() - reference; showHTML("Error " + JSON.stringify(e) + " at " + ts); },
      function onCompleted() { var ts = Date.now() - reference; showHTML("Completed at " + ts); }
  );

gives

{"n":0,"id":"First","ts":1503,"timestamp":1503}
{"n":0,"id":"Second","ts":2504,"timestamp":2504}
{"n":1,"id":"Second","ts":3505,"timestamp":3505}
{"n":0,"id":"Third","ts":5506,"timestamp":5506}
{"n":0,"id":"Alternative","ts":6708,"timestamp":6708}
Completed at 6708

A concat of promises won't deliver anything before the first promise resolves. It can then deliver (i.e. call onNext for) the other promises that have already resolved, still in the given order, then wait for the next pending promise, if any, and so on.

  var p1 = promiseInTime(1500, { id: 'First'});
  var p2 = promiseInTime(1000, { id: 'Second' });
  var p3 = promiseInTime(2000, { id: 'Third' });
  var pa = promiseInTime(1200, { id: 'Failed? ' + !!withFailure }, withFailure);

  Rx.Observable.concat(p1, p2, p3, pa).subscribe(
      function onNext(v) { v.timestamp = Date.now() - reference; showObject(v); },
      function onError(e) { var ts = Date.now() - reference; showHTML("Error " + JSON.stringify(e) + " at " + ts); },
      function onCompleted() { var ts = Date.now() - reference; showHTML("Completed at " + ts); }
  );

gives

{"id":"First","promiseTimeout":1500,"timestamp":1501}
{"id":"Second","promiseTimeout":1000,"timestamp":1506}
{"id":"Third","promiseTimeout":2000,"timestamp":2001}
Error {"id":"Failed? true","promiseTimeout":1201} at 2004

or

{"id":"First","promiseTimeout":1500,"timestamp":1501}
{"id":"Second","promiseTimeout":1000,"timestamp":1503}
{"id":"Third","promiseTimeout":2000,"timestamp":2000}
{"id":"Failed? false","promiseTimeout":1201,"timestamp":2004}
Completed at 2004
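
The ordering seen in these outputs can be emulated without RxJS. The sketch below is only a plain-JavaScript model of that behaviour, not how RxJS implements concat. `promiseInTime` here is a guess at what the helper used above might look like (its real definition is not shown), and `deliverInOrder` and `demo` are hypothetical names.

```javascript
// Assumed shape of the helper: resolves (or rejects, if `fail` is truthy)
// with `value` after `ms` milliseconds.
function promiseInTime(ms, value, fail) {
  return new Promise(function (resolve, reject) {
    setTimeout(function () { (fail ? reject : resolve)(value); }, ms);
  });
}

// The promises are already running when this is called; their values are
// handed to onNext strictly in argument order, whatever the resolution order.
function deliverInOrder(promises, onNext, onError, onCompleted) {
  var chain = Promise.resolve();
  promises.forEach(function (p) {
    p.catch(function () {}); // mark early rejections as handled until the chain reaches them
    chain = chain.then(function () { return p; }).then(onNext);
  });
  return chain.then(onCompleted, onError);
}

function demo() {
  var received = [];
  return deliverInOrder(
    [promiseInTime(30, 'First'), promiseInTime(10, 'Second'), promiseInTime(20, 'Third')],
    function (v) { received.push(v); },
    function (e) { received.push('Error ' + e); },
    function () { received.push('Completed'); }
  ).then(function () { return received; });
}
// demo() resolves to ['First', 'Second', 'Third', 'Completed']
```

'Second' and 'Third' resolve before 'First', yet they are only delivered once 'First' has been delivered, which matches the timestamps above.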

So, basically, concat respects the order of its arguments, which gives a way to tell which request produced a given result.
For Ajax requests, it is better to concat the promises than the observables, as the requests will then be made in parallel rather than sequentially (unless you need the latter, of course).
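
The parallel versus sequential difference comes from promises being eager (the request starts as soon as the promise is created) while the observables in the first experiment only start when it is their turn. A rough plain-JavaScript sketch of the two start orders, with a hypothetical `request` function standing in for an Ajax call:

```javascript
// Hypothetical stand-in for an Ajax request: records when it starts and ends.
function request(log, name, ms) {
  log.push('start ' + name);
  return new Promise(function (resolve) {
    setTimeout(function () { log.push('end ' + name); resolve(name); }, ms);
  });
}

// Eager promises: all three requests are in flight before any result is read.
function parallelOrder() {
  var log = [];
  var promises = [request(log, 'A', 30), request(log, 'B', 10), request(log, 'C', 20)];
  return Promise.all(promises).then(function () { return log; });
}

// Lazy sources: each request starts only after the previous one has ended.
function sequentialOrder() {
  var log = [];
  var sources = [['A', 30], ['B', 10], ['C', 20]];
  return sources.reduce(function (chain, s) {
    return chain.then(function () { return request(log, s[0], s[1]); });
  }, Promise.resolve()).then(function () { return log; });
}
// parallelOrder()   resolves to: start A, start B, start C, end B, end C, end A
// sequentialOrder() resolves to: start A, end A, start B, end B, start C, end C
```

In the parallel case the total time is roughly that of the slowest request; in the sequential case it is roughly the sum of all three.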

I tried the solution given by @user3743222, and it is a nice one. I prefer forkJoin, as the results are explicitly assigned to parameters, instead of relying on the order.

Be aware of the differences in error handling: I found that concat will process all promises up to the first error, whereas forkJoin will not deliver anything if one of the promises fails. That makes sense (in general, we cannot join partial results).
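
The difference in error handling described above can be illustrated with a small plain-JavaScript model. Again, this is an analogy using native promises (ordered delivery for concat, Promise.all for forkJoin), not RxJS itself, and `promiseInTime` and `compareErrorHandling` are hypothetical names.

```javascript
// Assumed helper: resolves, or rejects if `fail` is truthy, with `value` after `ms`.
function promiseInTime(ms, value, fail) {
  return new Promise(function (resolve, reject) {
    setTimeout(function () { (fail ? reject : resolve)(value); }, ms);
  });
}

function makePromises() {
  // The second promise fails.
  return [promiseInTime(10, 'First'), promiseInTime(20, 'Second', true), promiseInTime(30, 'Third')];
}

function compareErrorHandling() {
  // concat-like: deliver values in order until the first failure.
  var ordered = [];
  var seq = makePromises();
  seq.forEach(function (p) { p.catch(function () {}); }); // avoid unhandled-rejection errors
  var orderedDone = seq.reduce(function (chain, p) {
    return chain.then(function () { return p; }).then(function (v) { ordered.push(v); });
  }, Promise.resolve()).catch(function (e) { ordered.push('Error: ' + e); });

  // forkJoin-like: all or nothing.
  var joined = [];
  var joinedDone = Promise.all(makePromises()).then(
    function (values) { joined = values; },
    function (e) { joined.push('Error: ' + e); }
  );

  return Promise.all([orderedDone, joinedDone]).then(function () {
    return { ordered: ordered, joined: joined };
  });
}
// compareErrorHandling() resolves to
// { ordered: ['First', 'Error: Second'], joined: ['Error: Second'] }
```

The ordered version still hands over 'First' before reporting the error, while the all-or-nothing version reports only the error.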

Author: PhiLho

Updated on July 30, 2022

Comments

  • PhiLho
    PhiLho over 1 year

    I have three promises, REST requests returning lists of data.
    The third one has references (ids) to the first two lists, so I want to map these ids to the corresponding names once I have all the data.
    The mapping isn't an issue; I just use Lodash for that.
    The issue is waiting for the three promises to resolve before starting to compute this mapping.

    I figured I could use concat():

    Rx.Observable.concat(p1, p2, p3).subscribe(
      function onNext(list)
      {
        // Assign the list to the corresponding variable in the scope
      },
      function onError(e)
      {
        // Notify of error
      },
      function onCompleted()
      {
        // Do the mapping
      }
    );

    My problem is that onNext() will be called in random order, i.e. I don't know which list I receive at any given point, and it is hard to tell from the data it contains.

    Is there a way to track which promise produced which list? A kind of zip? concatMap? concatMapObserver? I admit I haven't fully understood the usage of the last two...

  • PhiLho
    PhiLho over 8 years
    Thanks. I found out how concat really works (see my answer), but I prefer forkJoin, which corresponds to what I wanted. I totally overlooked this one... :-)
  • user3743222
    user3743222 over 8 years
    Nice experiment. I would also add that by executing things sequentially, you get your results more slowly than if you execute them in parallel.
  • frandevel
    frandevel about 7 years
    If someone is trying to get this to work in Angular, it won't, as the "complete" event is not triggered by the http observables. As a similar solution, using "combineLatest" instead worked for me.