Event-Driven Systems w/ RabbitMQ

Michael Sanni | Backend Engineering
January 25, 2023

tl;dr (AI-generated by GPT-3.5 🦾)

RabbitMQ is a powerful open-source message broker that can help you build event-driven systems, allowing components to send and receive messages reliably and asynchronously. With the right setup and configuration, you can monitor and troubleshoot issues quickly, ensuring your system remains reliable and responsive.

Introduction

Event-driven systems are a type of architecture that allows different system components to communicate through exchanging messages or events. These events can be triggered by various sources, such as user interactions, sensor readings, or external APIs, and are typically processed asynchronously to ensure the system remains responsive and scalable.

RabbitMQ is an open-source message broker that implements the Advanced Message Queuing Protocol (AMQP). It acts as a broker between the different components of an event-driven system, allowing them to send and receive messages reliably and asynchronously. It supports many messaging patterns, such as publish/subscribe, point-to-point, and request/response – making it a versatile tool for building event-driven systems.

How we use RabbitMQ at Spoke

At Spoke, we run a microservice architecture, which lets us build and deploy services as self-contained units. Our services are written in Node.js (TypeScript + Fastify) and Python (FastAPI), so we need a way for them to communicate without tight coupling.

For example, parts of our Slack Thread Summarization process combine the OpenAI API, RabbitMQ, and Slack API Socket Mode. This is just one of many use cases where we rely on message queues. Having multiple services subscribe to and process messages without blocking other async operations has improved performance. You can check out how we summarize Slack threads in seconds here.

Installing RabbitMQ

You can set up RabbitMQ using a managed service, such as Amazon MQ from AWS or CloudAMQP, or by running a Docker container from the RabbitMQ image on Docker Hub. You can find detailed download and installation instructions here.

Definition of Terms

There are a few terms you need to know to understand and work effectively with the implementation:

  • A Producer is a component that sends messages to an exchange.
  • A Consumer is a component that receives messages from a queue.
  • An Exchange is a message routing hub in RabbitMQ. It receives messages from producers and routes them to queues based on the rules defined by bindings. Exchanges can use different routing algorithms such as direct, topic, fanout, and headers.
  • Queues are where messages are stored in RabbitMQ. A queue can be bound to one or more exchanges, and the messages sent to the exchange are then routed to the queues based on the bindings defined.
  • Channels are virtual connections within a physical connection. They are used to multiplex a connection into several channels, allowing for greater flexibility and ease of use.
  • Routing Key is an attribute the producer attaches to a message when publishing it to an exchange. The exchange combines it with the exchange type and bindings to determine which queues the message is delivered to.
  • Binding Key is the key supplied when binding a queue to an exchange. It specifies the criteria a message's routing key must match for the message to be routed to that queue.
  • Bindings are the rules that determine how messages flow from exchanges to queues in RabbitMQ. Each binding links a queue to an exchange, usually with a binding key that the exchange compares against the routing key of each incoming message.

In summary, the Producer sends messages to the Exchange, the Exchange routes them to Queues based on Bindings and the Routing Key, and the Consumer receives messages from the Queues. All of this traffic flows over Channels, which multiplex a single connection.
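The flow summarized above can be illustrated with a small in-memory model. This is only a sketch of the concepts, not RabbitMQ itself or the code we run at Spoke; the `DirectExchange` class, its methods, and the queue names are hypothetical and exist purely for illustration.

```typescript
// In-memory model of a direct exchange. Illustrative only: a real broker
// also handles persistence, acknowledgements, and network transport.
type Queue = { queueName: string; messages: string[] };

class DirectExchange {
  // Each binding pairs a binding key with the queue it points to.
  private bindings: { bindingKey: string; queue: Queue }[] = [];

  bind(queue: Queue, bindingKey: string): void {
    this.bindings.push({ bindingKey, queue });
  }

  // Copy the message into every queue whose binding key equals the routing key.
  // Returns the number of queues that received the message.
  publish(message: string, routingKey: string): number {
    const matches = this.bindings.filter((b) => b.bindingKey === routingKey);
    for (const { queue } of matches) queue.messages.push(message);
    return matches.length;
  }
}

const workflowQueue: Queue = { queueName: "workflow", messages: [] };
const summaryQueue: Queue = { queueName: "summary", messages: [] };

const exchange = new DirectExchange();
exchange.bind(workflowQueue, "update_workflow");
exchange.bind(summaryQueue, "on_demand");

// Only workflowQueue is bound with "update_workflow", so only it gets the message.
const delivered = exchange.publish("thread 42 updated", "update_workflow");
// No binding uses "unknown_key", so the message is routed nowhere.
const unrouted = exchange.publish("nobody listens", "unknown_key");
console.log(delivered, unrouted, workflowQueue.messages, summaryQueue.messages);
```

The consumer side is left out on purpose: in this model a consumer would simply read from `workflowQueue.messages`.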

Connection and Setup (high-level)

To connect to RabbitMQ, you need a connection URL, either for your Docker container or from a managed service:

// a sample connection URL obtained from <https://www.cloudamqp.com/>
AMQP_URL=amqps://elhvkrhd:80y5aiqQ2rq2pfR0lujpY3Z_TBBNqx6F@moose.rmq.cloudamqp.com/elhvkrhd
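To make the parts of such a URL explicit, here is a minimal sketch that splits it with the standard `URL` class. The `parseAmqpUrl` function and the `AmqpEndpoint` type are hypothetical helpers, the credentials are placeholders, and the handling of the virtual host is simplified: a missing path is treated as the default vhost "/".

```typescript
// Break an AMQP URL into its parts using the standard WHATWG URL class.
interface AmqpEndpoint {
  tls: boolean;
  host: string;
  port: number;
  username: string;
  vhost: string;
}

function parseAmqpUrl(raw: string): AmqpEndpoint {
  const url = new URL(raw);
  if (url.protocol !== "amqp:" && url.protocol !== "amqps:") {
    throw new Error(`Unsupported protocol: ${url.protocol}`);
  }
  const tls = url.protocol === "amqps:";
  return {
    tls,
    host: url.hostname,
    // 5672 (plain) and 5671 (TLS) are the conventional AMQP ports.
    port: url.port ? Number(url.port) : tls ? 5671 : 5672,
    username: decodeURIComponent(url.username),
    // Simplification: the path after "/" is the vhost; no path means "/".
    vhost: url.pathname.length > 1 ? decodeURIComponent(url.pathname.slice(1)) : "/",
  };
}

// Placeholder credentials, not a real broker.
const endpoint = parseAmqpUrl("amqps://app_user:s3cret@broker.example.com/my_vhost");
console.log(endpoint);
```

On CloudAMQP-style URLs the user name and the vhost are often the same string, as in the sample URL above.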

Next, you can define an interface describing the connection options. The interface below lets you specify the connectionUrl, heartbeatIntervalInSeconds and reconnectTimeInSeconds:

export interface ISettings {
  connectionUrl: string;
  heartbeatIntervalInSeconds?: number; // Interval to send heartbeats to broker. Defaults to 5 seconds.
  reconnectTimeInSeconds?: number; // The time to wait before trying to reconnect. If not specified, defaults to heartbeatIntervalInSeconds.
}
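The defaults described in the interface comments could be applied with a small helper like the one below. `resolveSettings` is a hypothetical name, and this is a sketch of the documented defaults rather than the actual implementation.

```typescript
interface ISettings {
  connectionUrl: string;
  heartbeatIntervalInSeconds?: number;
  reconnectTimeInSeconds?: number;
}

// Fill in the optional fields: heartbeat defaults to 5 seconds,
// and the reconnect time falls back to the heartbeat interval.
function resolveSettings(settings: ISettings): Required<ISettings> {
  const heartbeatIntervalInSeconds = settings.heartbeatIntervalInSeconds ?? 5;
  return {
    connectionUrl: settings.connectionUrl,
    heartbeatIntervalInSeconds,
    reconnectTimeInSeconds: settings.reconnectTimeInSeconds ?? heartbeatIntervalInSeconds,
  };
}

const onlyUrl = resolveSettings({ connectionUrl: "amqp://localhost" });
const customHeartbeat = resolveSettings({
  connectionUrl: "amqp://localhost",
  heartbeatIntervalInSeconds: 10,
});
console.log(onlyUrl, customHeartbeat);
```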

Next, specify the consumer definitions: how to subscribe to messages on a particular queue through an exchange, and how to handle each message when it is received:

export interface IConsumerDefinition {
  channelName: string;
  messageHandlerDefinitions: IMessageHandlerDefinition[];
}

export interface IMessageHandlerDefinition {
  exchangeName: string; // exchange name for lookup in exchange information
  queueName: string; // human readable name for the queue
  exclusive: boolean; // is the queue exclusive to one (and only one) consumer
  messageHandler: MessageHandlerFunction; // function that actually handles the message
  bindingKey?: string; // optional binding key that will be used to filter messages
  prefetchCount?: number; // This option makes it possible to limit the number of unacknowledged messages on a channel
}
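To show what prefetchCount limits, here is a toy model of the window of unacknowledged messages on a channel. `PrefetchWindow` is a hypothetical class written for this explanation, and it assumes prefetchCount is at least 1 (in RabbitMQ a value of 0 means no limit, which this sketch does not model).

```typescript
// Toy model: the broker stops delivering once prefetchCount messages
// are waiting for an acknowledgement, and resumes after an ack.
class PrefetchWindow {
  private readonly unacked = new Set<number>();
  private readonly prefetchCount: number;

  constructor(prefetchCount: number) {
    this.prefetchCount = prefetchCount;
  }

  // Returns true if the message could be delivered to the consumer.
  tryDeliver(deliveryTag: number): boolean {
    if (this.unacked.size >= this.prefetchCount) return false;
    this.unacked.add(deliveryTag);
    return true;
  }

  ack(deliveryTag: number): void {
    this.unacked.delete(deliveryTag);
  }
}

const deliveryWindow = new PrefetchWindow(2);
const results = [
  deliveryWindow.tryDeliver(1), // true
  deliveryWindow.tryDeliver(2), // true
  deliveryWindow.tryDeliver(3), // false: two messages are still unacknowledged
];
deliveryWindow.ack(1);
results.push(deliveryWindow.tryDeliver(3)); // true: the ack freed a slot
console.log(results);
```

A low prefetch count keeps a slow consumer from hoarding messages that other consumers could process; a higher one reduces round trips for fast handlers.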

Depending on the kind of operation you plan to perform on the service, you can either start a producer or a consumer:

startProducer({ connectionUrl: process.env.AMQP_URL });

startProducer starts a producer instance on the service so that the service can publish messages to an exchange.

startConsumer(config.amqp.settings, config.amqp.consumerDefinitions);

If the service only needs to consume messages, you can start a consumer instead. The settings and consumer definition interfaces are declared above. Note that messageHandlerDefinitions is an array, so a single consumer can register multiple handlers, each subscribing through an exchange and handling its own messages.

Publishing to the Exchange – Producer

Messages are not published to a queue directly; they go through an exchange. Queues are bound to the exchange with bindings, and the exchange uses those bindings to route each message. There are different types of exchange, including Direct, Topic, Fanout, and Headers. You can read more about RabbitMQ exchanges here. This article uses a Direct exchange, which delivers a message to the queues whose bindingKey exactly matches the message's routingKey. For a consumer to receive the message, its queue must be bound to the exchange with a bindingKey equal to the routingKey.
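The difference between the exchange types comes down to how a binding key is compared with a routing key. The sketch below covers direct, fanout, and topic matching; headers exchanges are left out because they match on message headers rather than keys. `shouldRoute` and `topicMatches` are hypothetical helper names, not part of any RabbitMQ client.

```typescript
type ExchangeType = "direct" | "fanout" | "topic";

// Topic patterns are dot-separated words: "*" matches exactly one word,
// "#" matches zero or more words.
function topicMatches(pattern: string[], key: string[]): boolean {
  if (pattern.length === 0) return key.length === 0;
  const [head, ...rest] = pattern;
  if (head === "#") {
    // "#" either absorbs nothing, or absorbs one word and tries again.
    return topicMatches(rest, key) || (key.length > 0 && topicMatches(pattern, key.slice(1)));
  }
  if (key.length === 0) return false;
  return (head === "*" || head === key[0]) && topicMatches(rest, key.slice(1));
}

function shouldRoute(type: ExchangeType, bindingKey: string, routingKey: string): boolean {
  switch (type) {
    case "direct":
      return bindingKey === routingKey; // exact match only
    case "fanout":
      return true; // keys are ignored; every bound queue gets a copy
    case "topic":
      return topicMatches(bindingKey.split("."), routingKey.split("."));
  }
}

console.log(shouldRoute("direct", "on_demand", "on_demand"));
console.log(shouldRoute("topic", "summary.*", "summary.on_demand"));
console.log(shouldRoute("topic", "summary.*", "summary.slack.thread"));
```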

To publish a message, get the running producer instance for an exchange by name and publish to that exchange:

const producer = AMQPProducerManager.instance.get(EXCHANGE_NAME);
await producer.publish(message, routingKey);

The routing key determines how the message is routed. Any consumer whose queue is bound to the same exchange with a binding key equal to the routing key will get the message.
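The lookup-by-exchange-name pattern used above can be sketched as a singleton registry. The real AMQPProducerManager is not shown in this article, so everything below (`ProducerRegistry`, `register`, the `Producer` interface, and the in-memory stub producer) is a hypothetical stand-in that only illustrates the idea.

```typescript
// Hypothetical stand-in for a producer manager: one producer per exchange name.
interface Producer {
  publish(message: string, routingKey: string): Promise<void>;
}

class ProducerRegistry {
  private static singleton: ProducerRegistry | undefined;
  private readonly producers = new Map<string, Producer>();

  private constructor() {}

  // Lazily create the single shared registry.
  static get instance(): ProducerRegistry {
    if (!ProducerRegistry.singleton) ProducerRegistry.singleton = new ProducerRegistry();
    return ProducerRegistry.singleton;
  }

  register(exchangeName: string, producer: Producer): void {
    this.producers.set(exchangeName, producer);
  }

  get(exchangeName: string): Producer {
    const producer = this.producers.get(exchangeName);
    if (!producer) throw new Error(`No producer started for exchange "${exchangeName}"`);
    return producer;
  }
}

// Stub producer that records messages in memory instead of talking to a broker.
const sent: string[] = [];
ProducerRegistry.instance.register("workflow_exchange", {
  publish: async (message, routingKey) => {
    sent.push(`${routingKey}: ${message}`);
  },
});

const workflowProducer = ProducerRegistry.instance.get("workflow_exchange");
workflowProducer.publish("workflow 7 updated", "update_workflow").catch(console.error);
```

Failing loudly when no producer exists for an exchange makes a missing startProducer call visible at the first publish rather than silently dropping messages.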

Subscribing to Messages from the Queue – Consumer

A consumer's queue is bound to the exchange with a bindingKey. In a direct exchange, this bindingKey must have the same value as the routingKey used by the producer.

To subscribe to a queue via an exchange, pass the AMQP_URL and the consumer definitions:

amqp: {
  settings: {
    connectionUrl: process.env.AMQP_URL,
  },
  consumerDefinitions: [
    {
      channelName: "data_api",
      messageHandlerDefinitions: [
        subscribeToUpdateWorkflow("update_workflow", handleUpdateWorkflow, false), // handler 1
        subscribeToOnDemandSummary("on_demand", handleOnDemandSummary, true) // handler 2
      ],
    },
  ],
},

You can have several handlers; each one listens on a particular exchange and processes messages as they become available. The messageHandlerDefinitions array defines how to subscribe to each exchange and queue and how to handle the messages once they are received. The last boolean argument controls whether the queue is exclusive. An exclusive queue can be consumed by one (and only one) consumer.
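The subscribeTo... helpers are not shown in this article, but one plausible shape is a small factory that returns a message handler definition. The sketch below is an assumption throughout: the generic `subscribeTo` helper, the exchange name, the queue naming convention, the prefetch value, and the meaning of the first argument (treated here as the binding key) are all invented for illustration.

```typescript
type MessageHandlerFunction = (message: { content: { toString(): string } }) => Promise<void>;

interface IMessageHandlerDefinition {
  exchangeName: string;
  queueName: string;
  exclusive: boolean;
  messageHandler: MessageHandlerFunction;
  bindingKey?: string;
  prefetchCount?: number;
}

// Hypothetical factory: builds a definition from an exchange, a binding key,
// a handler, and the exclusive flag.
function subscribeTo(
  exchangeName: string,
  bindingKey: string,
  messageHandler: MessageHandlerFunction,
  exclusive: boolean,
): IMessageHandlerDefinition {
  return {
    exchangeName,
    queueName: `${exchangeName}.${bindingKey}`, // assumed naming convention
    exclusive,
    messageHandler,
    bindingKey,
    prefetchCount: 10, // assumed value, tune per handler
  };
}

// Assumed exchange name; the first argument is treated as the binding key.
const subscribeToUpdateWorkflow = (
  bindingKey: string,
  handler: MessageHandlerFunction,
  exclusive: boolean,
): IMessageHandlerDefinition => subscribeTo("workflow_exchange", bindingKey, handler, exclusive);

const noopHandler: MessageHandlerFunction = async () => {};
const definition = subscribeToUpdateWorkflow("update_workflow", noopHandler, false);
console.log(definition.queueName, definition.exclusive);
```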

Then specify how the messages will be handled when they are received:

export const handleUpdateWorkflow = async (message: any) => {
  console.log("from update workflow exchange and queue");
  const messageInfo = message.content.toString(); // message body as a string
  console.log(messageInfo);
};

export const handleOnDemandSummary = async (message: any) => {
  console.log("from update on demand exchange and queue");
  const messageInfo = message.content.toString(); // message body as a string
  console.log(messageInfo);
};

// These handlers can be in different files for more readability
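In practice a handler usually needs structured data rather than a raw string. Here is a minimal sketch of parsing and validating a JSON body, assuming message.content is a Buffer-like object as the toString() call in the handlers suggests. The `WorkflowUpdate` payload shape and the `parseWorkflowUpdate` function are hypothetical.

```typescript
interface IncomingMessage {
  content: { toString(): string };
}

// Hypothetical payload shape for an "update workflow" event.
interface WorkflowUpdate {
  workflowId: string;
  state: string;
}

// Returns the parsed payload, or null if the body is not valid JSON
// or does not have the expected fields.
function parseWorkflowUpdate(message: IncomingMessage): WorkflowUpdate | null {
  try {
    const data: unknown = JSON.parse(message.content.toString());
    if (typeof data !== "object" || data === null) return null;
    const fields = data as Record<string, unknown>;
    if (typeof fields.workflowId !== "string" || typeof fields.state !== "string") return null;
    return { workflowId: fields.workflowId, state: fields.state };
  } catch {
    return null; // body was not valid JSON
  }
}

// Fake messages standing in for what a consumer would receive.
const validUpdate = parseWorkflowUpdate({
  content: { toString: () => '{"workflowId":"wf_1","state":"done"}' },
});
const brokenUpdate = parseWorkflowUpdate({ content: { toString: () => "not json" } });
console.log(validUpdate, brokenUpdate);
```

Returning null instead of throwing lets the caller decide whether a malformed message should be retried, rejected, or dead-lettered.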

Error handling and recovery

When building an event-driven system with RabbitMQ, it is important to consider how errors will be handled and how the system will recover from them. This can include implementing retry logic for failed messages, monitoring the system for errors, and having a plan in place for dealing with unexpected events such as broker failures. You can also implement a Dead Letter Queue to hold messages that failed to process so they can be inspected and reprocessed later.
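As one possible shape for the retry logic, the sketch below decides between retrying with exponential backoff and dead-lettering. `decideRetry`, its default limits, and the exchange and key names are assumptions made for illustration; the two `x-dead-letter-*` entries are the queue arguments RabbitMQ uses to configure dead-lettering on a queue.

```typescript
type RetryDecision =
  | { action: "retry"; delayMs: number }
  | { action: "dead-letter" };

// Exponential backoff: baseMs, 2 * baseMs, 4 * baseMs, ... capped at maxMs.
// Once failedAttempts reaches maxAttempts, give up and dead-letter the message.
function decideRetry(
  failedAttempts: number,
  maxAttempts = 5,
  baseMs = 1000,
  maxMs = 60000,
): RetryDecision {
  if (failedAttempts >= maxAttempts) return { action: "dead-letter" };
  const delayMs = Math.min(baseMs * 2 ** (failedAttempts - 1), maxMs);
  return { action: "retry", delayMs };
}

// Queue arguments that tell RabbitMQ where rejected or expired messages go.
// The exchange and routing key names are placeholders.
const deadLetterArguments = {
  "x-dead-letter-exchange": "dead_letter_exchange",
  "x-dead-letter-routing-key": "update_workflow.failed",
};

const firstFailure = decideRetry(1);
const thirdFailure = decideRetry(3);
const finalFailure = decideRetry(5);
console.log(firstFailure, thirdFailure, finalFailure, deadLetterArguments);
```

How the failure count is tracked (for example in a message header) is left open here, since it depends on the client library and retry topology you choose.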

Monitoring and troubleshooting

Monitoring the state of the message broker and its components, such as exchanges, queues, and bindings, can help identify issues early on and prevent them from becoming major problems. Additionally, having the ability to troubleshoot and diagnose issues quickly can help minimise downtime and ensure the system remains reliable. You can read more about monitoring here.
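A simple health check can be built on the per-queue statistics exposed by the RabbitMQ management plugin's HTTP API (the /api/queues endpoint). The sketch below only evaluates already-fetched data; `findUnhealthyQueues`, the threshold, and the sample numbers are invented for illustration and are not real output.

```typescript
// Subset of the fields reported per queue by the management API.
interface QueueStats {
  name: string;
  messages_ready: number;
  messages_unacknowledged: number;
  consumers: number;
}

// Flag queues that hold messages nobody is consuming, or whose backlog is too large.
function findUnhealthyQueues(queues: QueueStats[], maxBacklog: number): string[] {
  const warnings: string[] = [];
  for (const q of queues) {
    const backlog = q.messages_ready + q.messages_unacknowledged;
    if (q.consumers === 0 && backlog > 0) {
      warnings.push(`${q.name}: ${backlog} messages but no consumers`);
    } else if (backlog > maxBacklog) {
      warnings.push(`${q.name}: backlog of ${backlog} exceeds ${maxBacklog}`);
    }
  }
  return warnings;
}

// Made-up sample data in the shape of an /api/queues response.
const sampleQueues: QueueStats[] = [
  { name: "update_workflow", messages_ready: 3, messages_unacknowledged: 1, consumers: 2 },
  { name: "on_demand", messages_ready: 12, messages_unacknowledged: 0, consumers: 0 },
  { name: "summaries", messages_ready: 900, messages_unacknowledged: 200, consumers: 4 },
];

const warnings = findUnhealthyQueues(sampleQueues, 1000);
console.log(warnings);
```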

Conclusion

In this article, we discussed how RabbitMQ can be used to build event-driven systems, including setting up and configuring the message broker, sending and receiving messages, advanced features such as routing and message filtering using routing and binding keys, and best practices for performance, error handling, and monitoring.

If you would like to chat about event-driven systems or how to best adapt RabbitMQ for your use case, feel free to reach out to me via LinkedIn with any questions! 🤝
