RxJS Promise Composition (passing data)


I did not read all of it, but if you want to achieve the same as p1().then(p2).then(p3).then(console.log);, with p1, p2 and p3 being functions that return promises, you could do something like this:

Rx.Observable.fromPromise(p1())
             .flatMap(function(p1_result){return p2(p1_result);})
             .flatMap(function(p2_result){return p3(p2_result);})

Or, more symmetrically (Rx.Observable.just() emits a single undefined value that starts the chain):

 var chainedPromises$ = 
     Rx.Observable.just()
             .flatMap(p1)
             .flatMap(p2)
             .flatMap(p3);

Now, if you want to sequentially execute callbacks wrapped with fromCallback or fromNodeCallback, you could do something like:

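// Assumes file, dest, exists, rename and stat are defined as in the question.
// Named renameIfExists so it does not shadow the wrapped rename function.
function renameIfExists (flag){
  return flag
          ? rename(file, dest).flatMap(function(){ return Rx.Observable.just(dest); })
          : Rx.Observable.throw(new Error('File does not exist.'));
}

// exists and stat are wrapped in one-argument functions so only the value
// is forwarded (flatMap also hands its selector an index).
Rx.Observable.just(file)
             .flatMap(function(path){ return exists(path); })
             .flatMap(renameIfExists)
             .flatMap(function(path){ return stat(path); })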
The latter code is untested, so let me know whether it works. One last comment: this should work as long as each step produces only one value (like a promise). If you had several files instead of one, flatMap might give you ordering issues (if order matters to you); in that case, you could use concatMap as a replacement.
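
The ordering point can be illustrated without Rx. The following is a minimal plain-Promise sketch, not RxJS code: fakeStat, mergeLike and concatLike are hypothetical names, and the delays are made up. mergeLike collects results as they arrive (roughly what flatMap does), while concatLike keeps them in source order (roughly what concatMap guarantees).

```javascript
// Hypothetical stand-in for an async file operation:
// resolves with `name` after `ms` milliseconds.
const fakeStat = (name, ms) =>
  new Promise(resolve => setTimeout(() => resolve(name), ms));

// Made-up inputs: the first file is the slowest one.
const files = [['a.txt', 30], ['b.txt', 10], ['c.txt', 20]];

// Merge-like: start everything at once and record results in arrival order.
const mergeLike = () => {
  const seen = [];
  return Promise.all(
    files.map(([name, ms]) => fakeStat(name, ms).then(n => { seen.push(n); }))
  ).then(() => seen);
};

// Concat-like: handle one item after the other, so source order is kept.
const concatLike = () =>
  files.reduce(
    (acc, [name, ms]) => acc.then(out => fakeStat(name, ms).then(n => out.concat(n))),
    Promise.resolve([])
  );

mergeLike().then(r => console.log('merge-like: ', r));  // b.txt, c.txt, a.txt
concatLike().then(r => console.log('concat-like:', r)); // a.txt, b.txt, c.txt
```

With a single value per step, as in the promise chain above, both variants give the same result, which is why flatMap is enough there.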

Author: low_ghost

Updated on February 03, 2020

Comments

  • low_ghost
    low_ghost about 4 years

    I'm brand new to Rx and am finding it difficult to find documentation on composing promises such that data from the first promise is passed into the second, and so on. Here are three very basic promise-returning functions; the calculations on the data aren't important, just that something async has to be done using data from the previous promise.

     const p1 = () => Promise.resolve(1);
     const p2 = x => { const val = x + 1; return Promise.resolve(val); };
     const p3 = x => {
          const isEven = x => x % 2 === 0;
          return Promise.resolve(isEven(x));
     };
    

    The traditional way to achieve the composition I'm talking about:

     p1().then(p2).then(p3).then(console.log);
    

    My favorite implementation is Ramda's composeP and pipeP:

    R.pipeP(p1, p2, p3, console.log)()
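
For readers who have not used pipeP, the idea of left-to-right promise composition can be sketched with plain promises. This is only an illustration under stated assumptions, not Ramda's actual implementation; pipePromises is a hypothetical name, and p1, p2, p3 mirror the functions defined above.

```javascript
// Left-to-right composition of promise-returning functions: the first
// function receives the call arguments, and every later function receives
// the resolved value of the previous one.
const pipePromises = (first, ...rest) => (...args) =>
  rest.reduce((acc, fn) => acc.then(fn), Promise.resolve(first(...args)));

const p1 = () => Promise.resolve(1);
const p2 = x => Promise.resolve(x + 1);
const p3 = x => Promise.resolve(x % 2 === 0);

// 1 -> 2 -> true (2 is even)
pipePromises(p1, p2, p3)().then(console.log); // logs true
```

The reduce over .then is the promise counterpart of reducing over flatMap on an observable.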
    

    It seems likely Rx might be able to handle this kind of situation pretty fluently. However, the closest I've found so far is from the RxJS to async (library) comparison here https://github.com/Reactive-Extensions/RxJS/blob/master/doc/mapping/async/comparing.md:

     var Rx = require('rx'),
         fs = require('fs'),
         path = require('path');
     var file = path.join(__dirname, 'file.txt'),
         dest = path.join(__dirname, 'file1.txt'),
         exists = Rx.Observable.fromCallback(fs.exists),
         rename = Rx.Observable.fromNodeCallback(fs.rename),
         stat = Rx.Observable.fromNodeCallback(fs.stat);
     exists(file)
         .concatMap(function (flag) {
             return flag ?
                 rename(file, dest) :
                 Rx.Observable.throw(new Error('File does not exist.'));
         })
         .concatMap(function () {
             return stat(dest);
         })
         .forEach(
             function (fsStat) {
                 console.log(JSON.stringify(fsStat));
             },
             function (err) {
                 console.log(err);
             }
         );
    

    concatMap seems promising, but the above code looks pretty horrific. I was also having trouble with my example because Rx.Observable.fromPromise(p1) won't work (it expects a promise, not a function that returns one), and Rx.Observable.defer(p1) doesn't seem to pass parameters the way the example does.

    Thanks!

    Similar question but without data passing: Chaining promises with RxJS

  • low_ghost
    low_ghost over 8 years
    I was somewhat hoping for a slightly higher abstraction, something like flatMapAll(p1, p2, p3). That would be particularly helpful when a sequence of promises is generated via a map, e.g. const ps = map((x) => promisedFsReadFileCurriedSoThatItDoesSomethingWithPreviousFileData(x), ['1.txt','2.txt','3.txt']); Rx.Observable.just().flatMapAll(...ps); (just pseudo code). But this is definitely a manageable solution, and there's likely a way to do this by mapping over fromPromise or something. Thanks!
  • low_ghost
    low_ghost over 8 years
    I didn't test the second code sample either, but the first ones work like a charm.
  • user3743222
    user3743222 over 8 years
    You can write flatMapAll yourself. flatMapAll :: Rx.Observable -> [a -> a] -> Rx.Observable. flatMapAll = (source, fn_array) -> fn_array.reduce((acc, fn) -> acc.flatMap(fn), source). In JS, Rx.Observable.prototype.flatMapAll = function (fn_array) {source = this; return ...}
  • low_ghost
    low_ghost over 8 years
    yep, was coming to something similar. here's what I wrote const flatMapAll = (...fns) => fns.reduce((acc, fn) => acc.flatMap(fn), Rx.Observable.just()); flatMapAll(p1, p2, p3).subscribe(console.log). thanks for the help. also, what language is the first example in? Looks pretty close to haskell
  • user3743222
    user3743222 over 8 years
    It's pseudo code, but yes, the syntax is inspired by Haskell, at least the type declaration.