RxJS Promise Composition (passing data)
I did not read all of it, but if you want to achieve the same as pl().then(p2).then(p3).then(console.log);
, with p
being function returning promises, you could do something like (example here)
Rx.Observable.fromPromise(p1())
.flatMap(function(p1_result){return p2(p1_result);})
.flatMap(function(p2_result){return p3(p2_result);})
Or the more symmetric :
var chainedPromises$ =
Rx.Observable.just()
.flatMap(p1)
.flatMap(p2)
.flatMap(p3);
Now if you want to execute sequentially callback wrapped through fromCallback
or fromNodeCallback
, you could do something like :
function rename (flag){
return flag
? rename(file,dest).flatMap(return Rx.Observable.just(dest))
: Rx.Observable.throw(new Error('File does not exist.'));
}
Rx.Observable.just(file)
.flatMap(exists)
.flatMap(rename)
.flatMap(stat)
The latter code is untested, so keep me updated if that works.
Last comment, this should work if at each point you only have one value produced (like a promise). If you would have several files instead of one, with flatMap
you might get ordering issues (if order matters to you), so in that case, you could use concatMap
as a replacement.
low_ghost
Updated on February 03, 2020Comments
-
low_ghost about 4 years
I'm brand new to Rx and am finding it difficult to find documentation on composing promises such that data from the first promise is passed into the second and so on. Here's three very basic promises, the calculations on the data aren't important, just that something async has to be done using data from the previous promise.
const p1 = () => Promise.resolve(1); const p2 = x => { const val = x + 1; return Promise.resolve(val); }; const p3 = x => { const isEven = x => x % 2 === 0; return Promise.resolve(isEven(x)); };
The traditional way to achieve the composition I'm talking about:
pl().then(p2).then(p3).then(console.log);
My favorite implementation is Ramda's composeP and pipeP:
R.pipeP(p1, p2, p3, console.log)()
It seems likely Rx might be able to handle this kind of situation pretty fluently. However, the closest I've found so far is from the RxJS to async (library) comparison here https://github.com/Reactive-Extensions/RxJS/blob/master/doc/mapping/async/comparing.md:
var Rx = require('rx'), fs = require('fs'), path = require('path'); var file = path.join(__dirname, 'file.txt'), dest = path.join(__dirname, 'file1.txt'), exists = Rx.Observable.fromCallback(fs.exists), rename = Rx.Observable.fromNodeCallback(fs.rename), stat = Rx.Observable.fromNodeCallback(fs.stat); exists(file) .concatMap(function (flag) { return flag ? rename(file, dest) : Rx.Observable.throw(new Error('File does not exist.')); }) .concatMap(function () { return stat(dest); }) .forEach( function (fsStat) { console.log(JSON.stringify(fsStat)); }, function (err) { console.log(err); } );
concatMap seems promising, but the above code looks pretty horrific. I was also having trouble with my example because Rx.Observable.fromPromise(p1) won't work as it expects a promise itself, not a function, and Rx.Observable.defer(p1) doesn't seem to pass parameters like the example.
Thanks!
Similar question but without data passing: Chaining promises with RxJS
-
low_ghost over 8 yearsI was somewhat hoping for just a slightly higher abstraction that would be something like flatMapAll(p1, p2, p3). Particularly helpful if a sequence of promises is generated via a map e.g. const ps = map((x) => promisedFsReadFileCurriedSoThatItDoesSomethingWithPreviousFileData(x), ['1.txt','2.txt','3.txt']); Rx.Observable.just().flatMapAll(...ps); (just pseudo code). But this is definitely a manageable solution and likely there's a way to do this with mapping over fromPromise or something. thanks!
-
low_ghost over 8 yearsalso didn't test the second code sample either, but the first ones work like a charm
-
user3743222 over 8 yearsyou can do
flatMapAll
yourself.flatMapAll :: Rx.Observable -> [a ->a] -> Rx.Observable
.flatMapAll = (source, fn_array) -> fn_array.reduce((acc, fn) -> acc.flatMap(fn), source)
. In js,Rx.Observable.prototype.flatMapAll = function (fn_array) {source = this; return ...}
-
low_ghost over 8 yearsyep, was coming to something similar. here's what I wrote
const flatMapAll = (...fns) => fns.reduce((acc, fn) => acc.flatMap(fn), Rx.Observable.just()); flatMapAll(p1, p2, p3).subscribe(console.log)
. thanks for the help. also, what language is the first example in? Looks pretty close to haskell -
user3743222 over 8 yearsit is pseudo code, but yes sintax is inspired from Haskell, at least the type declaration.