ColdFusion 10 Beta - Closures And Function Expressions And Threads
Yesterday, I really started to dig into the Closures and Function Expressions that were introduced in ColdFusion 10. I didn't really get into the pros and cons of using them; but, after the post yesterday, I hope we can start to agree that they are fairly badass. Today, I wanted to continue exploring closures and function expressions in the ColdFusion 10 beta - this time, in the context of threads and the CFThread tag.
Pass-By-Reference Or Pass-By-Value
When it comes to using closures in conjunction with CFThread, the first huge question that jumps to mind is, How does the closure get passed into a thread? As I've talked about before, ColdFusion passes values (via attributes) into a thread using a deep-copy. This applies to every kind of data structure, including a ColdFusion component.
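To make the deep-copy behavior concrete, here is a minimal sketch. The thread name copyDemo and the payload attribute are made up for illustration. Assuming attributes are copied as described, the change made inside the thread should not show up in the page-level struct:

```cfm
<cfscript>
	// Page-level struct that we will hand to the thread.
	data = { counter: 0 };

	// "payload" is a hypothetical attribute name. Per the deep-copy
	// behavior described above, the thread works on its own copy.
	thread
		name = "copyDemo"
		action = "run"
		payload = data
		{
		payload.counter = ( payload.counter + 1 );
		// Export what the thread saw via the THREAD scope.
		thread.seenCounter = payload.counter;
	}

	// Wait for the thread so that we can read its exported value.
	thread action = "join" name = "copyDemo";

	// Compare the page-level value with the value seen in the thread.
	writeOutput( "Page counter: #data.counter#<br />" );
	writeOutput( "Thread counter: #cfthread.copyDemo.seenCounter#<br />" );
</cfscript>
```

If the two numbers differ, the struct was copied rather than shared.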
But, closures present a serious dilemma when it comes to deep-copy! As we demonstrated yesterday, closures are lexically bound to values at define time. So, if we were to deep-copy a closure, we wouldn't only be copying the internals of the function, we'd be copying everything that was bound to it at define-time. Depending on your particular chunk of code, this could trigger a deep-copy of your entire application.
Clearly, closures throw an enormous monkey-wrench into the pass-by-value approach to CFThread attributes. At least in theory; let's try passing a closure into a CFThread tag and see what actually happens.
In the following demo, we're going to create a Logging closure that is lexically bound to a Counter. Then, we're going to pass the closure into several CFThread instances and invoke the logger. If the closure is passed-by-value, the counter should be the same for every closure invocation; if it is passed-by-reference, the counter should be shared across each closure invocation (and incremented several times):
<!---
Start a CFScript block. Closures can only be used inside
of CFScript due to the syntax required to define them.
--->
<cfscript>
// I am a factory for the thread callbacks.
function getThreadCallback(){
// Keep a running tally of how many times the log methods
// are called. We are doing this to see how Closure bindings
// are copied into the thread scope (copy vs. reference).
var callCount = 0;
// Define the log file for this demo -- we will need a log
// file since threads are async and we won't be able to
// write to the output.
var logFilePath = expandPath( "./thread.log" );
// Define the callback handler. This will use both a
// lexically-bound counter as well as a passed-in value for
// counter; this way, we can look at the various forms of
// attribute-copy.
var callback = function( metaCounter ){
// Open the file for appending.
var logFile = fileOpen( logFilePath, "append" );
fileWriteLine(
logFile,
"[#++callCount#][#metaCounter#] Finished with a success!"
);
// Clean up the file access.
fileClose( logFile );
};
// Return the callback.
return( callback );
}
// Get the callback closure (bound to the counter).
callback = getThreadCallback();
// ------------------------------------------------------ //
// ------------------------------------------------------ //
// Create a struct with a counter so that we can demonstrate that
// threads copy attributes by VALUE.
metaData = {
counter: 0
};
// ------------------------------------------------------ //
// ------------------------------------------------------ //
// Run a successful thread - pass in the callback handler and
// the meta count. Notice that these are both being passed-in
// via the attributes scope.
thread
name = "demoThread"
action="run"
util = metaData
onsuccess = callback
{
// Execute the success callback.
onsuccess( ++util.counter );
}
// Run a successful thread - pass in the callback handler and
// the meta count. Notice that these are both being passed-in
// via the attributes scope.
thread
name = "demoThread2"
action="run"
util = metaData
onsuccess = callback
{
// Execute the success callback.
onsuccess( ++util.counter );
}
// Run a successful thread - pass in the callback handler and
// the meta count. Notice that these are both being passed-in
// via the attributes scope.
thread
name = "demoThread3"
action="run"
util = metaData
onsuccess = callback
{
// Execute the success callback.
onsuccess( ++util.counter );
}
// ------------------------------------------------------ //
// ------------------------------------------------------ //
// Wait for all the threads to finish running.
thread action="join";
// Debug the threads.
// writeDump( cfthread );
</cfscript>
As a control, we're passing both a closure and a common ColdFusion struct into each CFThread tag. We don't know how the closure will behave; but, we expect the struct to be passed-by-value. Inside each of the three threads, the logging closure is invoked and the control counter is passed through. When we run the above code, we get the following output in our log file:
[1][1] Finished with a success!
[2][1] Finished with a success!
[3][1] Finished with a success!
Very cool! In the log file, the first number column is the closure-bound counter - the second number column is our "control" counter. As expected, the control counter, which was passed-by-value into the CFThread tag, is the same every time the log function is called. The closure-bound counter, on the other hand, has properly incremented across each closure invocation! This means that the values lexically bound to the closure are not deep-copied when the closure is passed into the CFThread tag.
Is the closure itself copied by reference? I don't know. And to be honest, I don't think it really matters. As long as the bound-data is copied by reference, the actual mechanics of the closure-copy become mostly immaterial. Since CFML doesn't provide a native operator for comparing two object references, the technical means of copy become much less relevant in this case.
Managing A Series Of Asynchronous CFThread Tags
Now that we know that closures can be passed into threads while maintaining their lexical bindings, we can start to interact with the CFThread tag in exciting ways. Threads are asynchronous. This means that we're not really sure when they are going to run; and, we can't be too sure when they're going to finish. Sure, we have the CFThread[action=join] approach to thread control; but, with closures, we can come up with a more asynchronous way to manage a series of CFThread tags.
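For contrast, here is roughly what the blocking CFThread[action=join] approach looks like. This is only a sketch; the thread names workerA and workerB are hypothetical, and the timeout value is arbitrary:

```cfm
<cfscript>
	// Launch two short-lived threads (names are made up for this sketch).
	thread name = "workerA" action = "run" {
		sleep( 50 );
	}
	thread name = "workerB" action = "run" {
		sleep( 50 );
	}

	// Block the page until both named threads have finished, or
	// until the timeout (in milliseconds) has elapsed.
	thread action = "join" name = "workerA,workerB" timeout = "2000";

	// Once the join returns, we can inspect each thread's status.
	writeOutput( "workerA: #cfthread.workerA.status#<br />" );
	writeOutput( "workerB: #cfthread.workerB.status#<br />" );
</cfscript>
```

The join works, but it halts the page while it waits, which is exactly what a callback-based monitor avoids.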
For this demo, we're going to create a method - threadsDone() - that monitors N-number of threads and then executes a callback when all of the threads have finished execution (whether by success or failure). Since the threads are running asynchronously, it means that our monitor also has to run asynchronously, inside of its own thread. This will give us a great opportunity to mix threads with closures.
<!---
Start a CFScript block. Closures can only be used inside
of CFScript due to the syntax required to define them.
--->
<cfscript>
// I accept N thread instances and a callback (the last
// argument). The callback is invoked when each one of the given
// threads has completed execution (success or failure). The
// thread object is echoed back in the callback arguments.
function threadsDone(){
// Extract the callback from the arguments.
var callback = arguments[ arrayLen( arguments ) ];
// Extract the thread objects - arguments[1..N-1].
var threads = arraySlice( arguments, 1, arrayLen( arguments ) - 1 );
// I am a utility function that determines if a given thread
// is still running based on its status.
var isThreadRunning = function( threadInstance ){
// A thread can end with a success or an error, both of
// which lead to different final status values.
return(
(threadInstance.status != "TERMINATED") &&
(threadInstance.status != "COMPLETED")
);
};
// I am a utility function that determines if any of the
// given threads are still running.
var threadsAreRunning = function(){
// Loop over each thread to look for at least ONE that is
// still running.
for (var threadInstance in threads){
// Check to see if this thread is still running. This
// is if it has not yet completed (successfully) or
// terminated (error).
if (isThreadRunning( threadInstance )){
// We don't need to continue searching; if this
// thread is running, that's good enough.
return( true );
}
}
// If we made it this far, then no threads are running.
return( false );
};
// I am a utility function that invokes the callback with the
// given thread instances.
var invokeCallback = function(){
// All of the threads we are monitoring have stopped
// running. Now, we can invoke the callback. Let's build
// up an argument collection.
var callbackArguments = {};
// Translate the array-based arguments into a struct-
// based collection of arguments.
for (var i = 1 ; i <= arrayLen( threads ) ; i++){
callbackArguments[ i ] = threads[ i ];
}
// Invoke the callback with the given threads.
callback( argumentCollection = callbackArguments );
};
// In order to check the threads, we need to launch an
// additional thread to act as a monitor. This will do
// nothing but sleep and check the threads.
//
// NOTE: We need to pass the two methods INTO the thread
// explicitly since the thread body does NOT have access
// to the local scope of the parent function.
thread
name = "threadsDone_#getTickCount()#"
action = "run"
threadsarerunning = threadsAreRunning
invokecallback = invokeCallback
{
// Check to see if the threads are running.
while (threadsAreRunning()){
// Sleep briefly to allow other threads to complete.
thread
action="sleep"
duration="10"
;
}
// If we made it this far, it means that the threads
// have all finished executing and the while-loop has
// been exited. Let's invoke the callback.
invokeCallback();
};
}
// ------------------------------------------------------ //
// ------------------------------------------------------ //
// ------------------------------------------------------ //
// ------------------------------------------------------ //
// Launch a thread - remember this is an ASYNCHRONOUS operation.
thread
name = "thread1"
action = "run" {
// Sleep the thread briefly.
sleep( randRange( 10, 100 ) );
}
// Launch a thread - remember this is an ASYNCHRONOUS operation.
thread
name = "thread2"
action = "run" {
// Sleep the thread briefly.
sleep( randRange( 10, 100 ) );
}
// Launch a thread - remember this is an ASYNCHRONOUS operation.
thread
name = "thread3"
action = "run" {
// In this one, let's throw an error to show that this works
// with failed threads as well as successful ones.
//
// NOTE: Since this is an asynchronous process, this error
// does not kill the parent process.
throw(
type = "threadError",
message = "Something went wrong in the thread!"
);
}
// ------------------------------------------------------ //
// ------------------------------------------------------ //
// Wait for all threads to finish and then run the given
// callback.
//
// NOTE: For demo purposes, we're going to JOIN the threads so
// that we can write to the page output.
threadsDone(
cfthread.thread1,
cfthread.thread2,
cfthread.thread3,
function( thread1, thread2, thread3 ){
// Log the status of the completed threads.
writeOutput( "Threads have finished! #now()#<br />" );
writeOutput( "#thread1.name# - #thread1.status#<br />" );
writeOutput( "#thread2.name# - #thread2.status#<br />" );
writeOutput( "#thread3.name# - #thread3.status#<br />" );
}
);
// ------------------------------------------------------ //
// ------------------------------------------------------ //
// Wait for all threads to join so we can see the output.
thread action="join";
// Debug threads.
// writeDump( cfthread );
</cfscript>
The first thing that you might notice about this code is that closures and function expressions allow us to really break up our algorithms into smaller, more cohesive chunks; and, we can do so without wreaking havoc on the page's Variable scope. Since many of our closures are defined within other functions, it allows the entirety of our algorithms to be properly encapsulated without creating page-level variables.
There's a lot of code here, but the meat of the code is the threadsDone() method. This method defines a good number of local values and local closures (that act on those values). It also defines its own thread as a means to monitor the other asynchronous threads. Notice that we are passing several utility closures into the supervisor thread as a means to interact with the threadsDone() local scope. Since the closures are passed-by-reference, we don't have to worry about losing connection with the local variables.
When we run this code and monitor the three threads, we get the following page output (after all the threads have been Joined for debugging):
Threads have finished! {ts '2012-02-21 08:25:38'}
THREAD1 - COMPLETED
THREAD2 - COMPLETED
THREAD3 - TERMINATED
This is pretty cool! The thread-closure interaction here provides a very elegant way of managing the execution of multiple, asynchronous threads.
Closures Are Bound To Page Context (And Output Buffer)
So far, we've seen that a closure is bound to the values in its lexical environment (ie. what it could "see" at define-time). But does this binding extend to aspects of the "page context?" When it comes to the CFThread tag, we know that each thread gets its own output buffer. If we define a closure outside of the thread and pass it into the thread, which output buffer does the closure use? And conversely, if we define a closure inside of the CFThread tag and then pass it out, which output buffer does it use?
To experiment with this, we're going to create two closures - one outside a CFThread tag, the other inside a CFThread tag. Each closure will grab the hashCode of the currently-bound page context and output a message to the currently-bound output buffer. These closures will then be passed into and out of the thread, respectively, and invoked.
<!---
Start a CFScript block. Closures can only be used inside
of CFScript due to the syntax required to define them.
--->
<cfscript>
// Create a closure that is bound to the primary page's
// output buffer.
writeToBrowser = function( value ){
// Get the current page context hash code.
var hashCode = getPageContext().hashCode();
// Write to the currently-bound output buffer.
writeOutput( "[#hashCode#] #value#" );
};
// Test the top-level output / hashCode binding.
writeToBrowser( "Top-level CONTROL write.<br />" );
// ------------------------------------------------------ //
// ------------------------------------------------------ //
// Run a thread and pass-in the parent-page-bound writer. When
// this thread is done executing, it will save a thread-bound
// writer to the THREAD scope.
thread
name = "test"
action = "run"
toplevelwriter = writeToBrowser
{
// Try writing to whatever output is currently defined.
writeOutput( "Direct output within thead.<br />" );
// Try writing to the closure-based writer.
topLevelWriter( "Top-level writer within thread.<br />" );
// Let's export a closure that is bound to the thread output
// buffer.
thread.writeToThread = function( value ){
// Get the current page context hash code.
var hashCode = getPageContext().hashCode();
// Write to currently-bound output buffer.
writeOutput( "[#hashCode#] #value#" );
};
// Execute the internal write.
thread.writeToThread( "Wrapped output within thread." );
// Sleep the thread to allow access to the exported function
// before the thread has actually finished running.
sleep( 1000 );
}
// ------------------------------------------------------ //
// ------------------------------------------------------ //
// HOPEFULLY give the above thread enough time to export the
// writer method.
sleep( 500 );
// Try invoking the thread-exported writer.
cfthread.test.writeToThread(
"Thread-writer output - where's it gonna go?<br />"
);
// Join all the threads back to the page.
thread action = "join";
// Debug the threads.
writeDump( cfthread );
</cfscript>
As you can see, each closure is executed both internal and external to the CFThread tag. When we run the above code we get the following page output:
[521595920] Top-level CONTROL write.
[521595920] Top-level writer within thread.
As you can see, the only output we get in the main page is the output generated by the externally-defined closure. Even when we export the thread-based closure - writeToThread() - and execute it in the top-level page, it doesn't produce any visible content.
If you look at the joined-thread output in the debugging code, however, you'll see that internally, the thread produced the following content:
Direct output within thead.<br />
[970334180] Wrapped output within thread.
[970334180] Thread-writer output - where's it gonna go?<br />
Even when we exported the thread-based writer, invoking it in the top-level context caused output at the thread-level context. This demonstrates that, in addition to explicitly defined variables in the lexical environment, the closures are implicitly bound to their original page context.
Creating Event-Emitting Threads
Now that we see how CFThread tags and closures interact, I thought it'd be fun to really try and take the complexity up to the next level and create evented threads; that is, to create a CFThread tag that can "publish" events during its asynchronous lifecycle. Since CFThread tags start in an asynchronous manner, we can't rely on the thread to create its own publish/subscribe system (as we can't be sure when it would become available); rather, we'll need to create our own event emitter and then pass it into the thread for consumption.
For this fun exploration, we're going to create an EventEmitter() object. We will then pass this event emitter into the CFThread tag for event-publication. As we talked about before, we know that objects/structs/arrays get passed into CFThreads by Value (not by reference). In our case, this doesn't much matter since the EventEmitter() is nothing more than a collection of closures. As we saw above, closures, when passed into a CFThread tag, retain their lexical binding; as such, the deep-copy of our closure-collection becomes irrelevant.
The EventEmitter() instance will provide three publish and subscribe functions:
- on( eventType, callback )
- off( eventType, callback )
- trigger( eventType, data )
The on() and off() methods are for binding and unbinding a callback, respectively. The trigger() method is used by the evented "subject" as a means to publish events.
<!---
Start a CFScript block. Closures can only be used inside
of CFScript due to the syntax required to define them.
--->
<cfscript>
// I create evented objects for publish and subscribe
// functionality. This factory creates objects that have the
// following method API:
//
// - on( eventType, callback )
// - off( eventType, callback )
// - trigger( eventType, data )
EventEmitter = function(){
// In order for the event bindings to work, we need to be
// able to bind and Unbind them. For this, we'll need to keep
// a unique ID for each callback. And, in order to generate that
// ID, we'll need to store event-emitter meta data.
var eventEmitterMetaData = getMetaData( EventEmitter );
// Add the unique ID if it doesn't exist.
if (!structKeyExists( eventEmitterMetaData, "uuid" )){
// Initialize the uuid.
eventEmitterMetaData.uuid = 0;
}
// Create a cache of event bindings. Each event will be
// defined as a type and a set of callbacks.
var events = {};
// Create an instance of emitter.
var emitter = {};
// I add the given callback to the given event cache.
var addCallback = function( eventType, callback ){
// Check to see if this callback needs to be prepared
// for use with the event system.
if (!structKeyExists( getMetaData( callback ), "eventEmitterUUID" )){
// Add the UUID to the callback meta data.
//
// NOTE: Since it is very likely (expected) that
// event callbacks will be created as Closures, we
// can expect each closure to be unique and therefore
// to have its own proprietary meta data.
getMetaData( callback ).eventEmitterUUID = ++eventEmitterMetaData.uuid;
}
// Make sure we have a cache for this particular type of
// event before we try to access it.
if (!structKeyExists( events, eventType )){
// Create the cache.
events[ eventType ] = [];
}
// Add the callback to this collection.
arrayAppend( events[ eventType ], callback );
};
// I remove the callback from the given event cache.
var removeCallback = function( eventType, callback ){
// Make sure the callback has an event emitter UUID. If
// it doesn't, then it shouldn't be part of the system.
if (!structKeyExists( getMetaData( callback ), "eventEmitterUUID" )){
throw( type = "InvalidCallback" );
}
// Check to make sure we have a cache for the given
// event type.
if (!structKeyExists( events, eventType )){
// No cache - nothing more to check.
return;
}
// Get the target UUID.
var uuid = getMetaData( callback ).eventEmitterUUID;
// Filter the callback out of the event type.
//
// NOTE: I am using an intermediary variable here for the
// result to get around some weird parsing bug.
var cache = arrayFilter(
events[ eventType ],
function( boundCallback ){
// Exclude if this method is bound.
if (getMetaData( boundCallback ).eventEmitterUUID == uuid){
return( false );
}
// Include this callback in the result.
return( true );
}
);
// Save the filtered cache of event callbacks.
//
// NOTE: See note from above (bug).
events[ eventType ] = cache;
};
// Create the bind method.
emitter.on = function( eventType, callback ){
// Add the callback to the appropriate callback cache.
addCallback( eventType, callback );
// Return "this" object for method chaining.
return( emitter );
};
// Create the unbind method.
emitter.off = function( eventType, callback ){
// Remove the callback from the appropriate callback cache.
removeCallback( eventType, callback );
// Return "this" object for method chaining.
return( emitter );
};
// Create the trigger method.
emitter.trigger = function( eventType, data ){
// Make sure we have callbacks for this event.
if (
!structKeyExists( events, eventType ) ||
!arrayLen( events[ eventType ] )
){
// No callbacks to trigger - exit out.
return;
}
// Loop over each callback to invoke it for the event.
arrayEach(
events[ eventType ],
function( callback ){
// Invoke the callback with the given data.
//
// NOTE: I am using the argumentCollection here
// to get around a strange named-argument bug.
callback( argumentCollection = { "1" = data } );
}
);
};
// Return the event emitter instance.
return( emitter );
};
// ------------------------------------------------------ //
// ------------------------------------------------------ //
// ------------------------------------------------------ //
// ------------------------------------------------------ //
// Create an instance of event emitter. This will have the
// publication and subscription methods for:
//
// - on()
// - off()
// - trigger()
evented = EventEmitter();
// Run a thread and pass in the event emitter. This will allow the
// thread to announce events during its execution.
thread
name = "eventedThread"
action = "run"
beacon = evented
{
// Sleep this thread immediately to let the code AFTER the
// thread run and bind to the events.
sleep( 100 );
// Trigger start event.
beacon.trigger(
"threadStarted",
"The thread has started!"
);
sleep( 100 );
// Trigger end event.
beacon.trigger(
"threadEnded",
"The thread has ended!"
);
}
// ------------------------------------------------------ //
// ------------------------------------------------------ //
// Before we check the valid event emitting, we want to check to
// make sure that the UNBIND feature works.
tempCallback = function( message ){
writeOutput( "This should never be called.<br />" );
};
// Bind and UNBIND the callback. We want to make sure that it
// will NOT get called for the given event types.
evented
.on( "threadEnded", tempCallback )
.off( "threadEnded", tempCallback )
;
// ------------------------------------------------------ //
// ------------------------------------------------------ //
// Bind to the "Start" event on the thread.
evented.on(
"threadStarted",
function( message ){
writeOutput( message & "<br />" );
}
);
// Bind to the "End" event on the thread.
evented.on(
"threadEnded",
function( message ){
writeOutput( message & "<br />" );
}
);
// ------------------------------------------------------ //
// ------------------------------------------------------ //
// Halt the page until the thread has finished execution. This
// will cause the events to be triggered BEFORE this page has
// finished running.
thread action = "join";
// Debug threads.
writeDump( cfthread );
</cfscript>
There's a LOT of code here, mostly in the EventEmitter() factory method. But, there's some really exciting stuff happening here if you take the time to go through it! We're making use of meta data; we're comparing function instances through a UUID encapsulated within the event system; and, we're creating a CFThread tag that can emit events! When we run the above code, we get the following output:
The thread has started!
The thread has ended!
As you can see, the two callbacks bound to the thread's event emitter were properly invoked at the appropriate points in the CFThread's asynchronous lifecycle.
Function Declarations Have Not Changed Nesting Behavior
In all the excitement of being able to nest closures inside of closures inside of closures inside of functions, I started to wonder if the rules about nesting functions may have changed in ColdFusion 10 beta. That is, can one Function Declaration be defined inside another Function Declaration? To test this, I tried throwing a function declaration inside a CFThread tag. Since ColdFusion implements threads as Functions behind the scenes, this will serve as a valid nested test.
<!---
Start a CFScript block. Closures can only be used inside
of CFScript due to the syntax required to define them.
--->
<cfscript>
// Run a thread that composes a function and a closure.
thread
name = "testComposition"
action = "run"
{
// Cannot use a FUNCTION DECLARATION inside of a CFThread
// since CFThread runs as a Function behind the scenes and
// function declarations cannot be nested.
//
// Error: Invalid CFML construct found on line X at column Y.
// The function innerFunction is incorrectly nested inside
// another function definition
// _cffunccfthread_cfthread42ecfm2828400851.
function innerFunction(){ };
// Define an inner closure. We *know* this will work.
var innerClosure = function(){ };
// Define an inner closure that, itself, defines a Function.
var innerComposite = function(){
function innerInnerFunction(){ }
innerInnerFunction();
};
// Try invoking composite closure.
innerComposite();
}
// ------------------------------------------------------ //
// ------------------------------------------------------ //
// Join thread back to page so we can debug the output.
thread action = "join";
// Debug the thread.
writeDump( cfthread );
</cfscript>
Here, I have a CFThread tag that defines a nested Function Declaration, a nested closure, and as a new breed of test, an inner closure which encapsulates its own Function Declaration. If I leave all three tests in, I get a somewhat expected ColdFusion error:
Invalid CFML construct found on line 24 at column 26. The function innerFunction is incorrectly nested inside another function definition _cffunccfthread_cfthread52ecfm2795662261, which is located at line 42, column 9.
If I comment-out the first Function Declaration, however, the rest of the code executes without error. This means that, as always, a Function Declaration cannot be nested directly inside another Function Declaration. We can, however, nest a Function Declaration inside a closure that is nested inside a Function Declaration. Very interesting stuff!
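The test above ran inside a CFThread body. Assuming the same rule holds for an ordinary function, the "declaration inside a closure inside a declaration" pattern would look something like this sketch (the names outer, wrapper, and inner are made up for illustration):

```cfm
<cfscript>
	// An ordinary Function Declaration at the page level.
	function outer(){
		// A closure defined inside the declaration. This is the
		// layer that makes the nested declaration legal, going by
		// the result reported above.
		var wrapper = function(){
			// A Function Declaration nested inside the closure.
			function inner(){
				return( "inner() was called" );
			}
			return( inner() );
		};
		return( wrapper() );
	}

	// Invoke the outer function, which runs the closure and, in
	// turn, the nested declaration.
	writeOutput( outer() );
</cfscript>
```

Moving inner() directly into the body of outer(), without the closure in between, is the arrangement that triggers the "incorrectly nested" error.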
The more I get into the technical aspects of Closures in ColdFusion 10, the more I am loving them! As I wrote some of the code, I did come across some weird bugs that seemed to be related to parsing or variable references (which I'll try to pick apart in a subsequent post). But for the most part, working with closures is very easy and can clearly be quite powerful.
Reader Comments
This is precisely what I just sent you in an "Ask Ben", relating to FoundryCF's Event Emitter CFC and events in general...
I also like the Emitter class syntax you used instead of using a CFC...
In Railo 4 BETA, it would appear I can't use the "thread {}" within a function expression in CFCs...
Not sure what's up there...
Great resource, Ben. I just so happened to be doing a lot of this stuff today. Just FYI though, you can compare equality of objects and functions using .equals(). See: https://gist.github.com/ryanguill/cc083242d3f2700a7767
Ryan,
One quirk I found with .equals() (at least in scripting) is that it must be called on a variable and not chained to a method. Otherwise, you will get an "Invalid CFML construct". See this gist for an example: https://gist.github.com/tristanlee85/7e14f12499170d34246c
Tristan,
I replied on your gist (I'm not sure if you get notifications for that or not) - this seems to be a compile (syntax parse) error with that code. Certainly strange!
Simplified example: https://gist.github.com/ryanguill/6ee21d821e9af59ecd57
I want to say I experienced this in CF9 as well, but I'm not able to verify that. Silly ACF once again.