Node.js Transform Streams vs. Through2 Streams
As I've been trying to get my Gulp.js build script to work, I've found it necessary to dip down into Node.js and learn about streams. According to the Gulp.js documentation, all Gulp.js plugins are Transform streams, running in "Object Mode," that accept and emit vinyl File objects. Furthermore, it looks like the Through2 module is the recommended way to build these Transform streams. As such, I wanted to take my previous exploration and refactor it into a Through2 stream.
If you recall from yesterday, I built a Node.js Transform stream that took in string data and emitted regular expression pattern matches. This RegExStream operated in "Object Mode" so that each read from the output would return one complete pattern match and never a partial, character-level slice of a match.
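To make the "Object Mode" distinction concrete, here is a minimal sketch that uses the built-in stream.PassThrough class rather than the RegExStream itself. The variable names are mine, and the sketch assumes the synchronous write-then-read behavior of an unpiped stream with nothing else consuming it.

```javascript
var stream = require( "stream" );

// Two pass-through streams that differ only in their objectMode setting.
var byteStream = new stream.PassThrough();
var objectStream = new stream.PassThrough({ objectMode: true });

[ byteStream, objectStream ].forEach(
	function( target ) {
		target.write( "How" );
		target.write( "funky" );
	}
);

// Without object mode, read() hands back whatever bytes are buffered, so the
// boundary between the two writes is lost.
var merged = byteStream.read().toString( "utf8" );

// With object mode, each read() hands back exactly one written value.
var first = objectStream.read();
var second = objectStream.read();

console.log( "Byte mode:", merged );
console.log( "Object mode:", first, "|", second );
```

This is why the RegExStream pushes its matches in object mode: the consumer never has to guess where one match ends and the next begins.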
In the previous experiment, I was piping a readable file stream into the RegExStream. But, to prepare for this Through2 experiment, I replaced the file stream with explicit calls to write(). This way, I could make sure that both approaches (raw Transform and Through2 streams) could process data that was being spread out over multiple chunks.
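As a rough illustration of why the chunking matters, the following sketch splits a short input into three-character chunks and compares matching each chunk in isolation against matching the whole input. The "naive" approach is only here for contrast; it is not what the RegExStream does.

```javascript
var input = "How funky";

// Break the input into chunks of (at most) three characters.
var chunks = input.match( /.{1,3}/g );

// Naive approach: run the pattern against each chunk on its own.
var naive = [];
chunks.forEach(
	function( chunk ) {
		naive = naive.concat( chunk.match( /\w+/g ) || [] );
	}
);

// Reference result: run the pattern against the complete input.
var whole = input.match( /\w+/g );

console.log( chunks ); // [ 'How', ' fu', 'nky' ]
console.log( naive ); // [ 'How', 'fu', 'nky' ]
console.log( whole ); // [ 'How', 'funky' ]
```

The word "funky" straddles two chunks, so any stream that matches chunk-by-chunk has to buffer input and defer matches that touch the end of the buffer.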
First, let's look at the refactored version of the experiment that deals with Node.js Transform streams directly. In the following demo, you'll see that I am explicitly defining RegExStream as a constructor that inherits from stream.Transform. I am then defining the _transform() and _flush() methods as functions on the RegExStream prototype.
// Include module references.
var stream = require( "stream" );
var util = require( "util" );
var chalk = require( "chalk" );
// ---------------------------------------------------------- //
// ---------------------------------------------------------- //
// I am a Transform stream (writable/readable) that takes input and finds matches to the
// given regular expression. As each match is found, I push each match onto the output
// stream individually.
function RegExStream( pattern ) {
// If this wasn't invoked with "new", return the newable instance.
if ( ! ( this instanceof RegExStream ) ) {
return( new RegExStream( pattern ) );
}
// Call super-constructor to set up proper options. We want to set objectMode here
// since each call to read() should result in a single-match, never a partial match
// of the given regular expression pattern.
stream.Transform.call(
this,
{
objectMode: true
}
);
// Make sure the pattern is an actual instance of the RegExp object and not just a
// string. This way, we can treat it uniformly later on.
if ( ! ( pattern instanceof RegExp ) ) {
pattern = new RegExp( pattern, "g" );
}
// Since the pattern is passed-in by reference, we need to create a clone of it
// locally. We're going to be changing the RegExp properties and we need to make
// sure we're not breaking encapsulation by letting the calling scope alter it.
this._pattern = this._clonePattern( pattern );
// I hold the unprocessed portion of the input stream.
this._inputBuffer = "";
}
// Extend the Transform class.
// --
// NOTE: This only extends the class methods - not the internal properties. As such we
// have to make sure to call the Transform constructor (above).
util.inherits( RegExStream, stream.Transform );
// I clone the given regular expression instance, ensuring a unique reference that is
// also set to include the "g" (global) flag.
RegExStream.prototype._clonePattern = function( pattern ) {
// Split the pattern into the pattern and the flags.
var parts = pattern.toString().slice( 1 ).split( "/" );
var regex = parts[ 0 ];
var flags = ( parts[ 1 ] || "g" );
// Make sure the pattern uses the global flag so our exec() will run as expected.
if ( flags.indexOf( "g" ) === -1 ) {
flags += "g";
}
return( new RegExp( regex, flags ) );
};
// I finalize the internal state when the write stream has finished writing. This gives
// us one more opportunity to transform data and push values onto the output stream.
RegExStream.prototype._flush = function( flushCompleted ) {
logInput( "@flush - buffer:", this._inputBuffer );
var match = null;
// Loop over any remaining matches in the internal buffer.
while ( ( match = this._pattern.exec( this._inputBuffer ) ) !== null ) {
logInput( "Push( _flush ):", match[ 0 ] );
this.push( match[ 0 ] );
}
// Clean up the internal buffer (for memory management).
this._inputBuffer = "";
// Signal the end of the output stream.
this.push( null );
// Signal that the input has been fully processed.
flushCompleted();
};
// I transform the given input chunk into zero or more output chunks.
RegExStream.prototype._transform = function( chunk, encoding, getNextChunk ) {
logInput( ">>> Chunk:", chunk.toString( "utf8" ) );
// Add the chunk to the internal buffer. Since we might be matching values across
// multiple chunks, we need to build up the buffer with each unused chunk.
this._inputBuffer += chunk.toString( "utf8" );
// Since we don't want to keep building a large internal buffer, we want to pare
// down the content that we no longer need. As such, we're going to keep track of the
// position of the last relevant index so that we can drop any portion of the
// content that will not be needed in the next chunk-processing.
var nextOffset = null;
var match = null;
// Loop over the matches on the buffered input.
while ( ( match = this._pattern.exec( this._inputBuffer ) ) !== null ) {
// If the current match is within the bounds (exclusive) of the input buffer,
// then we know we haven't matched a partial input. As such, we can safely push
// the match into the output.
if ( this._pattern.lastIndex < this._inputBuffer.length ) {
logInput( "Push:", match[ 0 ] );
this.push( match[ 0 ] );
// The next relevant offset will be after this match.
nextOffset = this._pattern.lastIndex;
// If the current match butts up against the end of the input buffer, we are in
// danger of an invalid match - a match that will actually span across two (or
// more) successive _write() actions. As such, we can't use it until the next
// write (or finish) event.
} else {
logInput( "Need to defer '" + match[ 0 ] + "' since it's at the end of the chunk." );
// The next relevant offset will be BEFORE this match (since we haven't
// transformed it yet).
nextOffset = match.index;
}
}
// If we have successfully consumed a portion of the input, we need to reduce the
// current input buffer to be only the unused portion.
if ( nextOffset !== null ) {
this._inputBuffer = this._inputBuffer.slice( nextOffset );
// If no match was found at all, then we can reset the internal buffer entirely. We
// know we won't need to be matching across chunks.
} else {
this._inputBuffer = "";
}
// Reset the regular expression so that it can pick up at the start of the internal
// buffer when the next chunk is ready to be processed.
this._pattern.lastIndex = 0;
// Tell the source that we've fully processed this chunk.
getNextChunk();
};
// ---------------------------------------------------------- //
// ---------------------------------------------------------- //
// Create our regex pattern matching stream.
var regexStream = new RegExStream( /\w+/i );
// Read matches from the stream.
regexStream.on(
"readable",
function() {
var content = null;
// Since the RegExStream operates on "object mode", we know that we'll get a
// single match with each .read() call.
while ( content = this.read() ) {
logOutput( "Pattern match: " + content.toString( "utf8" ) );
}
}
);
// Write input to the stream. I am writing the input in very small chunks so that we
// can't rely on the fact that the entire content will be available on the first (or
// any single) transform function.
"How funky is your chicken? How loose is your goose?".match( /.{1,3}/gi )
.forEach(
function( chunk ) {
regexStream.write( chunk, "utf8" );
}
)
;
// Close the write-portion of the stream to make sure the last write() gets flushed.
regexStream.end();
// ---------------------------------------------------------- //
// ---------------------------------------------------------- //
// I log the given input values with a distinct color.
function logInput() {
var chalkedArguments = Array.prototype.slice.call( arguments ).map(
function( value ) {
return( chalk.magenta( value ) );
}
);
console.log.apply( console, chalkedArguments );
}
// I log the given output values with a distinct color.
function logOutput() {
var chalkedArguments = Array.prototype.slice.call( arguments ).map(
function( value ) {
return( chalk.bgMagenta.white( value ) );
}
);
console.log.apply( console, chalkedArguments );
}
At the bottom, I am splitting up my input and writing it, in small amounts, to the RegExStream. The RegExStream then turns around and emits regular expression pattern matches (in object mode) as individual values. When we run the above code, we get the following terminal output:
Now, let's look at refactoring this to use Through2 streams. To be clear, Through2 streams aren't really a special kind of stream - they are Node.js Transform streams. The Through2 module just encapsulates the construction of the Transform stream so that you don't have to worry about prototypal inheritance and defining prototype methods. Instead, you just pass in your transform() and flush() functions and Through2 will construct the stream object and wire up your methods.
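To give a feel for what that encapsulation amounts to, here is a minimal sketch of a wrapper in the same spirit. It is not the Through2 source code: miniThroughObj() is a hypothetical name, and it leans on the fact that the stream.Transform constructor accepts transform and flush functions as options.

```javascript
var stream = require( "stream" );

// Hypothetical stand-in for a Through2-style helper: build an object-mode
// Transform stream from plain functions, with no prototype wiring needed.
function miniThroughObj( transform, flush ) {
	return(
		new stream.Transform({
			objectMode: true,
			transform: transform,
			flush: flush
		})
	);
}

// The functions are invoked with "this" bound to the stream, so this.push()
// works just as it does inside a prototype method.
var upperCaser = miniThroughObj(
	function( chunk, encoding, getNextChunk ) {
		this.push( String( chunk ).toUpperCase() );
		getNextChunk();
	}
);

upperCaser.write( "goose" );

console.log( upperCaser instanceof stream.Transform );
console.log( upperCaser.read() );
```

The value that comes back is an ordinary Transform stream; the only thing that changed is who wrote the constructor boilerplate.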
Since our RegExStream parses data across chunks, it's important that it maintain some sort of internal state. In the first demo, we accomplished this using standard JavaScript object constructors with pseudo-private variables. With the Through2 streams, however, we no longer have that constructor. As such, we have to switch over to something like a Revealing Module Pattern so that we still have some sort of private memory space. This is why the following demo still defines a RegExStream function / constructor; but, it returns (aka, reveals) an instance of the Through2 stream.
// Include module references.
var through2 = require( "through2" );
var chalk = require( "chalk" );
// ---------------------------------------------------------- //
// ---------------------------------------------------------- //
// I am a Transform stream (writable/readable) that takes input and finds matches to the
// given regular expression. As each match is found, I push each match onto the output
// stream individually.
function RegExStream( patternIn ) {
// Make sure the pattern is an actual instance of the RegExp object and not just a
// string. This way, we can treat it uniformly later on.
if ( ! ( patternIn instanceof RegExp ) ) {
patternIn = new RegExp( patternIn, "g" );
}
// Since the pattern is passed-in by reference, we need to create a clone of it
// locally. We're going to be changing the RegExp properties and we need to make
// sure we're not breaking encapsulation by letting the calling scope alter it.
var pattern = clonePattern( patternIn );
// I hold the unprocessed portion of the input stream.
var inputBuffer = "";
// Return the Transform stream wrapper. We're using the "obj" convenience method
// since we want our read stream to operate in object mode - this way, each read()
// invocation will constitute a single pattern match.
return( through2.obj( transform, flush ) );
// ---
// PRIVATE METHODS.
// ---
// I clone the given regular expression instance, ensuring a unique reference that
// is also set to include the "g" (global) flag.
function clonePattern( pattern ) {
// Split the pattern into the pattern and the flags.
var parts = pattern.toString().slice( 1 ).split( "/" );
var regex = parts[ 0 ];
var flags = ( parts[ 1 ] || "g" );
// Make sure the pattern uses the global flag so our exec() will run as expected.
if ( flags.indexOf( "g" ) === -1 ) {
flags += "g";
}
return( new RegExp( regex, flags ) );
}
// Since we are no longer using Prototype methods, we're creating functions that
// are bound to instances of the RegExStream. As such, we should probably clean
// up variable references to help the garbage collector, especially since we're
// also passing those methods "out of scope."
// --
// CAUTION: I'm not entirely sure this is necessary in NodeJS. In JavaScript on the
// browser, the worst-case is that the user refreshes their page. But, when running
// JavaScript on the server, we might need to be more vigilant about this stuff.
function destroy() {
patternIn = pattern = inputBuffer = clonePattern = destroy = flush = transform = null;
}
// I finalize the internal state when the write stream has finished writing. This gives
// us one more opportunity to transform data and push values onto the output stream.
function flush( flushCompleted ) {
logInput( "@flush - buffer:", inputBuffer );
var match = null;
// Loop over any remaining matches in the internal buffer.
while ( ( match = pattern.exec( inputBuffer ) ) !== null ) {
logInput( "Push( _flush ):", match[ 0 ] );
this.push( match[ 0 ] );
}
// Clean up the internal buffer (for memory management).
inputBuffer = "";
// Signal the end of the output stream.
this.push( null );
// Signal that the input has been fully processed.
flushCompleted();
// Tear down the variables to help garbage collection.
destroy();
}
// I transform the given input chunk into zero or more output chunks.
function transform( chunk, encoding, getNextChunk ) {
logInput( ">>> Chunk:", chunk.toString( "utf8" ) );
// Add the chunk to the internal buffer. Since we might be matching values across
// multiple chunks, we need to build up the buffer with each unused chunk.
inputBuffer += chunk.toString( "utf8" );
// Since we don't want to keep building a large internal buffer, we want to pare
// down the content that we no longer need. As such, we're going to keep track of
// the position of the last relevant index so that we can drop any portion of
// the content that will not be needed in the next chunk-processing.
var nextOffset = null;
var match = null;
// Loop over the matches on the buffered input.
while ( ( match = pattern.exec( inputBuffer ) ) !== null ) {
// If the current match is within the bounds (exclusive) of the input
// buffer, then we know we haven't matched a partial input. As such, we can
// safely push the match into the output.
if ( pattern.lastIndex < inputBuffer.length ) {
logInput( "Push:", match[ 0 ] );
this.push( match[ 0 ] );
// The next relevant offset will be after this match.
nextOffset = pattern.lastIndex;
// If the current match butts up against the end of the input buffer, we are
// in danger of an invalid match - a match that will actually span across two
// (or more) successive _write() actions. As such, we can't use it until the
// next write (or finish) event.
} else {
logInput( "Need to defer '" + match[ 0 ] + "' since it's at the end of the chunk." );
// The next relevant offset will be BEFORE this match (since we haven't
// transformed it yet).
nextOffset = match.index;
}
}
// If we have successfully consumed a portion of the input, we need to reduce
// the current input buffer to be only the unused portion.
if ( nextOffset !== null ) {
inputBuffer = inputBuffer.slice( nextOffset );
// If no match was found at all, then we can reset the internal buffer entirely.
// We know we won't need to be matching across chunks.
} else {
inputBuffer = "";
}
// Reset the regular expression so that it can pick up at the start of the
// internal buffer when the next chunk is ready to be processed.
pattern.lastIndex = 0;
// Tell the source that we've fully processed this chunk.
getNextChunk();
}
}
// ---------------------------------------------------------- //
// ---------------------------------------------------------- //
// Create our regex pattern matching stream.
var regexStream = new RegExStream( /\w+/i );
// Read matches from the stream.
regexStream.on(
"readable",
function() {
var content = null;
// Since the RegExStream operates on "object mode", we know that we'll get a
// single match with each .read() call.
while ( content = this.read() ) {
logOutput( "Pattern match: " + content.toString( "utf8" ) );
}
}
);
// Write input to the stream. I am writing the input in very small chunks so that we
// can't rely on the fact that the entire content will be available on the first (or
// any single) transform function.
"How funky is your chicken? How loose is your goose?".match( /.{1,3}/gi )
.forEach(
function( chunk ) {
regexStream.write( chunk, "utf8" );
}
)
;
// Close the write-portion of the stream to make sure the last write() gets flushed.
regexStream.end();
// ---------------------------------------------------------- //
// ---------------------------------------------------------- //
// I log the given input values with a distinct color.
function logInput() {
var chalkedArguments = Array.prototype.slice.call( arguments ).map(
function( value ) {
return( chalk.cyan( value ) );
}
);
console.log.apply( console, chalkedArguments );
}
// I log the given output values with a distinct color.
function logOutput() {
var chalkedArguments = Array.prototype.slice.call( arguments ).map(
function( value ) {
return( chalk.bgCyan.white( value ) );
}
);
console.log.apply( console, chalkedArguments );
}
As you can see, we neither deal with util.inherits() to extend stream.Transform, nor do we define prototype methods on our RegExStream constructor. Instead, we simply pass the transform() and flush() functions into the through2.obj() method which returns an instance of the Node.js Transform stream.
NOTE: through2.obj() is a convenience method that simply constructs the underlying Transform stream in "Object Mode."
When we run the above Through2-based code, we get the following terminal output:
As you can see, we get the same exact behavior - the Through2 module simply encapsulated the stream construction. And, if you like the revealing module pattern, you may actually find the look and feel of this version a bit more pleasant; sometimes, dealing directly with the "prototype" object feels a bit too noisy.
You may notice that this version of the RegExStream has a destroy() method. I added this because I am not very familiar with the Node.js garbage collector. What I do know is that we are defining our transform() and flush() functions in one lexical scope and then passing those functions "out of scope." This creates a closure over the references defined in the RegExStream function (and the rest of the file). As such, I thought it might be a good idea to nullify as many references as I could once I knew that the stream was no longer "active."
This approach - using the destroy() method - may be totally overkill. Perhaps someone with more Node.js understanding can drop a comment in regard to this matter.
All in all, both approaches are doing the same thing. The only substantive difference is the way in which the actual Node.js Transform stream is being constructed. It seems like it's just a matter of personal preference. And, while I do like prototypal inheritance and its efficient memory management, I do have to say that the revealing module pattern is just more attractive.
Reader Comments
var parts = pattern.toString().slice( 1 ).split( "/" );
This is absolutely awful and brittle. Just try matching for "http://foo.bar/baz" and watch it explode. On top of that, toString() is not actually standardised, so different implementations might return other things than the string you expect. What you *should* be using is pattern.source, which will reliably give you the actual pattern and not get fooled by any escapings. Annoyingly enough, though, there's no pattern.flags. To copy those, the only reliable method is to go longform and construct a flag string by reading out all the properties listed in:
https://developer.mozilla.org/en/docs/Web/JavaScript/Reference/Global_Objects/RegExp#RegExp_prototype_objects_and_instances
You *could* use parts[parts.length - 1] reliably in the sense that as the last part, it will not be influenced by any extra slashes and will give you the flags, but it also means you open yourself up for cryptic failures on any implementation that doesn't provide the expected RegExp.prototype.toSource()
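The approach described in this comment can be sketched roughly as follows. This is only an illustration and not code from either participant in the thread: cloneRegExp() is a hypothetical name, and the sketch covers just the global, ignoreCase, and multiline properties. JavaScript engines newer than this thread also expose a flags property on RegExp instances, which would shorten this further.

```javascript
// Hypothetical helper: clone a RegExp from its source and flag properties rather
// than by parsing toString(). The "g" flag is always included, since the
// RegExStream depends on it for repeated exec() calls.
function cloneRegExp( pattern ) {
	var flags = "g";

	if ( pattern.ignoreCase ) {
		flags += "i";
	}

	if ( pattern.multiline ) {
		flags += "m";
	}

	return( new RegExp( pattern.source, flags ) );
}

// A pattern with embedded forward-slashes, which the split( "/" ) approach
// would mangle.
var original = /http:\/\/foo\.bar\/baz/i;
var clone = cloneRegExp( original );

console.log( clone !== original ); // true
console.log( clone.source === original.source ); // true
console.log( clone.global, clone.ignoreCase ); // true true
```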
@Mathrick,
Oh man, you're absolutely right. I can't believe I missed that! I think I was trying so hard to wrap my head around Node.js streams, it didn't even occur to me that patterns could have forward-slashes. Excellent catch!
I took a stab at a more durable version of the cloning, with the ability to ensure / inject certain flags (which I needed for my use-case -- making sure the global flag was set):
www.bennadel.com/blog/2664-cloning-regexp-regular-expression-objects-in-javascript.htm
Thanks for insight!
@Ben,
you're welcome! That's one of the reasons we have code reviews before anything gets into the tree here at work and I wouldn't have it any other way :)
Also (you're free to ignore this!) : I prefer to write my nick all-lowercase. Just a tiny nitpick.
@mathrick,
Ah, no problem :) When I hit the "reply" button, the code automatically upper-cases the first character. Cheers.