Skip to main content
Ben Nadel at CFUNITED 2010 (Landsdown, VA) with: Josh Highland and Luis Majano
Ben Nadel at CFUNITED 2010 (Landsdown, VA) with: Josh Highland Luis Majano

You Have To Explicitly End Streams After Pipes Break In Node.js

By
Published in Comments (12)

I've spent the last few months learning about Node.js in an effort to figure out why my Gulp.js script doesn't work (I still don't know). This has become quite the Herculean task, winding my way down the rabbit hole of Node.js streams. This morning, I wanted to look at one interesting behavior - the fact that you have to explicitly end your streams after your pipes break.

Last month, I started to look at how error events affect Node.js pipes. I demonstrated that error events don't stop streams, and they only affect the piped-status of connected streams. As such, it shouldn't be surprising that you have to explicitly end streams after pipes break; but, this point has been clouded in my mind thanks to the enormous learning curve of Node.js streams, and a number of misleading examples.

As I've been trying to debug my Gulp.js streams, I've come across a number of examples that look like this:

someStream.on( "error", function( error ) {
this.emit( "end" );
});
view raw bad-example.js hosted with ❤ by GitHub

At first glance, this looks kind of right. But, the problem is that the approach doesn't fulfill the intent. The intent is tell other interested parties (ex, other streams) that the current stream has ended. But, the reality is, the stream hasn't ended - you've just told people it has. The stream will only end when you call .end() on it.

In a pipe context, this miscommunication becomes even more murky because the wrong approach almost looks like it works. But, it will fail in interesting ways. To see this in action, check out the video above.

Now, it doesn't seem that there is any rule of thumb that says you should end a stream just because an error occurred. After all, there's no reason that a stream (aka, an EventEmitter) can't emit more than one error. As such, the appropriate reaction to an error must depend on your particular context. That said, if an error causes streams to be unpiped, you'll probably want to clean that up.

To explore this concept, I've put together a small demo that pipes three streams together:

  • Readable - FriendStream
  • Transform - ComplimentStream
  • Writable - JournalStream

I used these three types of Node.js streams as it mirrors the way Gulp.js works with source streams, Transform streams, and destination streams. In my demo, the compliment stream emits an error half way through the transformations. This causes an unpipe event which has to be dealt with:

// Require module references.
var stream = require( "stream" );
var util = require( "util" );
var chalk = require( "chalk" );
// ---------------------------------------------------------- //
// ---------------------------------------------------------- //
// I am a SOURCE stream, providing a stream of Friends, in object mode.
function FriendStream() {
stream.Readable.call(
this,
{
objectMode: true
}
);
this._friends = [ "Kim", "Sarah", "Kit", "Tricia", "Libby", "Joanna" ];
}
util.inherits( FriendStream, stream.Readable );
// I read data out of the underlying source and push it only the underlying buffer.
FriendStream.prototype._read = function( size ) {
// While we still have Friends, and the buffer is not full, keep pushing friends.
while ( this._friends.length && size-- ) {
if ( this.push( this._friends.shift() ) === null ) {
break;
}
}
// If we have no more friends, end the stream.
if ( ! this._friends.length ) {
this.push( null );
}
};
// ---------------------------------------------------------- //
// ---------------------------------------------------------- //
// I am a TRANSFORM stream, decorating each friend with a compliment, in object mode.
function ComplimentStream() {
stream.Transform.call(
this,
{
objectMode: true
}
);
}
util.inherits( ComplimentStream, stream.Transform );
// I transform the input chunk to the output chunk.
ComplimentStream.prototype._transform = function( friend, isEncoded, getNextChunk ) {
// Issuing an error for the exploration.
if ( friend === "Kit" ) {
return( getNextChunk( new Error( "No Kits allowed!" ) ) );
}
this.push( friend + ", you are awesome!" );
getNextChunk();
};
// ---------------------------------------------------------- //
// ---------------------------------------------------------- //
// I am a DESTINATION stream, keeping track of journal line items, in object mode.
function JournalStream() {
stream.Writable.call(
this,
{
objectMode: true
}
);
this._entries = [];
}
util.inherits( JournalStream, stream.Writable );
// I write the given entry to the internal journal.
JournalStream.prototype._write = function( entry, encoding, done ) {
this._entries.push( entry );
done();
};
// ---------------------------------------------------------- //
// ---------------------------------------------------------- //
// Create a new instance of our compliment stream (ie, our TRANSFORM stream). This acts
// as both a Writable and a Readable stream.
var complimentStream = new ComplimentStream()
.on(
"unpipe",
function handleUnpipeEvent( source ) {
console.log( chalk.bgYellow( "FriendStream unpiped from ComplimentStream." ) );
}
)
.on(
"error",
function handleErrorEvent( error ) {
console.log( chalk.red( "Compliment error:", error ) );
// When the compliment stream raises an error, the FriendStream is
// automatically going to unpipe itself from this [ComplimentStream] stream.
// That's all that Node.js does in the event of an error in a pipe-context.
// The stream itself is still left open. But, since we know that no more
// friends are going to be written, we have to explicitly END the Writable
// aspect of the Transform stream.
// --
// NOTE: Sometimes you see people "emit" an "end" event here. That is the
// wrong approach as it signals the end of the stream _without_ actually
// ending it, which is poor implementation of intent.
this.end();
}
)
;
// Create our streams and pipe them : FRIENDS -> COMPLIMENT -> JOURNAL.
var journalStream = new FriendStream()
.pipe( complimentStream )
.pipe( new JournalStream() )
;
// When the DESTINATION stream is finished, log the state of the journal entries.
journalStream.on(
"finish",
function handleEndEvent() {
console.log( chalk.cyan( "Stream finished." ) );
console.dir( this._entries );
}
);
view raw test.js hosted with ❤ by GitHub

As you can see, the ComplimentStream will emit an error if one of the inputs is "Kit." And, when we run the above Node.js code, we get the following terminal output:

node test.js
FriendStream unpiped from ComplimentStream.
Compliment error: Error: No Kits allowed!
Stream finished.
[ 'Kim, you are awesome!', 'Sarah, you are awesome!' ]

Now, if you had tried to emit("end") instead of calling end(), you would have gotten a very different result; and, one that is not necessarily consistent depending on settings.

For you Node.js developers, this is probably a "yeah, duh" moment - of course you have to end streams. But, for me, building a solid and consistent mental model for Node.js streams has been an uphill battle. Every time I think I have a decent handle on it, I find myself making mistakes and uncovering false assumptions.

Want to use code from this post? Check out the license.

Reader Comments

19 Comments

Why not both? Emit an "end" and then .end() it. As Bill Cosby once quipped: "First you say it, then you do it."

15,921 Comments

@JC,

I like the quote :) I think you won't need end. If you call .end(), the "end" event should be emitted automatically.

1 Comments

This post is rather old, but by now I'm sure that you know that its not you, its them. The reason why its hard to build solid and consistent mental of node streams is that because there isn't any. They are simply incredibly quirky and inconsistent, and only a few people have a complete mental picture of all the weirdness.

I'm betting it doesn't have to be that way, though. I started collecting a list of all the issues here: https://github.com/spion/promise-streams/issues/8 and I'm hoping that once we have all of them collected, a proper redesign that works much better will emerge. I'll try to add all your observations from this article and any others you may have - thanks for that!

1 Comments

What would you recommend to do if you want the stream to continue after unpiping? I would just want the output to include "Tricia", "Libby" and "Joanna" as well, so a simple continue if you would call it that.

Was looking into domains as a solution but these are deprecated right now so that does not seem like a great idea...

More ontopic, thanks for this great post, cleared up some things for me!

1 Comments

@Ben,

Regarding this.end and this.emit('end'), I have experienced this.end() not always triggering the end event. I did both and everything seeemed to work. Thanks for your posts, the general docs for error handling in streams is woeful.

1 Comments

@Ryan

If you have a transform stream and you call .end() then it should emit the finish event once it's piped all it's data.

In the case the stream you were piping to has died then it may be stuck with data in it's buffer after calling .end(), and therefore it does not emit the 'finish' event. A workaround for transform streams I have used in the past is:

const PassThrough = require('stream').PassThrough
...
this.end();
this.pipe(new PassThrough());

pretty ugly but it seems to work ok when emitting an 'error' even is unsuitable.

1 Comments

Hi Ben,

Quite some time since, but how about all those examples that use `process.exit(1)` in the onError handler. Does it do the same thing as `this.end()` ?

I'm having a case where I pipe browserify output into uglifyjs (on cmd) and the command exits with 0 even if there is an error in the browserify part...

I believe in love. I believe in compassion. I believe in human rights. I believe that we can afford to give more of these gifts to the world around us because it costs us nothing to be decent and kind and understanding. And, I want you to know that when you land on this site, you are accepted for who you are, no matter how you identify, what truths you live, or whatever kind of goofy shit makes you feel alive! Rock on with your bad self!
Ben Nadel