Skip to main content
Ben Nadel at cf.Objective() 2013 (Bloomington, MN) with: Jason Dean and Simon Free
Ben Nadel at cf.Objective() 2013 (Bloomington, MN) with: Jason Dean Simon Free

Partial Stream Execution: A Case For Hot RxJS Observables In Angular 2.1.1

By
Published in Comments (5)

CAUTION: I am very new to RxJS and simply trying to find my way in the world. Before you read this article, I should warn you that Ben Lesh (RxJS team lead) has characterized my desire to disconnect the "implementation stream" from the "results stream" as promoting incorrect usage of RxJS streams and a misunderstanding of the core data type. As such, please take this article with a grain of salt; this is just my opinion based on my own personal experience.

In an earlier post about creating leaky abstractions in Angular 2 with RxJS, I added a comment about the "Principle of Least Surprise"; and, while I am not sure that this Principle technically applies here, I was trying to convey that, for many people coming from other libraries like jQuery.ajax() and $http.get(), using a Cold Observable will likely lead to more surprise and astonishment than Hot Observables. This is, in part, why I will personally be using Hot RxJS Observables in my Angular 2 service layer. After I left this comment, it got me further thinking about complex service interactions; and, what would happen if you cancel a subscription during the execution of a chained set of Cold RxJS streams.

Run this demo in my JavaScript Demos project on GitHub.

For the sake of this exploration, imagine that we have a Service class in Angular 2 that needs to fulfill an action by making two AJAX (Asynchronous JavaScript and XML / JSON) requests to the server. However, these two calls cannot be made independently - the second call depends on the result of the first call. In our demo class, we are going to use the .mergeMap() operator to continue one Http stream with another related Http stream:

// Import the core angular services.
import { Http } from "@angular/http";
import { Injectable } from "@angular/core";
import { Observable } from "rxjs/Observable";
import { Response } from "@angular/http";

// Load modules for side-effects.
import "rxjs/add/operator/delay";
import "rxjs/add/operator/map";
import "rxjs/add/operator/mergeMap";

@Injectable()
export class BusinessService {

	private http: Http;


	// I initialize the service.
	constructor( http: Http ) {

		this.http = http;

	}


	// ---
	// PUBLIC METHODS.
	// ---


	// I perform an action that requires several serialized server-side API calls.
	// --
	// CAUTION: Returns a COLD stream.
	public makeItSo() : Observable<string> {

		var stream = this.http
			.get( this.getUrlA() )
			.delay( 3000 ) // Simulate network latency.
			.mergeMap(
				( response: Response ) : Observable<Response> => {

					// The second HTTP request needed to fulfill this action needs to be
					// built using the response of the first action. That's why these
					// requests cannot be run in parallel.
					var id = response.json().id;

					return( this.http.get( this.getUrlB( id ) ) );

				}
			)
			.map(
				( response: Response ) : string => {

					return( response.json().message );

				}
			)
		;

		return( stream );

	}


	// ---
	// PRIVATE METHODS.
	// ---


	// I return the URL for the first HTTP request.
	private getUrlA() : string {

		// NOTE: Using .getTime() because Chrome is being overly aggressive with caching
		// (even when I have the Chrome Dev Tools open).
		return( `./app/business.service.a.json?_=${ new Date().getTime() }` );

	}


	// I return the URL for the second HTTP request.
	private getUrlB( id: number ) : string {

		// NOTE: Using .getTime() because Chrome is being overly aggressive with caching
		// (even when I have the Chrome Dev Tools open).
		return( `./app/business.service.b.json?id=${ id }&_=${ new Date().getTime() }` );

	}

}

Notice that the returned RxJS stream is a Cold stream. Meaning, it won't actually execute the underlying Http request until the calling context subscribes to the stream.

Now, in our Angular 2 root component, we're going to provide two options: one to subscribe to this business service stream; and, one to unsubscribe from this stream:

// Import the core angular services.
import { Component } from "@angular/core";
import { Observable } from "rxjs/Observable";
import { Subscription } from "rxjs/Subscription";

// Import the application components and services.
import { BusinessService } from "./business.service";

@Component({
	selector: "my-app",
	template:
	`
		<p>
			<a (click)="makeRequest()">Make Request</a>
			&nbsp;|&nbsp;
			<a (click)="unsubscribe()">Unsubscribe</a>
		</p>

		<p *ngIf="subscription">
			<em>Subscription to business service obtained...</em>
		</p>
	`
})
export class AppComponent {

	private businessService: BusinessService;
	private subscription: Subscription;


	// I initialize the component.
	constructor( businessService: BusinessService ) {

		this.businessService = businessService;
		this.subscription = null;

	}


	// ---
	// PUBLIC METHODS.
	// ---


	// I make a request to the business service and store the result subscription.
	public makeRequest() : void {

		this.subscription = this.businessService
			.makeItSo()
			.subscribe(
				( response: string ) : void => {

					console.log( "Completed successfully!", response );

				},
				null, // On error.
				() : void => {

					// For the sake of the UI, nullify the subscription once the stream
					// has completed.
					this.subscription = null;

				}
			)
		;

	}


	// I unsubscribe from any pending result subscription.
	public unsubscribe() : void {

		if ( ! this.subscription ) {

			return;

		}

		console.warn( "Unsubscribed from cold stream." );

		this.subscription.unsubscribe();
		this.subscription = null;

	}

}

Because we're using the .delay() RxJS operator to simulate some network latency in our chained Http stream, it gives us the opportunity to interact with the Cold stream while it is mid-execution. But first, let's initiate a request to the Business service and let it run to completion:

Partially executed RxJS streams - a case for 'hot' streams in Angular 2 services.

As you can see, the call to the BusinessService resulted in one stream that initiated two AJAX requests against the server.

Now, let's run the same code; but, this time, we'll unsubscribe from the result in between the first and second AJAX requests, during the simulated network latency:

Partially executed RxJS streams - a case for 'hot' streams in Angular 2 services.

As you can see, when we unsubscribe from the service layer stream while the stream is mid-execution, we can find ourselves in a situation in which the first Http request has been executed but the second Http stream has not (and never will be). The action requested by the service layer has only been partially fulfilled.

To be very clear, I am not saying that this is a bug in any way. This is exactly how Cold streams are supposed to work (as far as I undrestand - again, I'm very new to RxJS). I point this scenario out only to try and make a case for Hot streams in my Angular 2 service layer. Had the BusinessService implemented a Hot stream, the sequence of Http requests would not have been interrupted by the .unsubscribe() call in the root component.

It's hard to talk about "surprise" when something works exactly as it is documented. In this case, the Cold stream isn't working in a surprising way; but, I would argue that the service layer itself - from a consumption standpoint - is working in a surprising way, allowing for the partial execution of commands. I believe that if I use Hot streams in my Angular 2 service layer, it will lead to less surprise and will generally be more in alignment with the type of behavior that my team is used to seeing elsewhere.

Want to use code from this post? Check out the license.

Reader Comments

15,848 Comments

@Edward,

I haven't even touched on Schedulers yet :( I do see in RxJS that there is often an opportunity to define or select a Scheduled in the operations. But, I don't really have any sense of what they do. From a cursory glance at that link, it looks like its yet another layer of complexity on top of the already complex mental model of RxJS :D

It's interesting - over the weekend, I heard on a podcast that some people are floating the concept of a "cancelable Promise" for a future version of JavaScript. If that ever lands, I wonder how many uses of RxJS streams might be replaced with that.

I completely understand that RxJS does *way more* than a cancelable Promise. But, I also know that many of my use cases are quite clearly "one-off request that I wish I could cancel" type actions. Perhaps as I get more comfortable with reactive programming, the very though of a Promise will start to seem more foreign? Not anywhere close to that mindset yet.

15,848 Comments

@All,

For what it's worth, I'm staring to coalesce around Promises again in the "core" of my application:

www.bennadel.com/blog/3202-my-evolving-angular-2-mental-model-promises-and-rxjs-observables.htm

I'll continue to use RxJS Streams in my Controller layer, and in specialized parts of the "query core"; but, the deepest parts of my business logic will be moving towards Promises.

Just continuing to evolve my understanding of all this asynchronous workflow stuff :D

3 Comments

Hi Ben,

You could argue that someone that is used to working with an Observable orientated API would find that an Observable that could not be retried or cancelled (at the XHR level) was a big surprise.

My preference is that if you want those semantics (can't cancel, can't retry), that returning a promise would yield a more intention revealing API.

That would then allow you, as the api designer, to show your intent of supporting cancellation and retry by returning an Observable in those cases.

Christian

15,848 Comments

@Christian,

I totally agree with your sentiment. I've continued to noodle on this and have actually reached a similar mental model. Now, going forward, I intend to use Promises for most of my "service layer" application logic. But, I will expose Streams if and when they make sense for exactly the reason you outline - for people who want to use streams, they should receive the kind that they expect to work with.

This way, my "commands" can't be cancelled half-way through execution. And, my "queries" may be exposed as Streams where the behavior is a value-add (ex, cancelling an AJAX request during a type-ahead interaction).

I believe in love. I believe in compassion. I believe in human rights. I believe that we can afford to give more of these gifts to the world around us because it costs us nothing to be decent and kind and understanding. And, I want you to know that when you land on this site, you are accepted for who you are, no matter how you identify, what truths you live, or whatever kind of goofy shit makes you feel alive! Rock on with your bad self!
Ben Nadel