Skip to main content
Ben Nadel at the New York ColdFusion User Group (Jul. 2009) with: Sean Schroeder and Matt Levine
Ben Nadel at the New York ColdFusion User Group (Jul. 2009) with: Sean Schroeder Matt Levine

Using Hot RxJS Observables In Your Service Layer In Angular 2.1.1

By
Published in Comments (12)

CAUTION: I am very new to RxJS and simply trying to find my way in the world. Before you read this article, I should warn you that Ben Lesh (RxJS team lead) has characterized my desire to disconnect the "implementation stream" from the "results stream" as promoting incorrect usage of RxJS streams and a misunderstanding of the core data type. As such, please take this article with a grain of salt; this is just my opinion based on my own personal experience.

In my recent experimentation with Angular 2 and RxJS, I've come to the conclusion that I was creating "leaky abstractions" with RxJS. This means that I was unintentionally allowing implementation details to leak out of my service layer and into the calling context. In my previous post, I fixed this problem by using an intermediary Promise; however, I think a more elegant, more canonical fix would be to use "Hot" RxJS observables in my Angular 2 service layer. Specifically, I think the .publishLast() operator makes a lot of sense.

Run this demo in my JavaScript Demos project on GitHub.

To be clear, from a calling context standpoint, there is absolutely no difference between this blog post and my previous two posts. The only thing that I'm changing here is an internal implementation detail that should be encapsulated within my service layer. As such, there is nothing fundamentally different about this exploration when compared to my previous explorations.

That said, I do have one additional thought experiment to present - one more reason that I believe there should be a disconnect between your service layer's "implementation stream" and the "result stream" that it returns: premature cancelation.

Imagine that you have a routable component that allows the user to invoke service layer methods that result in AJAX (Asynchronous JavaScript and XML / JSON) requests being made to the server. Now, imagine that this component also loads a lot of images. In fact, it loads enough images in the background to fully exhaust the HTTP request pool to your website's domain (most browsers only allow about 6 concurrent HTTP requests to a single domain). This means that, at the time the user initiates an "action", there are no available HTTP requests and the outgoing AJAX request gets pushed onto the pending queue of requests.

Now, while the AJAX request is still on the HTTP queue, imagine that the user routes away from the current component, causing it to be destroyed. In the ngOnDestroy() life-cycle method, we're likely going to unsubscribe from any RxJS streams. So, the question - the point of this thought experiment - is should routing away from the component cause the underlying and currently-queued AJAX request to be aborted?

In my mind, the answer is obviously "No." I would not want to prematurely cancel the AJAX request simply because I am routing away from the user interface (UI) that initiated the action. Of course, you may disagree - this is just my opinion based on how I would want and expect a service layer to function (in most cases).

Thought experiment aside, I think an elegant way to create this disconnect between the underlying stream (ex, Http) and the service layer's result stream is to create a Hot RxJS stream with the .publishLast() operator. This operator creates a shared stream that proxies the underlying Http stream, allowing multiple subscriptions to be made without re-initiating the underlying stream. Then, the "last" portion of the .publishLast() operator will wait until the underlying stream to complete before it emits the last value to all of its subscribers.

This proxy to the underlying stream will allow calling contexts to unsubscribe without prematurely canceling the underlying stream. Going back to the thought experiment, this means that the routable component can safely unsubscribe from service layer results in its ngOnDestroy() life-cycle event method without "accidentally" aborting out of the underlying AJAX request.

Before we look at the code, I just want to stress again that, from an experiential standpoint, the hot .publishLast() stream is not meaningfully different from using an intermediary promise:

Observable.fromPromise( stream.toPromise() )

Both approaches create a hot stream (so to speak). Both approaches only emit one value. Both approaches will emit that last value to subscribers who subscribe after the underlying stream has already completed. Both approaches create a disconnect between the underlying stream and the calling context. So, while I think the .publishLast() is a bit more "elegant" and "stream oriented," I would argue that the difference is a mere implementation detail and not a point of significance.

That said, let's look at some code. Here is a simple FriendService class that provides a .getFriends() method. This method initiates an AJAX request and then proxies it with the .publishLast() operator:

// Import the core angular services.
import { Http } from "@angular/http";
import { Injectable } from "@angular/core";
import { Observable } from "rxjs/Observable";
import { Response } from "@angular/http";

// Load modules for side-effects.
import "rxjs/add/operator/map";
import "rxjs/add/operator/publishLast";

export interface IFriend {
	id: number;
	name: string;
}

@Injectable()
export class FriendService {

	private http: Http;


	// I initialize the service.
	constructor( http: Http ) {

		this.http = http;

	}


	// ---
	// PUBLIC METHODS.
	// ---


	// I return an observable collection of Friends.
	public getFriends() : Observable<IFriend[]> {

		this.trackMethod( "getFriends" );

		// NOTE: Using .getTime() because Chrome is being overly aggressive with caching
		// (even when I have the Chrome Dev Tools open).
		var stream = this.http
			.get( "./app/friend.service.json?_=" + new Date().getTime() )
			.map(
				( response: Response ) : IFriend[] => {

					return( response.json() );

				}
			)
			// Create a stream that waits for the underlying (Http) stream to finish
			// (in either completion or in error) before it emits the last value received
			// by the underlying stream. The "publish" portion allows multiple subscribers
			// to connect to the Http stream without re-sending it across the wire; and,
			// the "last" portion allows the emitted result to be provided to
			// subscriptions that get created AFTER the underlying stream has completed.
			// --
			// NOTE: At this point, the underlying (Http) stream cannot be canceled by
			// the calling context -- which I BELIEVE is what you want in the majority
			// of "service layer" methods.
			.publishLast()
		;

		// Connect the "publishLast" stream to the underlying Http stream, creating a
		// "hot" stream that will start to fire immediately.
		stream.connect();

		return( stream );

	}


	// ---
	// PRIVATE METHODS.
	// ---


	// I track method calls for subsequent analytics and analysis.
	private trackMethod( methodName: string ) : void {

		console.warn( "Tracking method:", methodName );

	}

}

In this case, the .publishLast() creates a shared, connectable stream on top of the underlying Http request. Then, calling .connect() connects the shared stream to the underlying Http stream, initiating the actual AJAX request.

In the root component, we then use this service layer to gather friend data in two ways. In one way, we get the resultant stream and pipe it into the tempalte (via Async Pipe). And, in another way, we simply "fire and forget" the service layer call. The point of the latter approach is to demonstrate that we don't need to subscribe to the resultant stream in order for the underlying AJAX request to be initiated.

// Import the core angular services.
import { Component } from "@angular/core";
import { Observable } from "rxjs/Observable";
import { OnInit } from "@angular/core";

// Import the application components and services.
import { FriendService } from "./friend.service";
import { IFriend } from "./friend.service";

@Component({
	selector: "my-app",

	// CAUTION: Notice that we are using the ASYNC pipe THREE times in this template.
	// --
	// NOTE: I don't recommend this approach - this is just an exploration of the way
	// Async Pipe interacts with streams returned from the "service layer".
	template:
	`
		<p>
			<a (click)="loadData()">Reload Data</a>
			&nbsp;|&nbsp;
			<a (click)="fireAndForget()">Fire and Forget</a>
		</p>

		<p *ngIf="( ( friends | async ) === null )">
			<em>Loading....</em>
		</p>

		<ul *ngIf="( ( friends | async ) !== null )">
			<li *ngFor="let friend of friends | async">
				{{ friend.name }}
			</li>
		</ul>
	`
})
export class AppComponent implements OnInit {

	public friends: Observable<IFriend[]>;

	private friendService: FriendService;


	// I initialize the component.
	constructor( friendService: FriendService ) {

		this.friendService = friendService;
		this.friends = null;

	}


	// ---
	// PUBLIC METHODS.
	// ---


	// I make a request to get friend data; BUT, don't subscribe to the resultant stream.
	public fireAndForget() : void {

		this.friendService.getFriends();

	}


	// I make a request to get friend data.
	public loadData() : void {

		// Get the friend stream.
		// --
		// NOTE: Instead of extracting the "friends" collection from the stream, we are
		// going to be using the Async Pipe in the template to automatically bind to the
		// stream response.
		this.friends = this.friendService.getFriends();

		// In addition to using Async Pipe, we'll also imperatively subscribe to the
		// stream so that we can monitor the results (and errors).
		this.friends.subscribe(
			( value: IFriend[] ) : void => {

				console.log( "Service Success!", value );

			},
			( error: any ) : void => {

				console.log( "Service Error!", error );

			}
		);

	}


	// I get called once after the component has been instantiated and the inputs have
	// been bound for the first time.
	public ngOnInit() : void {

		this.loadData();

	}

}

When we run this app, we can see a few things. First, no matter how many times we bind the resultant stream to the template (using Async Pipe), we only initiate one Http request. This is because all of the Async Pipe subscriptions share the same .publishLast() proxy stream. We can also see that our "fire and forget" method sends an AJAX request despite having no subscribers beyond the service layer boundary.

Using Hot RxJS streams in your service layer in Angular 2.

In the last three posts (inclusive), I've been trying to make the case that the "implementation streams" in an Angular 2 service layer should (with some meaningful exceptions) be disconnected from the "result streams" that they return. In the previous posts, I did this with an intermediary Promise. In this post, I did it with what I think may be a more canonical RxJS approach. However, the difference is just a minor detail and provides the same experience to the calling context.

Want to use code from this post? Check out the license.

Reader Comments

15,848 Comments

@All,

A quick follow-up post on another argument *for* Hot streams in a service layer - it helps protect against accidental "partial execution" of a service layer action:

www.bennadel.com/blog/3187-partial-stream-execution-a-case-for-hot-rxjs-observables-in-angular-2-1-1.htm

To me, this really speaks to the consumer mental model about how a "service layer" is supposed to work. It focuses on "surprise" and less on the technical underpinnings of RxJS streams.

26 Comments

@Ben,

I didn't read all your follow-up posts (it's quite a lot; I can imagine that being a hurdle for Ben Lesh as well). However, I do feel that the way you're thinking about Observables leads to the confusion.

If I understood correctly, you have a service that should post an HTTP call; you don't care about the response; and you want to be able to cancel the call if it's become redundant.

The problem is that you should view an Observable as "potential values that could come in at a later time". These values, in this case, are the HTTP response.

So what's problematic, in this case, is that you don't actually care about the response, and that you're viewing Observables as "an action that I'm able to cancel". I would argue that an Observable is not the correct representation of that.

Thus, what I would do is, I think, not expose the Observable outside of the service at all. The caller of the service should not care about the response, so you shouldn't return a representation of that response at all. In other words, just subscribe to the Observable within the service, and at most return a cancellation callback to the calling code.

Coincidentally I'm working on a blog post right now that should help thinking about Observables. I'll probably be able to finish it somewhere today, so you might want to checkout my website tomorrow.

15,848 Comments

@Vincent,

I appreciate the feedback. But, I don't think that our viewpoints are too different (I have not read your post yet). But, when you talk about an Observable as:

> ... potential values that could come in at a later time

I am also looking at it that way. I think the big difference however is that I am drawing a hard line between the concept of "response values" with "actions". Meaning, a service layer plays TWO roles:

* Fulfill actions.
* Return responses.

In my mind, the fulfillment of the action and however that is done is completely separate from the way in which it returns responses. That makes the fulfillment itself an "implementation detail" that has no bearing on the way responses are delivered.

As far as not caring about the response, I am only pointing that out as possible way for a service layer to be consumed. I'm trying to illustrate the point that the fulfillment of the action should not be coupled the way the responses are consumed. Meaning, if the calling context decided not to consume the responses - which are "potential values that could come in at a later time" - then the Service layer shouldn't care one or the other.

My demo was less about canceling and more about decoupling. I think :D

I'll take a look at your post, thanks!

15,848 Comments

@Vincent,

Good read - I didn't see a place to leave a comment though, so I'll ask here. In your Counter example, how would you actually increment the counter? I understand the concept of subscribing to updates. And, I assume somewhere someone has to call .next(). But, how is that done without mutating state? Doesn't something somewhere have to keep track of the current value?

26 Comments

@Ben,

Good point, I'll have to clarify that in my post as well. In any case, the stream here is the source for counter states - a new one is provided every time an increment or decrement takes place, at which point the view is immediately updated. Other than in the view, though, the state is not saved explicitly anywhere.

You could use e.g. button clicks as initiators of value changes. I'm not too up-to-date on Angular 2, but I believe it has the option to get form values as Observables as well? In any case, in plain Javascript, it'd look something like this: https://jsfiddle.net/79aw18g0/

26 Comments

@Ben,

Oh, and maybe I shouldn't ignore your other comment :P I'm not sure if there's general consensus on this, but in my view, you'd mainly consume Observables using pure functions - in other words, without side effects, such as "performing actions". So if you only care about performing actions, then I'd again not expose the Observable outside your service.

I'm currently a bit stuck about the part where you want to perform both the action and care for the response. Although it's the end of the day for me and my brains aren't working optimally, I'm starting to think I see your point - although `fromPromise(toPromise)` or whatever it was still feels like an antipattern to me. I'll have to think about it some more.

15,848 Comments

@Vincent,

Perhaps the biggest disconnect is that I simply don't view Observables in a *service layer* as being fundamentally different than anything else. To me, it just seems like an implementation detail. Meaning, I could have an API that uses Callbacks:

service.doSomething( inputs, callback );

... or refactor it use Promises:

service.doSomething( inputs ).then( callback );

... or refactor it use Generators (hope this one is right - I'm a noob there):

yield service.doSomething( inputs );

... or refactor it use Observables:

service.doSomething( inputs ).subscribe( callback );

In any of those cases, I wouldn't expect the *behavior of the service* to fundamentally change - only the way in which I was listening for the response. But, if we're saying that an Observable fundamentally changes the way a service works, then I think we just have two different mental models for what a service does. In my mind, it's just an implementation detail.

26 Comments

@Ben,

I agree. But I see two things in your comment: using Observables _in_ the service, and using them _as the API_ of that service.

If only the former, then I think you shouldn't leak any of the Observable implementation outside the service - in other words, you'd call `subscribe()` within the service.

When you're using it as the service's API (i.e. `service.doSomething( inputs ).subscribe( callback );`), then you should be able to assume that the caller knows that it will behave as an Observable - in other words, that it'll be lazy.

And that's really no different from exposing your API with a callback or a promise. Then, too, the caller will have to know how it behaves. For example, where you can try-catch can an exception for callbacks, you'll have to call .catch() or pass an error handler to the promise.

15,848 Comments

@Vincent,

I've been going over this and over this in my mind. Between Hot and Cold Observables and how much services should be coupled to the calling context, and vice-versa, something just feels very wrong to me. I think the fact that the Http class returns an Observable is part of what makes this all so cloudy. If Http implemented something else, like Promises, I don't think this would be an issue. But, the fact that Http generates Observables means that, in my service layer, if I'm using Http, I now have to either explicitly convert to something else (like a Promise), or just pass-it through. Just feels like Http is forcing me to make a decision and I'm irritated by the fact that I feel like I can't make enough of an educated decision.

Honestly, at this point, I'm sort of leaning towards making my Service layer using Promises. Or, when necessarily and explicitly different, an Observable in cases where the underlying Http needs to be aborted (such as with a type-ahead search box. But, the calling context could always convert to a stream using something like .defer() or .fromPromise().

26 Comments

@Ben,

I tend to agree (although I'm not that familiar with Angular 2 yet). Although I understand why they are used for HTTP requests in Angular applications, it doesn't feel that natural to me. I think it might be justifiable to convert them to a promise for your service, although I wouldn't then re-convert it back to a stream again.

15,848 Comments

@Vincent,

For what it's worth, I am *still* noodling on the topic :D And, I've actually started to gravitate back to Promises for the "core" of the app:

www.bennadel.com/blog/3202-my-evolving-angular-2-mental-model-promises-and-rxjs-observables.htm

.... I'll still uses Streams in the Controller layer, where and IF they make sense. But, I think for the core actions, Promises just feel like they have a better alignment with how I think about the "state machine" of the application as a whole.

I believe in love. I believe in compassion. I believe in human rights. I believe that we can afford to give more of these gifts to the world around us because it costs us nothing to be decent and kind and understanding. And, I want you to know that when you land on this site, you are accepted for who you are, no matter how you identify, what truths you live, or whatever kind of goofy shit makes you feel alive! Rock on with your bad self!
Ben Nadel