Split messages from single SQS queue into Multiple SQS queues using EventBridge
Table of Contents
In this post I discuss how messages in a SQS queue can be split between multiple SQS queues with their original payloads. The idea is to achieve this functionality with a low code solution using EventBridge Pipe, Event Bus and Event Rules.
Amazon EventBridge Pipes #
Amazon EventBridge Pipes was introduced in the last re:Invent 2022. EB Pipes helps to create point to point integration with event producers and consumers with low code.
This was a great addition to the Serverless services since this will reduce some custom codes that are required to connect two services together. Also it includes additional capabilities to filter and enrich events as they are passed through the Pipe. As of now, EB Pipe supports streaming sources such as Kinesis, DynamoDB, Self Managed Apache Kafka, Amazon MSK and Amazon MQ and SQS as well.
About this project #
Imagine a situation where you need to split messages in a SQS queue based on it’s content to separate SQS queues. An example can be where an external producer sends messages to a single SQS queue and you have to process those messages using different consumers. Else, those messages may need to be processed with different priorities. So, first, you need to separate those messages into different SQS queues.
This can be achieved using a single Lambda function where Lambda function will first process the messages from the source queue and then send them to different SQS queues.
However, this Lambda function will contain too much business logic and permissions to point correct messages to the respective target queues. Also, it should perform in a way to scale well based on the demand and be reliable to not to be a single point of failure. Further, when there are more targets introduced, this needs to be extended to cater those requirements, which is always a challenge.
Solution #
How it works #
-
EventBridge Pipe is configured to poll messages from the source SQS queue.
-
There is no filter set up, which means all the messages in the source queue will be processed through the Pipe.
-
When a message is received from the Pipe, it contains not only the original message in the body parameter, but a lot of metadata (related to SQS) as well. Because of that, in order to send the message to the targets, it is required to extract only the original message from the payload. For this, the enrichment Lambda function is used here.
-
Then, this message is sent to the Event Bus.
-
There are event rules defined for the Event Bus with conditions that will match against this original message.
-
Each rule has a SQS queue defined as the target.
-
In a scenario where a message content is matched with the rule, that message will be sent to the particular queue.
Try it yourself #
I have created a sample application to test this scenario. This is created using AWS CDK v2 with Python. So, you need CDK v2 and Python installed in your environment.
How to set up #
-
Clone the repository: https://github.com/pubudusj/sqs-to-multiple-sqs
-
Go into the cloned directory.
-
To manually create a virtualenv on MacOS and Linux:
$ python3 -m venv .venv
-
After the init process completes and the virtualenv is created, you can use the following step to activate your virtualenv.
$ source .venv/bin/activate
-
If you are a Windows platform, you would activate the virtualenv using:
% .venv\Scripts\activate.bat
-
Once the virtualenv is activated, you can install the required dependencies.
$ pip install -r requirements.txt
-
Then, deploy the application:
$ cdk deploy
-
Once the application is deployed, in the output you can see 4 values for:
SourceQueueUrl
,TargetQueueOrderCreated
,TargetQueueOrderUpdated
andPipeArn
. Copy the value ofSourceQueueUrl
to test the application.
Test #
-
Here, I have configured 2 Event Bus rules.
-
First rule will match a message with the field
type
which has the value ‘OrderCreated’. This rule has a target SQS queue namedTargetQueueOrderCreated
. -
Second rule will match a message with the field
type
which has value ‘OrderUpdated’. This rule has a target SQS queue namedTargetQueueOrderUpdated
. -
Send the a message with type ‘OrderCreated’ into the source queue as follows:
aws sqs send-message \ --queue-url=SourceQueueUrl \ --message-body '{"orderId":"125a2e1e-d420-482e-8008-5a606f4b2076", "customerId": "a48516db-66aa-4dbc-bb66-a7f058c5ec24", "type": "OrderCreated"}'
-
If you check the
TargetQueueOrderCreated
, you will see the message has arrived into the queue with the original payload. -
Send the a message with type ‘OrderUpdated’ into the source queue as follows:
aws sqs send-message \ --queue-url=SourceQueueUrl \ --message-body '{"orderId":"125a2e1e-d420-482e-8008-5a606f4b2076", "customerId": "a48516db-66aa-4dbc-bb66-a7f058c5ec24", "type": "OrderUpdated"}'
-
If you check the
TargetQueueOrderUpdated
, you will see the message has arrived into the queue with the original payload.
Conclusion #
With EventBridge Pipes, it is easy to connect sources, specially streaming sources, with the targets with minimum configurations, where previously custom code is required. It scales well, cost effective and very minimum maintenance is required.
Useful Links #
-
EventBridge Pipes documentation: https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes.html
-
Cloudformation API for Pipes: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-pipes-pipe.html
-
Pipes Documentation for CDK v2 Python: https://docs.aws.amazon.com/cdk/api/v2/python/aws_cdk.aws_pipes/CfnPipe.html
-
EventBridge Pipes pricing: https://aws.amazon.com/eventbridge/pricing/#Pipes
Please feel free to deploy this solution to your own AWS environment and share your experience with me.
Keep building! Keep sharing!