Producing And Consuming Amazon SQS Messages In Lucee CFML 5.3.8.201
A year ago, I shed some light on my quest to merge microservices back into my ColdFusion monolith. For my size team and the type of domain boundaries that we have, a monolith just makes the most sense. And while I've been thrilled with the progress my team has made in its re-consolidation efforts, there's one more microservice that I've had my eye on for a long time. This microservice has eluded my grasp, however, because it interacts with a message queue. And, to date, all parts of the legacy platform that deal with message queues live outside of the monolith. As such, if I ever want to pull this microservice back into the monolith, I have to figure out how to use message queues in ColdFusion. To start with, I wanted to see if I could simply produce and consume Amazon SQS (Simple Queue Service) messages in Lucee CFML 5.3.8.201.
View this code in my Amazon SQS Lucee CFML project on GitHub.
There's a whole lot more to using message queues than sending and receiving messages; taking a look at Enterprise Integration Patterns: Designing, Building, And Deploying Messaging Solutions will show you just how deep the rabbit hole goes. But, I have to start with the basics before I can even begin to worry about the right "integration patterns".
Thankfully, the mechanics of producing and consuming AWS SQS messages are actually not that complex: you have a queue onto which you push and pop messages. "Pushing" and "popping" aren't really the right terms, per se; but, conceptually speaking, using an Array as a mental model isn't that far off.
It's really the "pop" operation that is significantly different. Instead of popping a message off of a queue and considering the operation complete, the message enters a "pending" state in which it is still in the queue but no longer visible to other consumers. This is the phase in which the consumer is attempting to process the message. And, if the processing is complete, the consumer can make an explicit request to delete the message from the queue. If the processing fails, however, the pending state will timeout and the message will, once again, become visible to other consumers.
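To make that lifecycle concrete, here is a rough conceptual sketch (pseudo-CFML with a hypothetical queueClient and processMessage() - these are place-holders, not a real API):

```cfml
// CONCEPTUAL SKETCH: the receive / process / delete lifecycle. The client and
// processMessage() here are hypothetical place-holders.
messages = queueClient.receiveMessages( visibilityTimeout = 60 );

for ( message in messages ) {

	try {

		// While being processed, the message is "pending": still in the queue,
		// but hidden from other consumers.
		processMessage( message );
		// Only an explicit delete removes the message from the queue.
		queueClient.deleteMessage( message.receiptHandle );

	} catch ( any error ) {

		// If we do nothing, the visibility timeout will lapse and the message
		// will become visible to other consumers again.

	}

}
```

The key take-away is that "failure handling" is mostly passive: a consumer that dies mid-processing never deletes the message, so the queue itself hands the message to someone else.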
Or, it will enter a "dead letter queue" for future investigation. But, that's way beyond the scope of this "hello world" style exploration of Amazon SQS in ColdFusion. To start with the basics, all I want to do in this demo is write messages to a queue via one ColdFusion page and then read those messages off of the queue via another ColdFusion page. That's it.
Just as I did with my recent look at using LaunchDarkly's Java SDK in ColdFusion, the first step I took was to download the Java JAR files from Maven and then build a Java Class Loader using Lucee CFML's ability to load classes off the physical file-system:
component
	output = false
	hint = "I provide class loading methods for the Amazon Web Services Java SDK."
	{

	public void function init() {

		// NOTE: One of the coolest features of Lucee CFML is the fact that it can create
		// Java objects on-the-fly from a given set of JAR files and directories. I mean,
		// how awesome is that?! These JAR files were downloaded from Maven:
		// --
		// https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-sqs/1.12.60
		variables.jarPaths = [ expandPath( "/vendor" ) ];

	}

	// ---
	// PUBLIC METHODS.
	// ---

	/**
	* I load the given class out of the local AWS Java SDK JAR paths.
	*/
	public any function load( required string className ) {

		return( createObject( "java", className, jarPaths ) );

	}

}
This ColdFusion component encapsulates the location of the AWS SDK JAR files; and then provides a load() method which applies that set of JAR files to a native createObject() call. This method returns the loaded Java class, which I can then initialize, as needed, in the calling context.
Once I had this Java loading abstraction, I could go about creating a simplified wrapper around the low-level SQS Client. The API for the SQS client isn't that large; but, there's always a lot of verbosity when interfacing between ColdFusion and Java. As such, I almost always like to hide - and simplify - that interaction behind a ColdFusion component.
Part of this abstraction includes the winnowing-down of features and flexibility into something that is more easily consumable within my application. This usually means trading-off flexibility for simplicity. In other words, the wrapper that I am creating isn't intended to be a generalized solution. Instead, it's meant to be optimized for my particular use-cases.
And, for this demo, all I need to do is write messages, read messages, and delete messages. All operations in this demo pertain to a single Amazon SQS queue. As such, I decided to bake the queue name - along with some other settings - into the ColdFusion component state such that I could invoke the CRUD (Create, Read, Update, Delete) methods with as few arguments as possible.
Here's my ColdFusion SQS wrapper for this demo - note that the AwsClassLoader.cfc is the first argument being passed-in. This way, my SQS client doesn't have to know where those classes are coming from.
component
	output = false
	hint = "I provide a simplified API over Amazon's Java SDK for Simple Queue Service (SQS)."
	{

	/**
	* I initialize an SQS client for the given queue and credentials.
	*/
	public void function init(
		required any classLoader,
		required string accessID,
		required string secretKey,
		required string region,
		required string queueName,
		numeric defaultWaitTime = 0,
		numeric defaultVisibilityTimeout = 20
		) {

		variables.classLoader = arguments.classLoader;
		variables.queueName = arguments.queueName;
		// NOTE: Timeouts are provided in Seconds.
		variables.defaultWaitTime = arguments.defaultWaitTime;
		variables.defaultVisibilityTimeout = arguments.defaultVisibilityTimeout;

		var basicCredentials = classLoader
			.load( "com.amazonaws.auth.BasicAWSCredentials" )
			.init( accessID, secretKey )
		;
		var credentialsProvider = classLoader
			.load( "com.amazonaws.auth.AWSStaticCredentialsProvider" )
			.init( basicCredentials )
		;

		variables.sqsClient = classLoader
			.load( "com.amazonaws.services.sqs.AmazonSQSClientBuilder" )
			.standard()
			.withCredentials( credentialsProvider )
			.withRegion( region )
			.build()
		;

	}

	// ---
	// PUBLIC METHODS.
	// ---

	/**
	* I delete the message with the given receipt handle from the queue.
	*/
	public void function deleteMessage( required string receiptHandle ) {

		sqsClient.deleteMessage( queueName, receiptHandle );

	}

	/**
	* I receive up to the max number of messages from the queue.
	*/
	public array function receiveMessages(
		numeric maxNumberOfMessages = 1,
		numeric waitTime = defaultWaitTime,
		numeric visibilityTimeout = defaultVisibilityTimeout
		) {

		var outboundMessageRequest = classLoader
			.load( "com.amazonaws.services.sqs.model.ReceiveMessageRequest" )
			.init( queueName )
			.withMaxNumberOfMessages( maxNumberOfMessages )
			// NOTE: Timeouts are provided in Seconds.
			.withWaitTimeSeconds( waitTime )
			.withVisibilityTimeout( visibilityTimeout )
			// For the time-being, we're going to assume that all attributes stored with
			// the message are necessary for the processing. As such, we'll always return
			// all attributes in our response.
			.withMessageAttributeNames([ "All" ])
		;

		var inboundMessageResponse = sqsClient.receiveMessage( outboundMessageRequest );
		var messages = inboundMessageResponse.getMessages().map(
			( inboundMessage ) => {

				return({
					receiptHandle: inboundMessage.getReceiptHandle(),
					attributes: decodeMessageAttributes( inboundMessage.getMessageAttributes() ),
					body: inboundMessage.getBody()
				});

			}
		);

		return( messages );

	}

	/**
	* I send the given message with the optional attributes. Attributes can be binary or
	* simple. All simple values are converted to strings. Returns the ID of the message.
	*/
	public string function sendMessage(
		required string message,
		struct messageAttributes = {}
		) {

		var outboundMessageRequest = classLoader
			.load( "com.amazonaws.services.sqs.model.SendMessageRequest" )
			.init( queueName, message )
			.withMessageAttributes( encodeMessageAttributes( messageAttributes ) )
		;

		var inboundMessageResponse = sqsClient.sendMessage( outboundMessageRequest );

		return( inboundMessageResponse.getMessageId() );

	}

	// ---
	// PRIVATE METHODS.
	// ---

	/**
	* I decode the given inbound message attributes into a normal ColdFusion struct.
	*/
	private struct function decodeMessageAttributes( required struct inboundAttributes ) {

		var decodedAttributes = inboundAttributes.map(
			( key, value ) => {

				if ( value.getDataType() == "Binary" ) {

					return( value.getBinaryValue() );

				} else {

					return( value.getStringValue() );

				}

			}
		);

		return( decodedAttributes );

	}

	/**
	* I encode the given outbound message attributes for use in the Amazon SDK.
	*/
	private struct function encodeMessageAttributes( required struct outboundAttributes ) {

		var encodedAttributes = outboundAttributes.map(
			( key, value ) => {

				var encodedValue = classLoader
					.load( "com.amazonaws.services.sqs.model.MessageAttributeValue" )
					.init()
				;

				if ( isBinary( value ) ) {

					encodedValue.setDataType( "Binary" );
					encodedValue.setBinaryValue( value );

				} else {

					encodedValue.setDataType( "String" );
					encodedValue.setStringValue( javaCast( "string", value ) );

				}

				return( encodedValue );

			}
		);

		return( encodedAttributes );

	}

}
In the end, the hardest thing to figure out was the encoding and decoding of SQS message attributes. I wanted my calling context to be able to supply attributes as a simple ColdFusion Struct; but, this ColdFusion Struct then had to be translated into a series of MessageAttributeValue Java class instances that were populated using type-specific methods. Ironically, it took me about 20 minutes to get the basic CRUD operations working; and then, an entire hour to figure out how the heck to read and write attributes.
ASIDE: Part of why it took me so long to figure out how to work with the attributes is because I didn't realize that there were "system attributes" and "message attributes". As such, I kept trying to read the "system attributes" and not understanding why my "message attributes" weren't being returned.
Like I said above, there's a good deal more complexity when it comes to building robust message-based solutions. But, I'm not worried about that for this post. So, the API for my client is relatively simple.
In fact, writing messages to the queue is dead simple with this client. All I have to do is provide a String for my message. In this case, I'm going to use said String to define a JSON (JavaScript Object Notation) payload. And, to get that message into the queue, I'm going to use an HTML <form> element with a single input:
<cfscript>

	param name="form.messageText" type="string" default="";

	if ( form.messageText.len() ) {

		application.sqsClient.sendMessage(
			serializeJson({
				text: form.messageText
			}),
			{
				version: 1,
				createdAt: getTickCount()
			}
		);

	}

</cfscript>
<cfoutput>

	<!doctype html>
	<html lang="en">
	<head>
		<meta charset="utf-8" />
		<title>
			Producing Amazon SQS Messages In Lucee CFML 5.3.8.201
		</title>
		<style type="text/css">
			input, button { font-size: 100% ; }
		</style>
	</head>
	<body>

		<h1>
			Producing Amazon SQS Messages In Lucee CFML 5.3.8.201
		</h1>

		<form method="post" action="./produce.cfm">
			<input
				type="text"
				name="messageText"
				size="40"
				autocomplete="off"
			/>
			<button type="submit">
				Send Message
			</button>
		</form>

	</body>
	</html>

</cfoutput>
To keep things a bit more interesting - and so that I had an excuse to figure out how message attributes work - I'm passing a second argument to the .sendMessage() method. This second Struct defines the message attributes, which can contain both String and Binary values.
Reading messages off of a queue is a little more quirky. Since our ColdFusion application doesn't inherently know when a new message is available on the queue, it has to continually ask the queue if new messages need to be processed. This can lead to a lot of HTTP activity which, at least with Amazon Web Services (AWS), has a dollars-and-cents cost associated with it.
To reduce the cost of queue interactions, SQS provides "long polling". Long polling allows ColdFusion to hold an HTTP request open for up to 20-seconds while waiting for new messages to arrive. Since "long polling" and "short polling" have the same cost, this technique reduces the total number of times that ColdFusion will have to ping the Amazon SQS queue.
NOTE: Amazon recommends that "long polling" be used as often as possible. Not only does this reduce cost, but it also decreases the latency of message processing. As a related recommendation, Amazon also suggests that each unique queue be processed in its own thread. This way, the "long polling" of one queue does not negatively impact the message processing of another queue. This is part of why I chose to bake the queue-name into my SqsClient.cfc component state - I figured that I would end up instantiating one ColdFusion component per queue that needed consuming.
For this demo, my "consumer" ColdFusion template is going to keep an in-memory cache of messages that it dumps-out to the page. Then, it has the option to start "long polling" the Amazon SQS queue looking for more messages. Whenever it receives a result, it will update the in-memory cache, dump it out to the screen (for us to see), and then automatically refresh the page and start long polling again:
<cfscript>

	param name="url.poll" type="boolean" default="false";

	if ( url.poll ) {

		// With a waitTime of 20-seconds, this request is going to BLOCK-AND-WAIT for
		// a successful message response for up-to 20-seconds. The response may come
		// back as an empty array; or, it may contain a max of "maxNumberOfMessages"
		// items.
		messages = application.sqsClient.receiveMessages(
			maxNumberOfMessages = 3,
			waitTime = 20,
			visibilityTimeout = 60
		);

		for ( message in messages ) {

			application.messages.prepend( message );
			// Now that we've stored the message locally, delete it from the queue.
			application.sqsClient.deleteMessage( message.receiptHandle );

		}

	} else {

		application.messages = [];

	}

</cfscript>
<cfoutput>

	<!doctype html>
	<html lang="en">
	<head>
		<meta charset="utf-8" />
		<title>
			Consuming Amazon SQS Messages In Lucee CFML 5.3.8.201
		</title>

		<!---
			For the purposes of this demo, while polling for new messages on the
			queue, we need to keep refreshing the page in order to engage the
			long-polling of the Amazon SDK Java client.
		--->
		<cfif url.poll>
			<meta http-equiv="refresh" content="2" />
		</cfif>
	</head>
	<body>

		<h1>
			Consuming Amazon SQS Messages In Lucee CFML 5.3.8.201
		</h1>

		<cfif url.poll>

			<p>
				<a href="./consume.cfm?poll=false">
					Stop polling
				</a>
			</p>

			<!---
				NOTE: I'm hiding the receiptHandle in the output since it is quite
				long (a max of 1,024 characters) and breaks the layout of the demo
				page.
			--->
			<cfdump
				label="Messages"
				var="#application.messages#"
				hide="receiptHandle"
			/>

		<cfelse>

			<p>
				<a href="./consume.cfm?poll=true">
					Poll for messages
				</a>
			</p>

		</cfif>

	</body>
	</html>

</cfoutput>
As I discussed earlier, once we block-and-wait for new messages, those messages are put into a "pending" state while we process them in our application. If we fail to process them within the visibilityTimeout window (in seconds), Amazon SQS will put them "back" into the queue for other consumers to receive. As such, we have to explicitly delete the messages after we've received them and pushed them into our in-memory cache.
NOTE: For the sake of simplicity, I'm deleting the messages individually. However, as a cost-savings and performance enhancement, Amazon SQS provides a bulk delete API that could have been used in this case.
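For what it's worth, a batched version of the delete might look something like this (an untested sketch against the SDK's DeleteMessageBatch classes, reusing the classLoader pattern from my wrapper; note that a single batch request is limited to 10 messages):

```cfml
// SKETCH (untested): delete all received messages in one round-trip. Each
// entry in the batch needs an ID that is unique within the request.
var entries = messages.map(
	( message, i ) => {
		return(
			classLoader
				.load( "com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry" )
				.init( "message-#i#", message.receiptHandle )
		);
	}
);
var batchRequest = classLoader
	.load( "com.amazonaws.services.sqs.model.DeleteMessageBatchRequest" )
	.init( queueName )
	.withEntries( entries )
;
sqsClient.deleteMessageBatch( batchRequest );
```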
If we run this ColdFusion application and start polling for new messages, we can see that messages written by our "producer" are almost immediately made visible to our "consumer":
Now, there are actually some very interesting things to notice in the GIF above. Before I start long polling the Amazon SQS queue, I pre-populate it with the messages:
"a", "b", "c", and "d"
However, if you look closely at what happens after we start polling, you'll see that:
- "c" shows up on the first response.
- "d" shows up on the second response.
- "a" and "b" show up on the third response.
Here, you start to get a taste of how the Amazon SQS architecture affects the way in which messages are received (at least with a "standard queue", not a "FIFO queue"). When you write a message to a queue, Amazon both duplicates the message across a number of nodes and distributes messages across a number of nodes. This is how they provide such high service availability and message durability. But, this means that when you look for new messages, what you get back depends on which nodes were searched. Amazon doesn't search all nodes on each request; which means, the order of messages is not guaranteed; and, an empty result doesn't necessarily mean that no messages are available.
Other than that, however, you can see that when we write new messages to the Amazon SQS queue, they become - almost instantaneously - available to the long poll consumer.
The low-level mechanics of interacting with an Amazon SQS queue in ColdFusion aren't overly complex; at least not when we use Amazon's Java SDK under the hood. It's going to be the overall queue life-cycle that adds the complexity. But that will be fodder for follow-up blog posts. In the meantime, I think I may go back and re-read RabbitMQ: Patterns For Applications by Derick Bailey - it's significantly shorter than Enterprise Integration Patterns and will help me get refreshed on some of the best practices regarding queue integration.
Want to use code from this post? Check out the license.
Reader Comments
Always great stuff that you offer, Ben. Thanks.
It seems worth mentioning (for the sake of readers here) that CF2021 specifically added built-in support for AWS SQS. I realize it won't interest those committed to Lucee, and certainly what you've shared can help also those on CF2018 or earlier. But since much of this work (and more down that rabbit hole) is "handled" by CF2021, it seems at least notable.
The docs for it are substantial, and folks interested at all in the topic might find it and the feature set interesting:
https://helpx.adobe.com/coldfusion/using/integrate-coldfusion-amazon-sqs.html
Of course, there will always be something missing or not working quite right for some use case. It is the first release of the feature, after all.
And FWIW, there is similar support for Azure's msg queuing feature, Service Bus. And indeed there are specific new features for other cloud services in both AWS and Azure, continuing the theme of making hard things easy. Like you, many are exploring the options to split their monoliths into microservices.
Whichever path one takes, it's a testament to CF and to Lucee that CF folks can "get there", integrating their CFML.
PS About your implementation, it may help some readers also to hear that while ACF doesn't yet support loading a specific JAR in createObject()/cfobject like Lucee does, it has long supported loading them at the application level - rather than just the instance/admin level - so at least configurable and changeable in code.
@Charlie,
Excellent commentary. I actually looked through the page you documented while I was working on my implementation - looking for ideas and for things that I missed (especially since I have so little hands-on experience with message queues in general). I'm excited that ACF 2021 has added this stuff.
But, one thing that I'm still not seeing in their documentation is suggestions on how to actually monitor the queue throughout the application life-cycle. Meaning, once you create your queues and configure them, how do you actually go about having a persistent thread / process that polls the queue. Do you know if they have that documented or outlined anywhere?
The low-effort approach that I am considering is to just have a scheduled task that runs every minute and has, internally, a while(true) loop or something (with some sort of limit). This scheduled task would, essentially, just be making sure that the background thread is always running. Another thing I was considering is playing with Lucee's "Task Threads", which run in the background, but have some funky constraints.
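Something like this rough sketch is what I have in mind (all names here are hypothetical, and the flag-based locking is hand-wavy at best):

```cfml
// ROUGH SKETCH (hypothetical): a scheduled task, running every minute, that
// ensures the polling loop is active. The loop is bounded so that a crashed
// run gets replaced by the next scheduled invocation.
if ( ! ( application.isPollingQueue ?: false ) ) {

	application.isPollingQueue = true;

	try {

		// Each long-poll blocks for up-to 20-seconds; a bounded iteration
		// count keeps this request from running forever.
		for ( var i = 1 ; i <= 10 ; i++ ) {

			var messages = application.sqsClient.receiveMessages( waitTime = 20 );
			// ... process and delete the messages ...

		}

	} finally {

		application.isPollingQueue = false;

	}

}
```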
I think this is the part that will be the most challenging to "get right". I'll see where this exploration takes me.
Thanks for the reply, Ben. As for the question of polling, that's covered briefly at the end of the docs page (which is 44 pages, if printed), along with an automated dead letter queue in the section above that.
I've not used either feature, and I won't be surprised if it's lacking in some respect. If so, that's where you could send an email to cfinstall@adobe.com. While that's ostensibly for free install support, it seems they're OK answering questions about use of CF also, especially something that may not be well-documented. They've even been known to acknowledge a problem and pass it to engineers for more.
@Charlie,
Ah, very cool - I'll look more closely at the end of the docs, thanks!
@Ben,
Typically you would not want your application polling a queue in a big while loop; instead use ( for example ) Lambda to be invoked upon a message arrival in SQS. Alternatively, use a cron job to poll a queue rather than having a long-running thread in CFML. Lucee runs in Lambda ( as does CF2021 - well, it did in beta, and they promised it again at the recent Adobe talks ).
Paul, FWIW, the Lambda support in ACF returns in update 2 of CF2021, now in prerelease at prerelease.adobe.com. And it HAD been in the final release (in November 2020) but was curiously pulled shortly after. I have blog posts on both points ( its being pulled, and the prerelease of update 2 ), for any interested.
Ah, I see we don't have to use the markdown formatting for links (which is nice). But it doesn't seem we can edit comments. I'll try to remember not to bother with them going forward. (Or Ben, if you might want to edit my last one and delete this one, feel free.)
@Charlie,
Sorry about that. It looks like there was a space between the ] and the ( characters, so the markdown parser wasn't seeing it right. I have edited it.
@Paul,
I have only slightly more experience with Lambda than I do with SQS, which is to say, very little for either 😖 That said, I am not sure that there is anything inherently wrong with having a long-running thread in ColdFusion. I'm actually just writing another post wherein I use a scheduled task to help keep a "background thread" running.
If there are some obvious concerns, please share them so that I may better understand the pitfalls!
@All,
After looking at the low-level mechanics of writing to / reading from an AWS SQS queue in Lucee CFML, I wanted to step back and think about a more holistic integration that includes the long-polling. In my follow-up post, I am doing this using a scheduled task:
www.bennadel.com/blog/4111-separation-of-concerns-when-consuming-amazon-sqs-queues-in-lucee-cfml-5-3-8-201.htm
But, essentially I create an architecture that has three layers of responsibility:
I'm learning as I go here, but it feels like these parts are nicely decoupled.
@Ben, Ah, OK. Thanks. :-)
@Ben, Nothing inherently wrong with it; but queues are designed to decouple for processing "later" - it's a mechanism to help scale tasks that can be run asynchronously. Running a loop takes CPU/resources and is also limited to that one machine ( that the loop is run on - you could run the loop on > 1 machine though ). If you think of a situation where you want an optimal result, you would drop a message on a queue and only then ( at the other end ) would some type of compute power spin up to process that request.
Consider the following: saving HTML to a database is quick and efficient; however, converting that to a PDF is not. When you hit save, a message is sent to the queue, and the main app continues to process user requests without loading the CPU. The message hits Lambda ( or any other mechanism that's listening to / polling the queue ) and generates the PDF. You have de-coupled and now can scale both sides independently. Polling from EC2/containers relies on that compute being pre-provisioned, leading to wasted CPU; Lambda only starts up when a request comes in.
@Paul,
These are all good points - thank you for the clarity on what you were intending. Though, I don't want to over-state the CPU / resource overhead of running a loop that is mostly blocking-and-waiting. After all, any given ColdFusion server is already processing dozens / hundreds of requests-per-second; so, having one request that is longer-living is likely not an issue in-and-of itself.
But, you are completely correct in that the main value-add of the queue is to decouple systems. Ironically, my motivation for learning about queues is to re-consolidate two different systems wherein one of the systems is already using a queue internally. So, for this particular case, I need to implement a queue for reasons that aren't necessarily aligned with best practices.
In theory, I love the idea of using Lambda to process a queue. But I don't have much real-world experience with it.
@All,
Continuing on my exploration of Amazon SQS, I wanted to start thinking about request tracing. That is, how to associate the pre-queue portion of a workflow with the post-queue portion of a workflow for the purposes of logging and debugging:
www.bennadel.com/blog/4118-request-tracing-propagation-when-consuming-amazon-sqs-queues-in-lucee-cfml-5-3-8-201.htm
Part of my approach leans heavily on the globally-available request scope; and, my ability to override a request.requestId property just before I process a message. This can get messy, though, since I'm processing (potentially) many messages per long-polling request. This speaks a bit to @Paul's suggestion to use Lambda, where I wouldn't have to worry about one Lambda invocation processing more than a single message.
Hi Ben, my comment about idle CPU is regarding complete de-coupling. When building systems, we should avoid, where possible, having a server sit there polling. This is what I mean by overhead CPU. Sure, I get it - in the old days, part of the machine's CPU sat there doing nothing and that could be used to, for example, poll a queue; but it's not how we should be thinking going forward. On your "web servers", keep the incoming requests nice and hot and focused on exactly that "request and response"; any long-running process, or process that can be run async, off-load to a push-based compute platform that starts only when there is work to do and otherwise is not just idle but completely off.
@Paul,
100%, I completely get what you are saying. And, in an ideal world, I think you are correct. But, in my particular context, we have a tiny team responsible for a lot of services. So, even something like a simple Lambda function can represent an unpleasant operational burden when something like the following happens:
The node.js version is no longer supported and it has to be updated (sometimes in an emergency, like AWS just won't spin up a new instance anymore).
A vendor library has a vulnerability and needs to be upgraded, which sometimes has cascading upgrade requirements.
Obviously, this stuff isn't happening that often. But, when it does, we have so few people that it means we have to completely drop what we're doing and address the problem.
I'm not trying to be a contrarian. I honestly wish I was more comfortable with AWS Lambda and that it didn't actually pose a "burden" on the team. I'm only saying that there is always a gap between the ideal operations and the realistic dumpster fire that we sometimes find ourselves in 🤣
I know this is a pretty old post, but just ran into an issue after upgrading to Lucee 6 and figured I should post it here for anyone that is using the code/solution described above.
After the upgrade to Lucee 6.0.0.585, I started getting the following errors when reading from an SQS queue:
The error was happening whenever I called sqsClient.receiveMessage() (see function receiveMessages in the sample code above). After spending a good amount of time trying to figure it out, including updating the aws-java-sdk* JARs to their latest version, the only thing that seemed to work was replacing the QUEUE_NAME with the actual full URL of the queue. Not sure why it worked before, but apparently something related to the upgrade to Lucee 6 seems to now only want the URL instead of the name.
@Oscar,
So good to hear from you! It's been a long time :D Thank you for sharing the error and the solution. I am sure that will help people who google for this 💪
I was just thinking about you the other day. I was trying to update a Lucee Dockerfile, and the apt-get install is giving me a version of rdiff that is breaking the asset versioning you built so many years ago. Oh man, the whole asset management stuff was so crazy - let's make a mental note to never again build a system that stores and versions gigabytes of files for people.