Ben Nadel at the jQuery Conference 2010 (Boston, MA) with: Luke Brookhart

Using A Task CFThread To Run And Restart Daemon CFThreads Indefinitely In Lucee CFML 5.3.6.61


The CFThread tag has been one of the most awesome feature-additions to the ColdFusion language, allowing us to seamlessly and effortlessly run asynchronous code in parallel to the main page request. But, the CFThread has always been a kind of "one off" type of processing. And, for years (dating back to 2010), I've often yearned for a CFThread tag with an interval attribute - something a little more "persistent". It turns out, in Lucee, there is this concept of a Task Thread, which is a version of CFThread that runs outside of the Application. As an experiment, I wanted to see if I could use this Task Thread to "persist the execution" of the traditional Daemon Threads in Lucee CFML 5.3.6.61.

Java Concurrency Features: Lucee CFML / ColdFusion runs on top of Java; and, Java apparently has some robust concurrency libraries built into it. I remember watching Marc Esher give a presentation years ago on a ColdFusion concurrency library that he wrote that leverages the native Java threading. So, there are probably some really intelligent ways to accomplish what I'm about to hack together. But, hacking stuff together is fun!

Ultimately, what I've always wanted is a way to have a persistent background thread that can be spawned within a ColdFusion application, very much in the same way that we can create "workers" in a Node.js application. Since the beginning of time, ColdFusion has had "scheduled tasks", which are a persistent way to make CFHTTP calls back into your ColdFusion application on a regular basis; and, there's no doubt that these scheduled tasks can be used to mimic a background task. However, scheduled tasks are not always the most user-friendly things to configure. As such, the desire for a more "integrated" solution has always felt unfulfilled.

Lucee CFML introduced the idea of a "task" thread. This is spawned via CFThread - just like the traditional "daemon" threads - but uses type="task" instead. And, from what I think I've read, these tasks get serialized and stored on disk and then executed / managed outside of the ColdFusion application using Lucee's task manager.
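To make that a bit more concrete, here is a minimal sketch of what spawning a task thread might look like. The thread name and the retry policy (3 tries, 5 seconds apart) are hypothetical values chosen purely for illustration; the type and retryInterval attributes are the same ones used later in this post:

```cfml
<cfscript>

	// Hypothetical retry policy: if the task fails, try it again up to 3 times,
	// waiting 5 seconds between tries.
	retryPolicy = [{
		tries: 3,
		interval: createTimeSpan( 0, 0, 0, 5 )
	}];

	// The type attribute is what turns this into a "task" thread.
	thread
		type = "task"
		name = "demoTask"
		retryInterval = retryPolicy
		{

		// This body is executed by Lucee's task engine, not by the page request.
		systemOutput( "[ Demo Task ]: Hello from a task thread.", true, true );

	}

</cfscript>
```

Since the body runs outside of the request that defined it, I wouldn't expect it to be able to reach back into the page's variables; anything it needs should be passed in as a thread attribute.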

To be honest, I learned about "task" threads many months ago; but, I couldn't really figure out what the point of them was. Since they execute outside of the application context in which they were defined, on their face, this kind of made them useless to me. However, I had a moment of inspiration the other day where I thought maybe I could use the task thread to call back into the ColdFusion application, much like scheduled tasks do; only, in a way that was entirely managed by the ColdFusion application code - not the ColdFusion administrator.

To explore this idea, I wanted to create a very simple Producer / Consumer context using Redis as the point-of-indirection. To this end, I'm using the same blocking list-operations that I used months ago, wherein I'll create a consumer that blocks-and-watches a Redis key for values. Then, I'll have a simple producer that pushes messages onto that Redis key.

To do this, I created a super simple Redis / Jedis wrapper that just exposes a callback-based method that manages connections in a connection pool:

component
	output = false
	hint = "I provide a simple, callback-based wrapper for the Jedis connection pool."
	{

	/**
	* I initialize the Jedis connection pool to connect to the given Redis host.
	* 
	* @redisHost I am the Redis host name.
	* @redisPort I am the Redis port being monitored (probably 6379).
	*/
	public void function init(
		required string redisHost,
		required numeric redisPort
		) {

		// In Lucee CFML, we can provide a set of custom JAR files to the createObject()
		// function that will create an isolated Java Class Loader. This is so awesome!
		jedisJarPaths = [
			expandPath( "/vendor/apache/commons-pool2-2.2.jar" ),
			expandPath( "/vendor/jedis/lib/jedis-2.6.0.jar" )
		];

		jedisPoolConfig = javaNew( "redis.clients.jedis.JedisPoolConfig" )
			.init()
		;
		jedisPool = javaNew( "redis.clients.jedis.JedisPool" )
			.init( jedisPoolConfig, redisHost, redisPort )
		;

	}

	// ---
	// PUBLIC METHODS.
	// ---

	/**
	* I get a Jedis connection (to the Redis database) and pass it the given callback.
	* Any data returned from the callback execution is propagated back up to the calling
	* context. The connection to Redis is managed transparently to the callback.
	* 
	* @callback I am the callback function to which the Redis connection is passed.
	*/
	public any function withRedis( required function callback ) {

		var redis = jedisPool.getResource();

		try {

			return( callback( redis ) );

		} finally {

			redis?.close();

		}

	}

	// ---
	// PRIVATE METHODS.
	// ---

	/**
	* I create the Java Class proxy with the given name (using the custom JAR files as
	* the source code).
	* 
	* @className I am the Java Class proxy to create.
	*/
	private any function javaNew( required string className ) {

		return( createObject( "java", className, jedisJarPaths ) );

	}

}

As you can see, this ColdFusion component exposes one method, withRedis(), which takes a callback and then passes a Jedis client to the callback (managing the connection pool behind the scenes). This creates a really simple API that allows the rest of my ColdFusion application to easily interface with the Redis server.

With this Redis Pool wrapper, I then created a simple "producer" which takes form-submission values and pushes them onto the Redis list using rpush:

<cfscript>

	param name="form.message" type="string" default="";

	// If there is a message, let's push it into the Redis list.
	if ( form.message.len() ) {

		application.redisPool.withRedis(
			( redis ) => {

				redis.rpush( "ben.demo", [ form.message ] );

			}
		);

	}

</cfscript>
<cfoutput>

	<!doctype html>
	<html lang="en">
	<head>
		<meta charset="utf-8">
		<link rel="stylesheet" type="text/css" href="./producer.css">
	</head>
	<body>

		<h1>
			Produce a Message
		</h1>

		<form
			method="post"
			action="#encodeForHtmlAttribute( cgi.script_name )#">

			<strong>Message</strong>:
			<input
				type="text"
				name="message"
				size="30"
				autofocus
				autocomplete="off"
			/>
			<button type="submit">
				Send Message
			</button>

		</form>

		<p>
			<a href="./stop.cfm">Stop application</a>
		</p>

	</body>
	</html>

</cfoutput>

As you can see, whenever I submit a message with the form, it pushes the form-input value onto the Redis list located at key, ben.demo.

Now, to create the consumer of this Redis list! The consumer ColdFusion component implements a kind of "Runnable" API that exposes a .run() and .stop() method. The .run() method is expected to run indefinitely, monitoring and processing values popped-off of the ben.demo Redis key. But, the "persistent" nature of this consumer is managed outside of this component, which we'll get to shortly.

Internally, the .run() method is just a while loop that keeps executing indefinitely until it is told to stop.
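The general shape of that cooperative run / stop pattern can be sketched without Redis. In this stand-in, the worker struct, the thread name, and the sleep durations are all hypothetical; it also assumes (as noted later in this post) that Lucee hands thread attributes over by reference rather than deep-cloning them:

```cfml
<cfscript>

	// Hypothetical stand-in for the Consumer: a running-flag plus a counter.
	worker = {
		isRunning: true,
		ticks: 0
	};

	thread
		name = "demoWorker"
		worker = worker
		{

		// Keep looping until something outside of the thread flips the flag.
		while ( worker.isRunning ) {

			worker.ticks++;
			sleep( 50 );

		}

	}

	// Let the loop spin a few times, then ask it to stop (akin to calling stop()).
	sleep( 250 );
	worker.isRunning = false;

	// Wait for the thread to notice the flag and exit on its own.
	thread action = "join" name = "demoWorker" timeout = 1000;

</cfscript>
```

The thread is never forcibly terminated here; it just finishes its current iteration, sees the flag, and falls out of the loop.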

Note that if the message popped-off of the list is "crash", we're going to throw an exception, thereby "crashing" the Consumer. This is done to showcase the fact that our background processor will "restart" the Consumer as needed.

component
	output = false
	hint = "I am a persistent background consumer that processes messages from a specific Redis list."
	{

	/**
	* I initialize the background Consumer using the given Redis pool. This Consumer
	* exposes a run() method which is intended to run indefinitely in the background of
	* the application.
	* 
	* @redisPool I am the Redis connection pool.
	*/
	public void function init( required any redisPool ) {

		variables.redisPool = arguments.redisPool;

		// I determine if the Consumer should continue processing the Redis list. This
		// flag allows us to halt the background processing after it has begun.
		isRunning = false;

		// This is the Redis KEY that we will be monitoring and consuming.
		redisListKey = "ben.demo";

	}

	// ---
	// PUBLIC METHODS.
	// ---

	/**
	* I kick-off the Consumer processing. This method is intended to run indefinitely
	* (or, at least until the stop() method is called).
	*/
	public void function run() {

		isRunning = true;

		// The point of the Consumer is to run indefinitely, looking for and processing
		// values in the given Redis list. As such, we're going to keep looping until the
		// running-flag is turned-off.
		while ( isRunning ) {

			debug( "Looking for messages in Redis (blocking operation)." );

			// The blpop (Blocking List Pop) command will block the current thread while
			// waiting for a list-item to become available. If it reaches the timeout
			// (which is 5-seconds in this case) without finding a list-item, a NULL
			// value will be returned.
			var result = redisPool.withRedis(
				( redis ) => {

					return( redis.blpop( 5, [ redisListKey ] ) );

				}
			);

			// No list item was found within the timeout.
			if ( isNull( result ) ) {

				debug( "No messages found after block-timeout." );

			// A list item was found! Woot!
			} else {

				var listName = result[ 1 ];
				var listItemValue = result[ 2 ];

				debug( "Message found:" );
				debug( "" );
				debug( "   #listItemValue#" );
				debug( "" );

				// FOR THE DEMO, to make things a little more interesting, we're going
				// to crash the Consumer thread if the list value was "crash". This will
				// give the background-task an opportunity to restart it and make things
				// a bit more fun :D
				if ( listItemValue == "crash" ) {

					throw( type = "OopsCrash", message = "UnexpectedValue" );

				}

			}

		} // While-loop.

		debug( "Messaging processing stopped." );

	}


	/**
	* I stop the background processing of the Consumer.
	*/
	public void function stop() {

		isRunning = false;

		debug( "Stopping processing of the Redis list." );

	}

	// ---
	// PRIVATE METHODS.
	// ---

	/**
	* I log the given message to the server error-output stream.
	* 
	* @message I am the message being logged.
	*/
	private void function debug( required string message ) {

		systemOutput( "[ Consumer ]: #message#", true, true );

	}

}

Ok, so far we have our Redis Pool abstraction, a producer that pushes messages onto a Redis list, and a Consumer.cfc that pops messages off said Redis list and logs them to the server output. Before we dive into how the Consumer execution is being "persisted", let's just get a sense of what we're expecting to see. I'm going to fire up the ColdFusion application, push some messages onto the Redis list, and look at the server output:

A ColdFusion consumer daemon thread being managed by a ColdFusion task thread in Lucee CFML.

It's a bit hard to see in the GIF, but as I'm pushing messages onto the Redis list, they are being popped by the Consumer and logged to the (CommandBox) output (truncated slightly here for readability):

[ Application ]: Bootstrapping Lucee CFML application.
[ BackgroundRunner ]: Ensuring runnables are running.
[ BackgroundRunner ]: Starting runnable with ID 2f
[ Consumer ]: Looking for messages in Redis (blocking operation).
[ BackgroundRunner ]: Ensuring runnables are running.
[ BackgroundRunner ]: Ensuring runnables are running.
[ BackgroundRunner ]: Ensuring runnables are running.
[ Consumer ]: Message found:
[ Consumer ]:
[ Consumer ]:    hello
[ Consumer ]:
[ Consumer ]: Looking for messages in Redis (blocking operation).
[ BackgroundRunner ]: Ensuring runnables are running.
[ BackgroundRunner ]: Ensuring runnables are running.
[ Consumer ]: Message found:
[ Consumer ]:
[ Consumer ]:    world
[ Consumer ]:
[ Consumer ]: Looking for messages in Redis (blocking operation).
[ BackgroundRunner ]: Ensuring runnables are running.
[ BackgroundRunner ]: Ensuring runnables are running.
[ Consumer ]: Message found:
[ Consumer ]:
[ Consumer ]:    how
[ Consumer ]:
[ Consumer ]: Looking for messages in Redis (blocking operation).
[ BackgroundRunner ]: Ensuring runnables are running.
[ BackgroundRunner ]: Ensuring runnables are running.
[ Consumer ]: Message found:
[ Consumer ]:
[ Consumer ]:    is
[ Consumer ]:
[ Consumer ]: Looking for messages in Redis (blocking operation).
[ BackgroundRunner ]: Ensuring runnables are running.
[ BackgroundRunner ]: Ensuring runnables are running.
[ Consumer ]: Message found:
[ Consumer ]:
[ Consumer ]:    it
[ Consumer ]:
[ Consumer ]: Looking for messages in Redis (blocking operation).
[ BackgroundRunner ]: Ensuring runnables are running.
[ Consumer ]: Message found:
[ Consumer ]:
[ Consumer ]:    going?
[ Consumer ]:
[ Consumer ]: Looking for messages in Redis (blocking operation).
[ BackgroundRunner ]: Ensuring runnables are running.
[ BackgroundRunner ]: Ensuring runnables are running.
[ BackgroundRunner ]: Ensuring runnables are running.
[ BackgroundRunner ]: Ensuring runnables are running.
[ BackgroundRunner ]: Ensuring runnables are running.
[ Consumer ]: Message found:
[ Consumer ]:
[ Consumer ]:    crash
[ Consumer ]:
[ BackgroundRunner ]: Ensuring runnables are running.
[ BackgroundRunner ]: Runnable ended in error:
{"Message":"UnexpectedValue" ..... truncated for blog .....}
[ BackgroundRunner ]: Starting runnable with ID 2f
[ Consumer ]: Looking for messages in Redis (blocking operation).
[ BackgroundRunner ]: Ensuring runnables are running.
[ BackgroundRunner ]: Ensuring runnables are running.
[ BackgroundRunner ]: Ensuring runnables are running.
[ BackgroundRunner ]: Ensuring runnables are running.
[ Consumer ]: Message found:
[ Consumer ]:
[ Consumer ]:    woot
[ Consumer ]:
[ Consumer ]: Looking for messages in Redis (blocking operation).

As you can see, each message is logged to the server output. And, when I entered the message crash, the Consumer threw an error which crashed the CFThread that was managing it (which we haven't seen yet). But, after it crashed, we can see the log messages:

[ BackgroundRunner ]: Runnable ended in error:
{"Message":"UnexpectedValue" ..... truncated for blog .....}
[ BackgroundRunner ]: Starting runnable with ID 2f
[ Consumer ]: Looking for messages in Redis (blocking operation).

Our mysterious "background runner" saw that the Consumer ended in an error and then restarted it, at which point the [Consumer] log messages start back up again.

So, now that we understand what kind of persistent background consumer behavior we're trying to achieve, let's look at how this is being done with a CFThread of type "task". To start, let's look at the ColdFusion Application component, Application.cfc, to get a sense of how this is all wired together:

component
	output = false
	hint = "I setup the application settings and event handlers."
	{

	this.name = "TaskThreadTesting";

	this.directory = getDirectoryFromPath( getCurrentTemplatePath() );
	this.mappings = {
		"/vendor" = "#this.directory#vendor/"
	};

	// ---
	// EVENT-HANDLER METHODS.
	// ---

	/**
	* I get called once when the application is being boot-strapped. This method is
	* naturally single-threaded / synchronized by the ColdFusion runtime.
	*/
	public void function onApplicationStart() {

		debug( "Bootstrapping Lucee CFML application." );

		application.redisPool = new RedisPool( "localhost", 6379 );
		application.consumer = new Consumer( application.redisPool );
		application.backgroundThreadRunner = new BackgroundThreadRunner();

		// The Consumer has a .run() method, which is what the background-runner will
		// call from inside a "persistent" thread. If the thread dies or ends, then the
		// background-runner will call .run() again inside a new thread.
		application.backgroundThreadRunner.runInBackground( application.consumer );

	}


	/**
	* I get called once when the application is being shut-down.
	* 
	* CAUTION: This function only has access to PUBLIC METHODS on this component. As
	* such, we had to make the debug() method PUBLIC in order to use it here.
	* 
	* @applicationScope I am the scope associated with the running application.
	*/
	public void function onApplicationEnd( required struct applicationScope ) {

		debug( "Shutting down Lucee CFML application." );

		// If we're shutting down the application, let's kill the background thread
		// runner or the task will continue to run in the background.
		applicationScope.backgroundThreadRunner?.stop();

	}

	// ---
	// PUBLIC METHODS.
	// ---

	/**
	* I log the given message to the server error-output stream.
	* 
	* @message I am the message being logged.
	*/
	public void function debug( required string message ) {

		systemOutput( "[ Application ]: #message#", true, true );

	}

}

As you can see in the onApplicationStart() application life-cycle method, we're creating an instance of our Consumer.cfc and passing it into an instance of BackgroundThreadRunner.cfc. This latter component is going to use the task thread to keep the Consumer.cfc running in the background.

As I stated earlier, these task threads run outside the ColdFusion application in which they are defined. In fact, they appear to be serialized and stored on disk and then deserialized and executed in their own context. In order to make that useful, we're going to have the task thread do little more than call back into our ColdFusion application.

Of course, the task thread doesn't really "know" about our ColdFusion application and has no reference to it. So we need an intermediary point-of-contact. Here enters the Server scope. Everything in ColdFusion can reference the server scope. Which means, we can use the server scope as a tunnel between two disconnected ColdFusion contexts.

But, it's not quite that simple. ColdFusion has this concept of a "Page Context", which to be honest, is almost entirely over my head. But, I do know that the "Page Context" is how ColdFusion knows which variables and scopes are available to a given piece of executing code.

The "Page Context" for a task thread is entirely different than the "Page Context" of our running ColdFusion application. This means that even if we store a shared variable in the server scope, trying to consume that variable from within our task thread is going to lead to unexpected behaviors.

Luckily, I came across a thread on the Lucee Dev site which discusses the Runnable Java interface and ends-up describing a way to "fix this" disconnected behavior. Essentially, we have to grab the "Application context":

getPageContext().getApplicationContext()

... store that in the server scope, and then use it to attach the task thread to our running ColdFusion application using:

getPageContext().setApplicationContext( applicationContext )

If we run this .setApplicationContext() method inside of our task thread, then we get all of the "expected variable bindings" when we call back into our Consumer.cfc instance. So, to recap, we're going to create a point-of-indirection that looks like this:

Lucee Application <--> server scope <--> Task Thread
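Stripped down to just the wiring, that hand-off might look something like the following sketch. The server-scope key, the thread name, and the single-try retry policy are hypothetical; the two page-context calls are the ones mentioned above:

```cfml
<cfscript>

	// Hypothetical server-scope key that both sides agree on.
	tunnelKey = "demo_tunnel";

	// APPLICATION SIDE: capture the application context of the current request.
	server[ tunnelKey ] = {
		applicationContext: getPageContext().getApplicationContext()
	};

	thread
		type = "task"
		name = "demoTunnelTask"
		retryInterval = [{ tries: 1, interval: createTimeSpan( 0, 0, 0, 5 ) }]
		tunnelKey = tunnelKey
		{

		try {

			// TASK SIDE: re-attach this thread to the captured application context
			// before calling back into any application components.
			getPageContext().setApplicationContext( server[ tunnelKey ].applicationContext );

			systemOutput( "[ Demo Task ]: Application context re-attached.", true, true );

		} catch ( any error ) {

			systemOutput( "[ Demo Task ]: #serializeJson( error )#", true, true );

		}

	}

</cfscript>
```

The try / catch is there because, in my experience, errors inside a task thread don't surface anywhere unless they are explicitly logged.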

And now that we have a sense of the wiring, let's look at our BackgroundThreadRunner.cfc to see how this all fits together. The code in this component is a bit complicated; but, essentially, there is an ensureTaskThread() method which creates the task thread. The task thread then calls back into the BackgroundThreadRunner (via the server scope) using a quasi-private (by convention) method: _ensureRunnablesAreRunning(). This method then iterates over its list of "Runnables" and ensures that they are all running inside their own daemon CFThread wrappers. And, if any of the daemon threads appear to have crashed, they are restarted.

component
	output = false
	hint = "I provide a means to run persistent background threads."
	{

	/**
	* I initialize the background-thread runner.
	* 
	* @sleepDuration I am the duration, in milliseconds, that the Task thread will sleep in between checks to see if the runnables are running.
	*/
	public void function init( numeric sleepDuration = 1000 ) {

		// I determine how long the TASK THREAD will sleep before checking back with the
		// background-runner to make sure that all runnables are running.
		variables.sleepDuration = arguments.sleepDuration;

		// I am the ID that will associate this ColdFusion component instance with the
		// TASK thread via the Server scope.
		taskThreadID = ( "background_task_thread_" & createUniqueId() );

		// Dealing with asynchronous processing requires locking in order to cut down on
		// race-conditions. We'll use this as our base lock ID for exclusive named locks.
		// We'll use two different locks:
		// ---
		// * "#lockID#.task" - This is the high-level lock used to synchronize all access
		//   points into the background task runner.
		// * "#lockID#.entry" - This is the low-level lock used to synchronize all access
		//   points to the individual runnables.
		// ---
		lockID = ( "lock_" & createUniqueId() );

		// I keep track of the Runnable interfaces that this background thread runner is
		// managing. Each Runnable will get its own CFThread wrapper.
		entries = [];

	}

	// ---
	// PUBLIC METHODS.
	// ---

	/**
	* I ensure that each Runnable is actually Running.
	* 
	* CAUTION: This is to be used by the TASK THREAD ONLY! This has to be public, but is
	* not meant for public consumption.
	*/
	public void function _ensureRunnablesAreRunning() {

		lock
			type = "exclusive"
			name = "#lockID#.task"
			timeout = 60
			throwOnTimeout = true
			{

			debug( "Ensuring runnables are running." );

			for ( var entry in entries ) {

				try {

					ensureRunnableIsRunning( entry );

				} catch ( any error ) {

					// If the ensuring operation throws an error, it would be because the
					// underlying CFLock timed-out or because the daemon CFThread could
					// not be spawned.
					debug( "Error ensuring runnable with ID #entry.runnableID#: #serializeJson( error )#." );

				}

			}

		} // END: Task-lock.

	}

	/**
	* I start running the given Runnable in the background using a persistent background
	* thread. All Runnables are given their own daemon CFThread wrapper, all of which are
	* monitored by a single TASK thread.
	* 
	* @runnable I am the Runnable being managed in the background.
	*/
	public void function runInBackground( required any runnable ) {

		lock
			type = "exclusive"
			name = "#lockID#.task"
			timeout = 60
			throwOnTimeout = true
			{

			ensureTaskThread();

			// This runnable will be "activated" during the execution of the task thread.
			entries.append({
				runnable: runnable,
				runnableID: createUniqueId(),
				runnableThread: nullValue()
			});

		} // END: Task-lock.

	}


	/**
	* I stop the background TASK thread and any Runnable instances that it is managing.
	*/
	public void function stop() {

		lock
			type = "exclusive"
			name = "#lockID#.task"
			timeout = 60
			throwOnTimeout = true
			{

			debug( "Stopping background thread runner." );

			// By deleting the server-key, it will tell the TASK thread to STOP CALLING
			// back into this background runner.
			server.delete( taskThreadID );

			for ( var entry in entries ) {

				try {

					ensureRunnableIsStopped( entry );

				} catch ( any error ) {

					debug( "Error stopping runnable with ID #entry.runnableID#: #serializeJson( error )#." );

				}

			}

		} // END: Task-lock.

	}

	// ---
	// PRIVATE METHODS.
	// ---

	/**
	* I log the given message to the server error-output stream.
	* 
	* @message I am the message being logged.
	*/
	private void function debug( required string message ) {

		systemOutput( "[ BackgroundRunner ]: #message#", true, true );

	}


	/**
	* I ensure that the given entry is running.
	* 
	* @entry I am the runnable entry being started / monitored.
	*/
	private void function ensureRunnableIsRunning( required struct entry ) {

		lock
			type = "exclusive"
			name = "#lockID#.entry"
			timeout = 60
			throwOnTimeout = true
			{

			// Each runnable entry will receive its own daemon CFThread wrapper. If this
			// thread has not yet been spawned - or, has been terminated / completed - we
			// need to spawn it and track it.
			if (
				isNull( entry.runnableThread ) ||
				( entry.runnableThread.status == "TERMINATED" ) ||
				( entry.runnableThread.status == "COMPLETED" )
				) {

				// If the daemon CFThread wrapper exists and ended in error, let's log
				// the error.
				if ( entry.runnableThread?.keyExists( "error" ) ?: false ) {

					debug( "Runnable ended in error: #serializeJson( entry.runnableThread.error )#" );

				}

				debug( "Starting runnable with ID #entry.runnableID#" );

				// NOTE: Since we may end-up creating the "same thread" over time, we
				// have to make sure it has a server-unique name otherwise Lucee will
				// throw an error about duplicate threads in the same request.
				var threadName = ( taskThreadID & "_" & entry.runnableID & "_" & createUniqueId() );

				// Spawn the daemon CFThread for this Runnable - each runnable runs
				// inside its own asynchronous thread context.
				// --
				// CAUTION: In Lucee CFML, the "entry" attribute here is NOT DEEP-CLONED
				// when it is passed into the CFThread context. This is a deviation from
				// the way Adobe ColdFusion works, which will deep-clone the instance.
				thread
					type = "daemon"
					name = threadName
					entry = entry
					{

					// There's no easy way to terminate a CFThread across different
					// page requests by name (as such the threadTerminate() function
					// won't work). To overcome this, we're going to expose a stop()
					// method ON THE DAEMON THREAD that will turn around and stop the
					// underlying Runnable. This should allow the daemon CFThread to
					// exit naturally.
					thread.stopRunnable = () => {

						entry.runnable.stop();

					};

					// NOTE: We don't need a try/catch around this because we are already
					// going to log any errors that get attached to terminated threads
					// (see top of this IF-statement).
					entry.runnable.run();

				}

				// Store this CFThread instance back into the entry. This way, we can
				// monitor the state of the runnable wrapper as part of the background
				// runner execution.
				entry.runnableThread = cfthread[ threadName ];

			} // END: If.

		} // END: Entry-Lock.

	}


	/**
	* I ensure that the given entry is stopped / stopping.
	* 
	* @entry I am the runnable entry being stopped.
	*/
	private void function ensureRunnableIsStopped( required struct entry ) {

		lock
			type = "exclusive"
			name = "#lockID#.entry"
			timeout = 60
			throwOnTimeout = true
			{

			debug( "Stopping runnable with ID #entry.runnableID#" );

			// We can't actually terminate CFThreads across different requests (at least
			// not that I could find). As such, we can only ask the CFThread to stop its
			// own internal Runnable using the Function that we exposed on the Thread.
			entry.runnableThread?.stopRunnable();

		} // END: Entry-Lock.

	}


	/**
	* I ensure that the TASK THREAD is running in the background.
	*/
	private void function ensureTaskThread() {

		lock
			type = "exclusive"
			name = "#lockID#.task"
			timeout = 60
			throwOnTimeout = true
			{

			// Task Threads are very "sticky" - even if you delete them from the Lucee
			// Admin, they will continue to run in the background. As such, let's make
			// sure we don't currently have one already defined.
			if ( server.keyExists( taskThreadID ) ) {

				return;

			}

			// When we start the TASK THREAD, Lucee will serialize the data associated
			// with the thread and then store it to disk (as far as I can tell). Lucee
			// then deserializes the task data and runs the task in the background based
			// on the retry intervals. As such, the TASK THREAD runs COMPLETELY OUTSIDE
			// THE CURRENT APPLICATION CONTEXT of our main application. To wire these two
			// contexts back together, we're going to use the SERVER SCOPE as the
			// communication tunnel. And, in order to do that, we have to keep track of
			// the APPLICATION CONTEXT so that we can re-attach the TASK THREAD to the
			// current application during its execution.
			// --
			// READ MORE: https://dev.lucee.org/t/using-native-java-threading-in-lucee/471/14
			server[ taskThreadID ] = {
				runner: this,
				applicationContext: getPageContext().getApplicationContext()
			};

		} // END: Task-lock.

		// The TASK THREAD is going to use an internal loop that MOSTLY keeps it going.
		// However, if it crashes, we need to have Lucee try to restart it. That is what
		// the retryInterval is for. Since we want this to be a "persistent" background
		// thread, we want the retries to converge on "infinite".
		var retryInterval = [{
			tries: createObject( "java", "java.lang.Integer" ).MAX_VALUE,
			interval: createTimeSpan( 0, 0, 0, 5 )
		}];

		thread
			type = "task"
			name = "task.#taskThreadID#"
			retryInterval = retryInterval
			taskThreadID = taskThreadID
			sleepDuration = sleepDuration
			{

			// Since the TASK THREAD doesn't have access to the parent VARIABLE scope
			// (remember this is running completely outside the application), we need to
			// give the thread its own version of the debug() method.
			function taskDebug( required string message ) {

				systemOutput( "[ Task Thread ]: #message#", true, true );

			}

			// CAUTION: Errors in a Task Thread seem to get swallowed-up by the ether if
			// they are not explicitly caught and logged!
			try {

				var runner = server[ taskThreadID ].runner;
				var applicationContext = server[ taskThreadID ].applicationContext;

				// The TASK THREAD runs in a completely separate application context /
				// page context. As such, in order to call the following "Runner" in an
				// expected state, we have to re-attach the current TASK application
				// context to the application context in which the Runner was defined.
				getPageContext().setApplicationContext( applicationContext );

				// Since we want to implement persistent background threads, we just
				// want to keep looping here forever. And, if this thread crashes, Lucee
				// will restart it based on the retryInterval.
				while ( true ) {

					// If the server key for this background processor has been deleted,
					// it means the background processing is being shut-down. As such,
					// let's exit out of this Task thread.
					if ( ! server.keyExists( taskThreadID ) ) {

						return;

					}

					try {

						runner._ensureRunnablesAreRunning();

					} catch ( any runnerError ) {

						taskDebug( "Error while ensuring runnables: #serializeJson( runnerError )#" );

					}

					sleep( sleepDuration );

				}

			} catch ( any taskError ) {

				taskDebug( "Error while wiring-up task thread internals: #serializeJson( taskError )#" );

			}

		} // END: CFThread.

	}

}

This code is not simple, so I'm not going to try and describe it - I only hope that I included enough commenting in the ColdFusion component to help explain what is happening. The Task Thread is just sitting there in a while() loop, calling back into the ColdFusion application. And, if the task thread crashes for some reason, we have provided a retryInterval that will try Integer.MAX_VALUE (2,147,483,647) times to re-run it, waiting 5 seconds between tries. So, theoretically, this won't run forever; but, practically, it will: burning through every retry would take roughly 340 years of back-to-back crashes.

A note of caution about errors thrown inside a task thread - they seem to just disappear into the nothingness. As such, it's extremely important that you run your task thread logic inside a try/catch and then explicitly log the errors to the system output (probably using some sort of JSON-based logging structure).
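As a rough sketch of what that JSON-based logging could look like, here's a small helper that turns a caught error into a single-line JSON entry. The buildErrorLogEntry() function and the shape of the log entry are hypothetical (they are not part of the demo above); the throw() is only there to give the catch block something to log:

```cfml
<cfscript>

	/**
	* Hypothetical helper: I serialize the given error as a single-line JSON entry.
	*/
	function buildErrorLogEntry(
		required string source,
		required any error
		) {

		return serializeJson({
			"level": "error",
			"source": source,
			"type": ( error.type ?: "" ),
			"message": ( error.message ?: "" )
		});

	}

	try {

		// Stand-in for whatever the task thread is actually doing.
		throw( type = "DemoError", message = "Something broke in the task." );

	} catch ( any error ) {

		logEntry = buildErrorLogEntry( "Task Thread", error );
		systemOutput( logEntry, true, true );

	}

</cfscript>
```

Keeping each entry on one line makes it easier for a log aggregator to parse the output later on.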

I'm tentatively excited about the Task Thread facet of the CFThread tag in Lucee CFML. Yes, it's not quite what I wanted; and yes, using it as a means to interface with a running ColdFusion application is oddly difficult. But, by encapsulating all of the complexity inside a single "background task runner" component, I feel like I've created a decently clean way to make this happen.

I'm sure Brad Wood will come along shortly and tell me how I could have bypassed all of this madness by just using Java; so, I'm happy to see what I can learn from that conversation.


Reader Comments

15,848 Comments

@All,

So, as I was exploring Task Threads, I noticed that the spawned CFThread tag was mysteriously dying after about 30 seconds. And, after poking around, I saw that the request timeout in the Lucee Server Admin was set to 30 seconds. I can't believe I didn't already know this; but, apparently the CFSetting request timeout affects both the top-level page and the child threads that it spawns:

www.bennadel.com/blog/3870-requesttimeout-setting-affects-cfthread-execution-in-lucee-cfml-5-3-6-61.htm

I feel like I should have learned this a decade ago :( But, I guess late is better than never.

15,848 Comments

@All,

Ok, it seems that setting the requestTimeout within the Task thread will successfully allow the daemon threads to live longer. In a subsequent experiment, I went back and updated my Task thread to look more like this (the important bits have ****** around them):

thread
	type = "task"
	name = "task.#taskThreadID#"
	retryInterval = retryInterval
	taskThreadID = taskThreadID
	sleepDuration = sleepDuration
	{

	// Since the TASK THREAD doesn't have access to the parent VARIABLE scope
	// (remember this is running completely outside the application), we need to
	// give the thread its own version of the debug() method.
	function taskDebug( required string message ) {

		systemOutput( "[ Task Thread ]: #message#", true, true );

	}

	// CAUTION: Errors in a Task Thread seem to get swallowed-up by the ether if
	// they are not explicitly caught and logged!
	try {

		var runner = server[ taskThreadID ].runner;
		var applicationContext = server[ taskThreadID ].applicationContext;

		// The TASK THREAD runs in a completely separate application context /
		// page context. As such, in order to call the following "Runner" in an
		// expected state, we have to re-attach the current TASK application
		// context to the application context in which the Runner was defined.
		getPageContext().setApplicationContext( applicationContext );

		// ********************************************************************
		// ********************************************************************
		// ********************************************************************
		// While the TASK THREAD appears to have no problem running "forever",
		// the DAEMON THREADs that it spawns will adhere to the RequestTimeout
		// settings of the server (which is set to a default of 30-seconds). In
		// order for our daemon threads to live "forever", we have to bump the
		// request-timeout up quite significantly.
		var ONE_MINUTE = 60;
		var ONE_HOUR = ( ONE_MINUTE * 60 );
		var ONE_DAY = ( ONE_HOUR * 24 );
		var ONE_YEAR = ( ONE_DAY * 365 );
		var requestTimeoutInSeconds = ONE_YEAR;

		taskDebug( "Setting request-timeout to #numberFormat( requestTimeoutInSeconds )# seconds." );

		setting
			requestTimeout = requestTimeoutInSeconds
		;
		// ********************************************************************
		// ********************************************************************
		// ********************************************************************

		// Since we want to implement persistent background threads, we just
		// want to keep looping here forever. And, if this thread crashes, Lucee
		// will restart it based on the retryInterval.
		while ( true ) {

			// If the server key for this background processor has been deleted,
			// it means the background processing is being shut-down. As such,
			// let's exit out of this Task thread.
			if ( ! server.keyExists( taskThreadID ) ) {

				return;

			}

			try {

				runner._ensureRunnablesAreRunning();

			} catch ( any runnerError ) {

				taskDebug( "Error while ensuring runnables: #serializeJson( runnerError )#" );

			}

			sleep( sleepDuration );

		}

	} catch ( any taskError ) {

		taskDebug( "Error while wiring-up task thread internals: #serializeJson( taskError )#" );

	}

} // END: CFThread.

Notice that inside the CFThread[ type = "task" ], I'm using the CFSetting tag to set the requestTimeout to one year. Now, when I boot up the application, I get the following output:

[ Application ]: Bootstrapping Lucee CFML application.
[ Task Thread ]: Setting request-timeout to 31,536,000 seconds.
[ BackgroundRunner ]: Ensuring runnables are running.
[ BackgroundRunner ]: Starting runnable with ID 12
[ Consumer ]: Looking for messages in Redis (blocking operation).
.....

Note the "Setting request-timeout to 31,536,000 seconds" statement. And, from what I'm seeing in the subsequent logging, none of the daemon threads are timing out.

14 Comments

@Ben,

Did you leave this as an experiment or have you used this approach further?

Looking at ways to keep a "persistent" consumer up and running to process Amazon SQS Messages.

15,848 Comments

@Aaron,

Right now, this R&D is the only time I've actually done this. But, as you're saying, the big use-case in my mind is the persistent consumer. I would love to start making smarter use of queues and other background tasks; but, I've just always been concerned that things will crash and I don't have the experience to keep them going.
