Performing The Stream Equivalent Of Promise.all() Using RxJS
When I first learned Promises, I thought they were hard to understand. Then, when I started looking into Angular 2, I had to learn about RxJS and Observable sequences of data. And, to be honest, RxJS kind of makes Promises look like child's play. Even after almost a year of Angular 2 research, I'm barely capable with RxJS streams. In fact, I've even been told on good authority that I'm downright wrong in some of my thinking. Just this morning, while showering, I realized that I didn't even know how to perform the RxJS equivalent of Promise.all(). So, this morning, I tried to sit down and figure it out.
Run this demo in my JavaScript Demos project on GitHub.
With Promises, there are only a few core concepts. With RxJS, there are many core concepts that power a vast number of core operators, which can be forever expanded upon in userland. So, while there's only one Promise.all(), there seem to be several RxJS operators that can achieve the same thing, depending on the type of streams being used.
The RxJS operator that appears to be the most feature-compatible with Promise.all() is .forkJoin(). The .forkJoin() operator runs a collection of sequences or Promises in parallel, waits for them all to complete, and then emits a single value composed of the last emitted value from each stream.
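To make that description concrete, here is a minimal plain-JavaScript model of it. This is a sketch, not the RxJS implementation: forkJoinModel() and the { stream, type, value } event shape are hypothetical names invented for illustration. It replays a hand-written timeline of "next" and "complete" events and records what a forkJoin-style aggregation would emit.

```javascript
// A toy model of the .forkJoin() behavior described above (NOT the RxJS source).
// ASSUMPTION: forkJoinModel() and the { stream, type, value } event shape are
// hypothetical, invented purely for illustration.
function forkJoinModel( events, streamCount ) {

    var lastValues = new Array( streamCount );
    var completedCount = 0;
    var emitted = [];

    events.forEach(
        function handleEvent( event ) {

            if ( event.type === "next" ) {

                // Remember only the most recent value from each stream.
                lastValues[ event.stream ] = event.value;

            } else if ( event.type === "complete" ) {

                completedCount++;

                // Nothing is emitted until EVERY stream has completed.
                if ( completedCount === streamCount ) {
                    emitted.push( lastValues.slice() );
                }

            }

        }
    );

    return( emitted );

}

// Streams 0-2 stand in for three Promises (one value, then complete).
// Stream 3 stands in for a range stream that emits 1 through 4.
var timeline = [
    { stream: 3, type: "next", value: 1 },
    { stream: 3, type: "next", value: 2 },
    { stream: 3, type: "next", value: 3 },
    { stream: 3, type: "next", value: 4 },
    { stream: 3, type: "complete" },
    { stream: 0, type: "next", value: "One" },
    { stream: 0, type: "complete" },
    { stream: 1, type: "next", value: "Two" },
    { stream: 1, type: "complete" },
    { stream: 2, type: "next", value: "Three" },
    { stream: 2, type: "complete" }
];

var forkJoinResult = forkJoinModel( timeline, 4 );

console.log( forkJoinResult ); // [ [ "One", "Two", "Three", 4 ] ]
```

Only one array comes out, and the multi-value stream contributes only its final value, which is the property that makes this operator line up with Promise.all().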
But, depending on the number of values that you expect from a given set of streams, the .combineLatest() and .zip() operators may produce the same result. The .combineLatest() operator emits a composite of each stream's latest value; once every underlying stream has emitted at least one value, it emits a new composite any time any of the underlying streams emits a value. The .zip() operator is similar to the .combineLatest() operator; but, it appears to buffer the results of the underlying streams, only zipping-together values at the same emit-index.
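The difference between these two operators is easier to see with a small simulation. The following is a plain-JavaScript sketch under stated assumptions, not the RxJS implementation: combineLatestModel(), zipModel(), next(), and the { stream, value } event shape are all hypothetical names, and stream completion is ignored to keep the model short.

```javascript
// Toy models of .combineLatest() and .zip() (NOT the RxJS source).
// ASSUMPTION: all function names and the { stream, value } event shape are
// hypothetical. Completion is deliberately not modeled.
function combineLatestModel( events, streamCount ) {

    var latest = new Array( streamCount );
    var seen = new Array( streamCount ).fill( false );
    var emitted = [];

    events.forEach(
        function handleEvent( event ) {

            latest[ event.stream ] = event.value;
            seen[ event.stream ] = true;

            // Once every stream has produced a value, ANY new value triggers
            // a fresh snapshot of the latest values.
            if ( seen.every( Boolean ) ) {
                emitted.push( latest.slice() );
            }

        }
    );

    return( emitted );

}

function zipModel( events, streamCount ) {

    // One first-in-first-out buffer per stream.
    var buffers = [];
    for ( var i = 0 ; i < streamCount ; i++ ) {
        buffers.push( [] );
    }
    var emitted = [];

    events.forEach(
        function handleEvent( event ) {

            buffers[ event.stream ].push( event.value );

            // Emit only when every stream can contribute a value for the
            // same emit-index.
            var isReady = buffers.every(
                function( buffer ) {
                    return( buffer.length > 0 );
                }
            );

            if ( isReady ) {
                emitted.push(
                    buffers.map(
                        function( buffer ) {
                            return( buffer.shift() );
                        }
                    )
                );
            }

        }
    );

    return( emitted );

}

function next( stream, value ) {
    return( { stream: stream, value: value } );
}

// Stream 3 emits four values BEFORE the three single-value streams emit.
var rangeFirst = [
    next( 3, 1 ), next( 3, 2 ), next( 3, 3 ), next( 3, 4 ),
    next( 0, "One" ), next( 1, "Two" ), next( 2, "Three" )
];

// Every stream emits exactly one value.
var singleValues = [
    next( 0, "One" ), next( 1, "Two" ), next( 2, "Three" ), next( 3, 4 )
];

var combinedMulti = combineLatestModel( rangeFirst, 4 );
var zippedMulti = zipModel( rangeFirst, 4 );
var combinedSingle = combineLatestModel( singleValues, 4 );
var zippedSingle = zipModel( singleValues, 4 );

console.log( combinedMulti );  // [ [ "One", "Two", "Three", 4 ] ]
console.log( zippedMulti );    // [ [ "One", "Two", "Three", 1 ] ]
console.log( combinedSingle ); // [ [ "One", "Two", "Three", 4 ] ]
console.log( zippedSingle );   // [ [ "One", "Two", "Three", 4 ] ]
```

In this model, the two operators only disagree when at least one stream emits more than one value; with single-value streams, the results are identical.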
To experiment with these RxJS operators, I tried to aggregate the results of four different asynchronous operations. Three of them are Promises and one of them is a range stream. I used a range stream to see how these various operators behaved when multiple values were emitted.
I also ran the tests twice, once with a range-stream delay and once without. Unlike Promises, which are always asynchronous, RxJS streams are inconsistently asynchronous. As such, I wanted to see what happened when the range-stream completed both before and after the promises.
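The timing difference can be illustrated without RxJS at all. In this sketch, syncRange() is a hypothetical stand-in for a stream that delivers its values during the subscribe call (the behavior assumed here for an un-delayed range stream); it is not an RxJS API. The Promise callback, by contrast, always waits until the current synchronous code has finished.

```javascript
// ASSUMPTION: syncRange() is a hypothetical stand-in for a stream that delivers
// its values synchronously, during the .subscribe() call. It is not an RxJS API.
function syncRange( start, count ) {

    return({
        subscribe: function( handleValue ) {

            for ( var i = 0 ; i < count ; i++ ) {
                handleValue( start + i );
            }

        }
    });

}

var log = [];

// Even an already-resolved Promise defers its callback until after the
// current synchronous code has run.
Promise.resolve( "One" ).then(
    function handleResolve( value ) {
        log.push( "promise: " + value );
    }
);

// The stand-in stream calls back immediately, before .subscribe() returns.
syncRange( 1, 4 ).subscribe(
    function handleValue( value ) {
        log.push( "range: " + value );
    }
);

log.push( "end of synchronous code" );

// Snapshot of the log at the end of the synchronous code; the Promise callback
// has not run yet.
var syncSnapshot = log.slice();

console.log( syncSnapshot );
// [ "range: 1", "range: 2", "range: 3", "range: 4", "end of synchronous code" ]
```

The Promise was created first, yet its value is the only one missing from the snapshot. Adding a delay to the stream flips that ordering, which is why the demo exercises both cases.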
<!doctype html>
<html>
<head>
    <meta charset="utf-8" />
    <title>
        Performing The Stream Equivalent Of Promise.all() Using RxJS
    </title>
    <script type="text/javascript" src="../../vendor/angular2/2.1.1/node_modules/core-js/client/shim.min.js"></script>
    <script type="text/javascript" src="../../vendor/angular2/2.1.1/node_modules/rxjs/bundles/Rx.min.js"></script>
</head>
<body>
    <h1>
        Performing The Stream Equivalent Of Promise.all() Using RxJS
    </h1>
    <script type="text/javascript">
        // Run first test with a synchronous range stream.
        setTimeout(
            function() {
                console.group( "With No Range Delay" );
                runTest( false );
                setTimeout( console.groupEnd, 10 );
            }
        );
        // Run second test with an asynchronous range stream - this way, we can see what
        // happens when the promises resolve before the stream.
        setTimeout(
            function() {
                console.group( "With Range Delay (so that Promises resolve before range)" );
                runTest( true );
                setTimeout( console.groupEnd, 50 );
            },
            500
        );
        // --------------------------------------------------------------------------- //
        // --------------------------------------------------------------------------- //
        // --------------------------------------------------------------------------- //
        // --------------------------------------------------------------------------- //
        function runTest( useDelay ) {
            var one = Promise.resolve( "One" );
            var two = Promise.resolve( "Two" );
            var three = Promise.resolve( "Three" );
            // As the fourth item to "collect", I'm going to use a range stream that
            // has an optional delay. Since RxJS streams are inconsistently asynchronous,
            // the optional .delay() will allow the Promises to resolve before the stream
            // in certain tests.
            var fourAsRangeStream = useDelay
                ? Rx.Observable.range( 1, 4 ).delay( 10 )
                : Rx.Observable.range( 1, 4 )
            ;
            // ----------------------------------------------------------------------- //
            // ----------------------------------------------------------------------- //
            // This is our "control" group - this is what we want to do with Streams.
            Promise
                .all( [ one, two, three, fourAsRangeStream.toPromise() ] )
                .then(
                    function handleResolve( values ) {
                        console.log( "Promise.all() Result:", values );
                    }
                )
            ;
            // ----------------------------------------------------------------------- //
            // ----------------------------------------------------------------------- //
            // .combineLatest() runs all of the streams / promises in parallel, waits
            // for all of them to emit a value, and then emits an array with the latest
            // value from each stream. This does not wait for the streams to complete;
            // once each of the streams has emitted a value, .combineLatest() will emit
            // a new value when any of the other streams emits a new value.
            // --
            // NOTE: If all of the streams emit a single value, this will behave
            // similar to the .forkJoin() operator.
            Rx.Observable
                .combineLatest(
                    Rx.Observable.fromPromise( one ),
                    Rx.Observable.fromPromise( two ),
                    Rx.Observable.fromPromise( three ),
                    fourAsRangeStream
                )
                .subscribe(
                    function handleValue( values ) {
                        console.log( "Rx.Observable.combineLatest( ...streams ) Result:", values );
                    }
                )
            ;
            Rx.Observable
                .combineLatest( one, two, three, fourAsRangeStream )
                .subscribe(
                    function handleValue( values ) {
                        console.log( "Rx.Observable.combineLatest( ...promises ) Result:", values );
                    }
                )
            ;
            // ----------------------------------------------------------------------- //
            // ----------------------------------------------------------------------- //
            // .forkJoin() runs all of the streams / promises in parallel, waits for
            // them all to COMPLETE, and then emits an array that contains the last
            // value from each of the parallel streams.
            Rx.Observable
                .forkJoin(
                    Rx.Observable.fromPromise( one ),
                    Rx.Observable.fromPromise( two ),
                    Rx.Observable.fromPromise( three ),
                    fourAsRangeStream
                )
                .subscribe(
                    function handleValue( values ) {
                        console.log( "Rx.Observable.forkJoin( ...streams ) Result:", values );
                    }
                )
            ;
            Rx.Observable
                .forkJoin( one, two, three, fourAsRangeStream )
                .subscribe(
                    function handleValue( values ) {
                        console.log( "Rx.Observable.forkJoin( ...promises ) Result:", values );
                    }
                )
            ;
            // ----------------------------------------------------------------------- //
            // ----------------------------------------------------------------------- //
            // .zip() runs all of the promises / streams in parallel, but only emits a
            // list of values when each stream has emitted a value at the same index.
            // In this case, since the Promises will only emit one value, only the
            // first index of the range stream will be taken.
            // --
            // NOTE: If all of the streams emit a single value, this will behave
            // similar to the .forkJoin() operator.
            Rx.Observable
                .zip(
                    Rx.Observable.fromPromise( one ),
                    Rx.Observable.fromPromise( two ),
                    Rx.Observable.fromPromise( three ),
                    fourAsRangeStream
                )
                .subscribe(
                    function handleValue( values ) {
                        console.log( "Rx.Observable.zip( ...streams ) Result:", values );
                    }
                )
            ;
            Rx.Observable
                .zip( one, two, three, fourAsRangeStream )
                .subscribe(
                    function handleValue( values ) {
                        console.log( "Rx.Observable.zip( ...promises ) Result:", values );
                    }
                )
            ;
        };
    </script>
</body>
</html>
In each case, I use the same operator with both raw Promises and with proxied streams. This was done more for my own learning - to see which operators can work with either Promises or streams.
When we run the above code, we get the following output:
As you can see, the .forkJoin() operator was the only operator that consistently produced the same value as the Promise.all() control group. That said, the .combineLatest() operator also produced the same result in some circumstances. And, in fact, if all streams in question only produce one value, even more of the operators produce the same result.
Imagine that this is an aggregation of HTTP requests that will only produce one value - let's look at what happens when we replace:
Rx.Observable.range( 1, 4 )
... with:
Rx.Observable.of( 4 )
In this case, each of the four streams will only emit a single value. And, when we run this version of the code, we get the following output:
Now, things are very different. In this case, when each underlying stream only emits a single value, the .forkJoin(), .combineLatest(), and .zip() operators all produce the same result.
So, which operator should you use if you want to perform the RxJS equivalent of Promise.all()? The answer isn't clear. You may look at these results and say that .forkJoin() is the "right" one because it is the most consistent. But, is it really? Meaning, in a Promise.all() aggregation, each Promise can only ever produce one value. So, if we're looking for an RxJS equivalent, it only makes sense that we deal with streams that will only ever produce one value (such as an Http stream). And, in that case, I don't see any real difference in intent between the three operators that all produce the same result.
Promises are hard. RxJS is moar harder and then some. And, as it turns out, there's usually a number of seemingly "right" ways to accomplish the same thing. In the case of creating an RxJS equivalent to Promise.all(), it seems that there are three operators - .forkJoin(), .combineLatest(), and .zip() - that will all produce the desired result.
Of course, as I said above, I'm a total novice when it comes to RxJS, so I may be missing some fundamental difference between the intent of the various operators. Take caution!
Want to use code from this post? Check out the license.
Reader Comments
Nice post, as always Ben!
A thing that I could argue is that, if you put `.last()` on the Observables that emit more than one value, you get the same result with all three operators that you showed.
`Observable.range(1, 4).last()` could be understood as "semantically equal" to `Observable.of(4)`, ignoring the Observable allocations and memory usage.
Hey Ben, I am new to your blog, but I am really enjoying it. One question: do you have a "Ben's blog classics list" of your favorite or critical posts to focus on?
Thanks
Shortcut
You could also argue that an "equivalent" would also produce all intermediate values: Rx.Observable.merge(one$, two$, three$).finally(allHandler) :)
@Geovani, @Vincent,
It's definitely hard to talk about a true "equivalence" between Promises and Observables because, by their very nature, Promises can only produce one value. As such, these are fundamentally different data-types. So, you could add .last() - which is a good tip - but then I think you end up discarding the first set of emitted values.
I'm having a bit of a crisis-of-faith with Observables. I keep having an urge to just go back to Promises for most things; then, just convert to an Observable in cases where that makes sense for the type of workflow that I need to implement. Promises just seem like a much simpler, more predictable mental model.
@Shortcut,
Thank you very much for the kind words! I am not sure that I have a set of "classics" so to speak. I'm kind of a stream-of-consciousness kind of writer, so I just write about whatever I happen to be working on.
The best I can offer is just my posts broken down for JavaScript, if that's what you're interested in:
www.bennadel.com/blog/tags/6-javascript-dhtml-blog-entries.htm
Thanks for stopping by!
@Ben
I had the same feelings about it, but after learning about Cycle.js (and most of Andre Staltz's content) I have no doubt that Observables (the concept, at least) are the future.
If you have some time in the future, take a look at the introduction to Cycle.js to see an Observable-first approach to building UIs (it's a bit outdated, but still relevant): https://egghead.io/courses/cycle-js-fundamentals
And the official site: https://cycle.js.org/
I would really like to see your thoughts about it in the future :)
Hi Ben,
Nice Article,
I am working on an Angular 5 TypeScript project with RxJS 5. I have multiple observable calls in my component, and I need to make sure that the loader is only disabled once all of the API calls have completed. I feel that the solution you mentioned is a perfect fit for me. Can you please help me use forkJoin with multiple Observable HTTP calls? Do you have any example that uses Angular HTTP calls that return an Observable, like Observable<any>?
I highly appreciate your help.
Thanks and regards
Murtuza
@Murtuza,
All you have to do is pass the HTTP observables into the .forkJoin() method. This will create a subsequent stream that you can subscribe to. Once you subscribe, it should trigger the HTTP requests. And, when those all complete, your .subscribe() callback should receive the values of each HTTP call (as indices in an array). I don't have an example of this off-hand. I actually lean more on Promises for stuff like this. I tend to use Streams in cases where I am expecting more than one result over time ... though, that's just a personal preference.