Experimenting With The .catch() Operator And Stream Continuation In RxJS And Angular 2
Earlier this week, when I was experimenting with the RxJS Subject class in Angular 2 (in lieu of EventEmitter), I realized that my observable sequence had a problem: if it ever encountered an error, it would stop working, even if I was catching errors. But, since I happened to be working with client-side error logging, I wanted my sequence to continue working indefinitely (logging errors throughout the lifetime of the application). As such, I started to play around with the .catch() operator and the concept of stream continuation in RxJS.
Run this demo in my JavaScript Demos project on GitHub.
Since the .catch() operator in RxJS looks like the .catch() method or error handler in a Promise chain, it's easy to think that they are doing the same thing. And, in a single-value sequence, they basically are doing the same thing from a developer experience standpoint. But, if you have a stream that needs to emit multiple values, you will quickly find out that the .catch() operator in an RxJS control flow is very different than the .catch() method in a Promise control flow.
In a Promise chain, the .catch() method is a value transformer. Meaning, it takes in an error (the rejection reason) and returns a value. In RxJS, the .catch() operator is a stream transformer. Meaning, it takes in an error and returns a new stream reference (that contains other values).
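To make the Promise half of that contrast concrete, here is a minimal sketch that uses only the native Promise API. The rejected Promise and the message strings are made up for illustration; they simply stand in for a failed request.

```javascript
// A rejected Promise stands in for a failed request (illustrative only).
var recovered = Promise.reject( new Error( "Network Error" ) )
    // .catch() receives the rejection reason and returns a plain VALUE. That
    // value becomes the resolution seen by everything downstream.
    .catch(
        function handleError( error ) {
            return( "Recovered from: " + error.message );
        }
    )
;

recovered.then(
    function handleValue( value ) {
        console.log( value ); // Recovered from: Network Error
    }
);
```

Nothing upstream of the .catch() is re-run here; the handler only changes what downstream handlers receive.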
Given this RxJS behavior, I wanted to experiment with using the .catch() operator to foster stream continuation. And, after some trial and error, I discovered that if my .catch() operator returned a reference to the original stream, it would keep the stream going. Well, sort of. Behind the scenes, it's actually unsubscribing and resubscribing to the stream. But, this subscription mechanism only becomes symptomatic with "cold" streams and "refCount" streams; with "hot" streams, the values just pick up where they left off.
To demonstrate, I put together a little RxJS demo in which we can create streams from four different sources. Each of these streams is then flat-mapped into a (simulated) HTTP request that will fail after every few attempts. In my stream, I'm catching those errors and then returning a reference to the original stream:
<!doctype html>
<html>
<head>
<meta charset="utf-8" />
<title>
Experimenting With The .catch() Operator And Stream Continuation In RxJS And Angular 2
</title>
<link rel="stylesheet" type="text/css" href="./demo.css" />
</head>
<body>
<h1>
Experimenting With The .catch() Operator And Stream Continuation In RxJS And Angular 2
</h1>
<p>
<a href="./?1">Stream 1</a>: Hot, interval, custom.<br />
<a href="./?2">Stream 2</a>: Hot, interval, publish.<br />
<a href="./?3">Stream 3</a>: Hot, interval, refCount.<br />
<a href="./?4">Stream 4</a>: Cold, interval.<br />
</p>
<!-- Load demo scripts. -->
<script type="text/javascript" src="../../vendor/angularjs-2-beta/8/es6-shim.min.js"></script>
<script type="text/javascript" src="../../vendor/angularjs-2-beta/8/Rx.umd.min.js"></script>
<script type="text/javascript">
// --------------------------------------------------------------------------- //
// NOTE: This demo really has nothing to do with Angular 2. However, I am
// learning RxJS in the context of learning Angular 2 (currently in Beta 8),
// so I think mentioning it is good framing for my mindset and use-cases.
// --------------------------------------------------------------------------- //
var which = ( +location.search.slice( -1 ) || 1 );
console.info( "Running with stream:", which );
// When coming from the world of Promise chains, it can be very confusing to
// think about how errors are handled in RxJS streams. Whereas in Promise
// chains, you can "transform" an error (ie, a rejection) into a resolution
// if you want to, the same cannot be said exactly of RxJS streams. In RxJS,
// an error can be transformed at the STREAM LEVEL but not necessarily at the
// VALUE LEVEL. As such, we're going to experiment with how .catch() interacts
// with different source streams.
switch ( which ) {
// In this case, we're going to manually create a HOT stream that will
// keep emitting values every second until the end of time.
case 1:
var eventSubject = new Rx.Subject();
var source = Rx.Observable.from( eventSubject );
var i = 0;
setInterval(
function triggerEvent() {
eventSubject.next( i++ );
},
1000
);
break;
// In this case, we're going to implicitly create a HOT stream through the
// .publish() operator.
case 2:
var source = Rx.Observable.interval( 1000 )
.publish()
;
source.connect(); // Drop it like it's hot!
break;
// In this case, we're going to implicitly create a HOT stream; but, this time,
// we're not necessarily going to let it run forever - we're using the
// .refCount() operator to ensure that the underlying source is disconnected
// when the published source loses all of its subscriptions.
case 3:
var source = Rx.Observable.interval( 1000 )
.publish()
.refCount()
;
// Uncomment this to demonstrate how .refCount() will keep HOT stream
// open after .catch() fires IF there is an active subscriber.
// --
// source.subscribe( function noop() {} );
break;
// In this case, we're going to create a COLD stream that will emit a value
// every second to each one of its subscribers. As a COLD stream, each
// subscriber will get its own unique set of values, starting with 0.
case 4:
var source = Rx.Observable.interval( 1000 );
break;
}
// --------------------------------------------------------------------------- //
// --------------------------------------------------------------------------- //
// Using the defined source, let's configure our observable stream transformation.
var stream = source
.do(
function logStart( value ) {
console.log( "- - - - - - - - - -" );
console.log( "Value:", value );
}
)
.map(
function transformValue( value ) {
return( value + 0.1 );
}
)
// CAUTION: The flatMap() operator will occasionally throw an error due to
// the MOD logic we are using internally.
.flatMap( sendToServer )
// When we catch the error thrown by the .flatMap() operator (or any uncaught
// error higher up in the stream), we need to respond by returning another
// STREAM that the subscriber will switch over to.
.catch(
function handleError( error ) {
console.warn( "Error caught, re-routing back to source stream." );
// In this case, when the stream fails, we're going to catch that
// failure and re-route the subscription back to the SAME STREAM. The
// behavior of this re-route depends heavily on the nature of the
// stream and whether or not it is HOT or COLD.
// --
// NOTE: Despite the fact that we are re-routing to the same stream,
// the observer is still being unsubscribed from the stream and then
// "re" subscribed to it. You can see this very clearly with COLD
// streams as they "restart" upon catch.
return( stream );
}
)
// NOTE: As it turns out, the .retry() operator, when the retry-count is
// omitted, does the same thing - it just re-subscribes to the source
// stream an indefinite number of times.
// --
// .retry()
.finally(
function handleFinally() {
console.warn( "Stream finished." );
}
)
;
// Subscribe to the stream.
var subscription = stream.subscribe(
function handleValue( value ) {
console.log( "Subscribe:", value );
},
function handleError( error ) {
console.error( "Subscribe:", error );
}
);
// --------------------------------------------------------------------------- //
// --------------------------------------------------------------------------- //
// I simulate sending data to the server. And, I simulate network FAILURE every
// fourth (or so) invocation. Returns a stream in either case.
function sendToServer( value ) {
// Simulate successful send.
if ( ( value < 1 ) || ( Math.floor( value ) % 4 ) ) {
console.log( "Sent to server:", value );
return( Rx.Observable.of( value ) );
// Simulate failed send.
} else {
console.warn( "HTTP error." );
return( Rx.Observable.throw( new Error( "Network Error" ) ) );
}
}
</script>
</body>
</html>
As you can see, I'm creating a "stream" reference. Then, within the .catch() operator, I'm returning that same "stream" reference in an attempt to continue observing the stream (from our subscriber's perspective).
With Stream 1, we're basically building a custom "hot" stream that will continue to emit values until the interval is terminated. And, when we run the demo with this stream, we get the following output:
As you can see, the stream sequence, from the subscriber's point of view, simply picked up where it left off.
With Stream 2, we are using the .interval(), .publish(), and .connect() operators to create a "hot" stream. And, when we run the demo with this stream, we get the following output:
As you can see, the stream sequence, from the subscriber's point of view, simply picked up where it left off.
With Stream 3, we are using the .interval() and .publish() operators to create a "hot" stream. But, this time, rather than just connecting the published stream to the underlying source, we're using .refCount(). The .refCount() operator will disconnect the hot stream from the underlying source when the last subscriber unsubscribes from the published source. And, when we run the demo with this stream, we get the following output:
As you can see, the stream sequence is restarted. Thanks to the .refCount(), the "hot" stream is disconnected when the error handler unsubscribes from the source stream internally. Then, when the .catch() operator re-routes (for lack of a better term) back to the "hot" stream, a new subscription is created and the hot stream is restarted.
With Stream 4, we are using the .interval() operator to create a "cold" stream, which means that each subscriber gets its own unique set of values. And, when we run the demo with this stream, we get the following output:
As you can see, the stream sequence is restarted. Since we are using a "cold" stream, each new subscriber gets its own distinct values. And, since our error handling is essentially unsubscribing from the source stream and then resubscribing to the source stream (returned from the .catch() operator), it starts receiving a new, distinct set of values.
In all cases, however, the stream appears to be continued from the perspective of our explicit stream subscription. Values continue to be passed down from the source stream to our value handler, even after errors have occurred. So, in that sense, it's doing exactly what I wanted it to do. The only real caveat is that the values are expressed differently depending on the type of source stream we've subscribed to.
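The difference between the hot and cold cases can be sketched without RxJS at all. The following is a rough, synchronous model, not the RxJS API: every name in it (createClock, coldInterval, hotInterval, failOnMultiplesOfFour, resubscribeOnError, collect) is hypothetical, a "source" is just a function that takes an observer and returns an unsubscribe function, and a manual clock replaces the one-second timer. It assumes the error tears down the current subscription and that the catch step subscribes to the same source again, which is the behavior described above.

```javascript
// Hypothetical, simplified stand-ins for RxJS concepts. NOT the RxJS API.
function removeFrom( list, item ) {
    var index = list.indexOf( item );
    if ( index !== -1 ) { list.splice( index, 1 ); }
}

// A manual clock so the sketch runs synchronously: tick() calls each listener.
function createClock() {
    var listeners = [];
    return {
        onTick: function( listener ) {
            listeners.push( listener );
            return function off() { removeFrom( listeners, listener ); };
        },
        tick: function() {
            // Iterate a copy so listeners added mid-tick wait for the next tick.
            listeners.slice().forEach( function( listener ) { listener(); } );
        }
    };
}

// COLD: each subscriber gets its own counter, starting at 0.
function coldInterval( clock ) {
    return function subscribe( observer ) {
        var i = 0;
        return clock.onTick( function() { observer.next( i++ ); } );
    };
}

// HOT: one shared counter advances whether or not anyone is subscribed.
function hotInterval( clock ) {
    var i = 0;
    var observers = [];
    clock.onTick( function() {
        var value = i++;
        observers.slice().forEach( function( o ) { o.next( value ); } );
    } );
    return function subscribe( observer ) {
        observers.push( observer );
        return function unsubscribe() { removeFrom( observers, observer ); };
    };
}

// Simulates the failing request: tears down and errors on 4, 8, 12, ...
function failOnMultiplesOfFour( source ) {
    return function subscribe( observer ) {
        var unsubscribe = source( {
            next: function( value ) {
                if ( value > 0 && value % 4 === 0 ) {
                    unsubscribe();
                    observer.error( new Error( "Network Error" ) );
                } else {
                    observer.next( value );
                }
            },
            error: function( error ) { observer.error( error ); }
        } );
        return unsubscribe;
    };
}

// Mimics "catch and return the same stream": on error, subscribe again.
function resubscribeOnError( source ) {
    return function subscribe( observer ) {
        var unsubscribe;
        ( function connect() {
            unsubscribe = source( {
                next: function( value ) { observer.next( value ); },
                error: connect
            } );
        } )();
        return function() { unsubscribe(); };
    };
}

// Subscribes, advances the clock, and returns what the subscriber received.
function collect( makeSource, ticks ) {
    var clock = createClock();
    var received = [];
    var stream = resubscribeOnError( failOnMultiplesOfFour( makeSource( clock ) ) );
    stream( { next: function( value ) { received.push( value ); } } );
    for ( var t = 0 ; t < ticks ; t++ ) { clock.tick(); }
    return received;
}

var coldValues = collect( coldInterval, 10 );
var hotValues = collect( hotInterval, 10 );
console.log( "Cold:", coldValues ); // [ 0, 1, 2, 3, 0, 1, 2, 3 ]
console.log( "Hot:", hotValues ); // [ 0, 1, 2, 3, 5, 6, 7, 9 ]
```

In this model the cold subscriber restarts at 0 after each failure, while the hot subscriber only misses the value that failed and then carries on with whatever the shared counter emits next.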
Or, you can just use the .retry() operator.
As it turns out, all of this can be quite easily accomplished with the .retry() operator. In fact, if you run this demo and replace the .catch() operator with the .retry() operator (omitting any retry count), it works exactly the same.
So why didn't I just try to use .retry() in the first place? Well, this is where I stumble when trying to project my understanding of Promises onto an RxJS workflow. Remember that the .catch() operator in a Promise chain is a value transformer. Meaning, it doesn't affect upstream aspects of the Promise chain, just the downstream recipients. As such, you would never think of this as a "retry" operation in a Promise - it's just a value transformation.
In an RxJS context, when trying to do the same thing, I didn't want to "retry" the stream - I wanted to just "continue" the stream. So, in my mind, that's what the .catch() operator was for - transforming the error into a value. Only, RxJS doesn't do value transformation with errors, it does stream transformation. Which is what .retry() is doing.
At first, you may think that this entire post was a complete waste of time. But, I think it was actually a very valuable step in helping me understand how RxJS streams work and how errors, within a stream, are handled. Not to mention a better understanding of "hot" and "cold" streams and how they interplay with operators like .catch() and .retry(). RxJS is most certainly a beast; but, I'm starting to get it a little bit at a time.
Reader Comments
Fascinating (to me). Among other things, I think you are saying that there is no difference between `.catch(()=>stream)` and `.retry()`. Is that correct?
Of course you can do different things with the two methods. The catch affords error interpretation and stream manipulation; the retry can set a finite limit on retries before giving up.
@Ward,
Right, for that specific use-case, there was no difference. But, I totally agree that there is actually a difference, as you point out, in that each of the operators allows for different kinds of use-cases.
With RxJS streams, I'm finding that some of the biggest hurdles are just wrapping my head around the terminology. Like, in this case, the concept of "retrying" seems so odd. Especially with a "hot" stream. The stream never stops - there's nothing to "retry" (from a conceptual viewpoint). Really, I just wanted to "ignore errors" or something.
But, now that raises an interesting question about RxJS - why is it NOT an injectable? Why should I just assume a static implementation of it? Imagine if I wanted to add operators to Observable? Wouldn't it be better to sub-class it (somehow) and then inject that in my provider chain?
For example, adding an ".ignoreErrors()" operator which was really just an alias for ".retry()". It would be great to have a handle on an instance of RxJS that only *I* was using and that I didn't have to worry about creating side-effects for everyone else in the application.
Hmmmm, giving me something to ponder now.
Similar problem. I'm using switchMap, and at first thought I could handle errors from the projected observable using catch(), but of course it turns out that an error terminates the entire chain, and of course that isn't what I want. What I really want is to handle errors when they happen, but keep the chain in place for upcoming events.
Using retry() (or catch() in the way you're using it above) makes my code run in an infinite loop (as if it were a cold observable). I must be missing something.
I'm able to work around the problem for now by mapping successes and errors into a "catch-all wrapper class" (containing both result and error, one set and the other null) and passing *that* out of switchMap, but it's a clunky approach and not a pattern I want to use much. I had hoped there would be a simpler way.
This post is more than a year old, but I'm not finding a lot out there about how to handle this situation. I would have expected it to be more common.
...aaaand if I use a Subject instead of a ReplaySubject, catch works as expected.
So your approach works great after all. Thanks!
@Rich,
Glad to hear you got it working. I haven't looked much into the different Subject types. And to be honest, I haven't done much with RxJS streams lately. I've moved back over to Promises a bit more (not to say that Streams don't have their place). But, I do need to dig in a bit more.
Since I wrote that I've done more reading (Dave Sexton's blog). Using publish fixed the infinite loop problem.
These things are powerful and interesting, but they do have their place. The learning curve is significant.
@Rich,
> The learning curve is significant.
.... amen!! Plus, I think it really requires a different mindset. It's one thing to think about streams; but, it's an entirely different thing to see a solution through the aggregation of different streams. The one that always blows my mind, no matter how many times I see it explained, is the whole drag-n-drop thing where it's basically three different streams (mousedown, mousemove, and mouseup) that involve take-until operators and what not. Even though I know what it's supposed to be doing, it's still really hard to "see" it when people demo it.
So this is a pretty cool post, but if you're worried about interrupting the primary stream there's a much easier way to handle these scenarios.
You can literally just do
... all the code you had above ...
.flatMap( (value) => this.sendToServer(value).catch( (error) => {
console.log( {Error: error} ); //or other error handling code here
return Observable.empty();
}) );
^^this does the trick where you can catch the error without discontinuing the main observable. Since you're returning an observable from the sendToServer function, you've created the secondary observable stream to prevent discontinuation of the primary stream. There's honestly no need to go through these gymnastics of reconnecting the primary stream or doing funky magic to keep it hot.
This is honestly the easiest way to handle the problem and is equally effective.
Thanks a lot for writing this up. Definitely not a waste of time!
Such an essential discussion for me. Thanks for that, guys.