A Guide to RabbitMQ Java Client

In this post, I will cover the common uses of the RabbitMQ Java client.

Dependencies

Before using the RabbitMQ Java client, add the amqp-client dependency to your project, replacing ${latest_rabbitmq_version} with the latest released version of the client library:

// Gradle (Kotlin DSL)
implementation("com.rabbitmq:amqp-client:${latest_rabbitmq_version}")

or

<!-- Maven -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>${latest_rabbitmq_version}</version>
</dependency>

Running RabbitMQ with Docker

docker run -d --name rabbitmq1 -p 5672:5672 -p 15672:15672 rabbitmq:4-management

Establishing a connection to RabbitMQ

An Advanced Message Queuing Protocol (AMQP) connection is a link between the client and the broker that handles the underlying networking tasks, including initial authentication and IP resolution.

Each AMQP connection maintains a set of underlying channels. A channel reuses its connection, so it does not need to authenticate again or open a new TCP stream, which makes it more resource-efficient.

Unlike creating channels, creating connections is a costly operation, very much like it is with database connections. A single AMQP connection can be used by many threads through many multiplexed channels. The handshake process for an AMQP connection requires at least seven TCP packets, and even more when using TLS. Channels can be opened and closed more frequently if needed.

The following is an example of establishing a RabbitMQ connection using the Java client:

ConnectionFactory factory = new ConnectionFactory();
// Note: Use environment variables for configurations. Don't hardcode credentials. This code is just for demonstration.
// "guest"/"guest" by default, limited to localhost connections
String userName = "guest";
String password = "guest";
String virtualHost = "/";
String hostName = "localhost";
int portNumber = 5672;
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);

// Reuse connection: Connections are expensive to open. Create one per producer/consumer and reuse it for sending or receiving messages.
Connection connection = factory.newConnection();
log.info("Connection: {}", connection);

// Reuse channel: Create a channel once per thread or once per producer/consumer and reuse it for sending or receiving messages.
Channel channel = connection.createChannel();
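The comment in the snippet above recommends reading configuration from environment variables instead of hardcoding it. The following is a minimal sketch of one way to do that. The class name RabbitSettings and the variable names (RABBITMQ_HOST, RABBITMQ_PORT, RABBITMQ_USERNAME, RABBITMQ_PASSWORD, RABBITMQ_VHOST) are assumptions made for this example, not names defined by the client library.

```java
import java.util.Map;

/** Connection settings resolved from environment variables (hypothetical names), with local defaults. */
final class RabbitSettings {
    final String host;
    final int port;
    final String username;
    final String password;
    final String virtualHost;

    private RabbitSettings(String host, int port, String username, String password, String virtualHost) {
        this.host = host;
        this.port = port;
        this.username = username;
        this.password = password;
        this.virtualHost = virtualHost;
    }

    /** Reads settings from the given map; pass System.getenv() in real code. */
    static RabbitSettings fromEnv(Map<String, String> env) {
        return new RabbitSettings(
                env.getOrDefault("RABBITMQ_HOST", "localhost"),
                Integer.parseInt(env.getOrDefault("RABBITMQ_PORT", "5672")),
                env.getOrDefault("RABBITMQ_USERNAME", "guest"),
                env.getOrDefault("RABBITMQ_PASSWORD", "guest"),
                env.getOrDefault("RABBITMQ_VHOST", "/"));
    }
}
```

With this in place, the factory could be configured with RabbitSettings.fromEnv(System.getenv()) and calls such as factory.setHost(settings.host). Taking the map as a parameter keeps the helper easy to test.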

Publishing messages

RabbitMQ supports four main types of exchanges, each with different routing logic to determine how messages are delivered to queues. You send messages to an exchange, which uses the routing key and bindings to determine which queue(s) to deliver the message to.

The four different types of exchanges:

  • Direct: Routes messages to queues with an exact match between the routing key and the queue’s binding key. Typically used to deliver a message to a single queue, although every queue bound with the same key receives a copy.
  • Topic: Routes messages to queues based on pattern-matching between the routing key and binding key, using * and # wildcards (*: matches exactly one word, #: matches zero or more words). A single message can reach multiple queues.
  • Fanout: Messages are routed to all queues bound to the fanout exchange. Ignores routing keys.
  • Headers: Routes messages based on their header attributes. Ignores routing keys.
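To make the topic wildcard rules above concrete, here is a small sketch of the matching logic in plain Java. It is an illustration of the rules as described in this post, not the broker's implementation, and the class name TopicMatcher is made up for the example. Edge cases such as empty routing keys are not handled.

```java
/** Illustrative matcher for topic binding keys: '*' is exactly one word, '#' is zero or more words. */
final class TopicMatcher {

    static boolean matches(String bindingKey, String routingKey) {
        // Words are separated by dots in both the binding key and the routing key.
        return matches(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            return w == words.length; // pattern exhausted: match only if no words remain
        }
        if (pattern[p].equals("#")) {
            // '#' either absorbs nothing (move past it) or absorbs one more word (stay on it).
            return matches(pattern, p + 1, words, w)
                    || (w < words.length && matches(pattern, p, words, w + 1));
        }
        if (w == words.length) {
            return false; // pattern still expects a word but none are left
        }
        boolean wordMatches = pattern[p].equals("*") || pattern[p].equals(words[w]);
        return wordMatches && matches(pattern, p + 1, words, w + 1);
    }
}
```

For example, TopicMatcher.matches("queue.*", "queue.1") is true, while TopicMatcher.matches("queue.*", "queue.1.x") is false because * covers only one word. A binding key of "queue.#" would match both.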

Using a direct exchange to route messages to a single queue

// Create a connection and a channel
Connection connection = MyHelper.getConnection();
Channel channel = connection.createChannel();

// Declare the exchange before publishing. It will not do anything if the exchange already exists with the same settings.
String exchangeName = "my_direct_exchange";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true);

// Declare the queue before publishing. It will not do anything if the queue already exists with the same settings.
String queueName = "queue.1";
String queue = channel.queueDeclare(queueName, true, false, false, null).getQueue();
String routingKey = "queue.1";
channel.queueBind(queue, exchangeName, routingKey);

// Send messages to queues with an exact match between the routing key and the queue's binding key.
channel.basicPublish(exchangeName, routingKey, null, "Hello, world!".getBytes(StandardCharsets.UTF_8));

Using the RabbitMQ default exchange (a direct exchange)

// Create a connection and a channel
Connection connection = MyHelper.getConnection();
Channel channel = connection.createChannel();

// The default exchange (an empty string "")
String exchangeName = "";

// Declare the queue before publishing. It will not do anything if the queue already exists with the same settings.
String queueName = "queue.1";
String queue = channel.queueDeclare(queueName, true, false, false, null).getQueue();

// Send messages. The routing key is the queue name.
channel.basicPublish(exchangeName, queueName, null, "Hello, world!".getBytes(StandardCharsets.UTF_8));

Using a topic exchange to route messages to multiple queues

// Create a connection and a channel
Connection connection = MyHelper.getConnection();
Channel channel = connection.createChannel();

// Declare the exchange before publishing. It will not do anything if the exchange already exists with the same settings.
String exchangeName = "my_topic_exchange";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true);

// Declare the queues and bind both with the same pattern. Declaring does nothing if the queue already exists with the same settings.
String bindingKey = "queue.*";
String queueName = "queue.1";
String queue = channel.queueDeclare(queueName, true, false, false, null).getQueue();
channel.queueBind(queue, exchangeName, bindingKey);
String queueName2 = "queue.2";
String queue2 = channel.queueDeclare(queueName2, true, false, false, null).getQueue();
channel.queueBind(queue2, exchangeName, bindingKey);

// Send messages to queues based on pattern-matching routing keys.
// All three routing keys (queue.1, queue.2, queue.*) match the binding key (queue.*), so each message reaches both queues.
channel.basicPublish(exchangeName, "queue.1", null, "Hello, world!".getBytes(StandardCharsets.UTF_8));
channel.basicPublish(exchangeName, "queue.2", null, "Hello, world!".getBytes(StandardCharsets.UTF_8));
channel.basicPublish(exchangeName, "queue.*", null, "Hello, world!".getBytes(StandardCharsets.UTF_8));

Using a fanout exchange to route messages to all queues bound to the fanout exchange

// Create a connection and a channel
Connection connection = MyHelper.getConnection();
Channel channel = connection.createChannel();

// Declare the exchange before publishing. It will not do anything if the exchange already exists with the same settings.
String exchangeName = "my_fanout_exchange";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true);

// Declare the queues before publishing. It will not do anything if a queue already exists with the same settings.
String routingKey = "";
String queueName = "queue.1";
String queue = channel.queueDeclare(queueName, true, false, false, null).getQueue();
channel.queueBind(queue, exchangeName, routingKey);
String queueName2 = "queue.2";
String queue2 = channel.queueDeclare(queueName2, true, false, false, null).getQueue();
channel.queueBind(queue2, exchangeName, routingKey);

// Send messages to all queues bound to the fanout exchange.
channel.basicPublish(exchangeName, "", null, "Hello, world!".getBytes(StandardCharsets.UTF_8));

Consuming messages

Connection connection = MyHelper.getConnection();
Channel channel = connection.createChannel();
String queueName = "your_queue";
channel.basicConsume(queueName, false, "myConsumerTag", new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag,
                               Envelope envelope,
                               AMQP.BasicProperties properties,
                               byte[] body) throws IOException {
        String routingKey = envelope.getRoutingKey();
        String contentType = properties.getContentType();
        long deliveryTag = envelope.getDeliveryTag();
        // Process the message first, then acknowledge it (autoAck is false above).
        log.info("Received message: {}", new String(body, StandardCharsets.UTF_8));
        channel.basicAck(deliveryTag, false);
    }
});
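The consumer above always acknowledges. With manual acknowledgments you also need to decide what to do when processing fails. The following is a sketch of one possible policy, not a recommendation from the client library: acknowledge on success, requeue once on the first failure, and reject without requeueing if the delivery was already a redelivery. The class AckPolicy and its Action enum are hypothetical; the comments name the Channel calls each action would map to.

```java
/** Hypothetical decision helper for manual acknowledgments. */
final class AckPolicy {

    enum Action { ACK, REQUEUE, REJECT }

    /**
     * @param processed   whether the handler finished without an error
     * @param redelivered whether the broker flagged the delivery as redelivered
     *                    (envelope.isRedeliver() in the Java client)
     */
    static Action decide(boolean processed, boolean redelivered) {
        if (processed) {
            return Action.ACK;      // channel.basicAck(deliveryTag, false)
        }
        if (redelivered) {
            return Action.REJECT;   // channel.basicNack(deliveryTag, false, false)
        }
        return Action.REQUEUE;      // channel.basicNack(deliveryTag, false, true)
    }
}
```

Rejecting without requeueing is what allows a message to be dead-lettered when the queue has a dead letter exchange configured.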

Advanced features

Handling dead letters

Not every message is consumed promptly, which leads to messages piling up in queues. Though the amount of data is not detrimental to the system, having messages lying around in queues, potentially forever, is not a good idea. Imagine a user logging in after a couple of weeks of vacation and being flooded with obsolete messages. This is the kind of negative user experience that should be avoided.

A Dead Letter refers to a message that cannot be delivered, processed, or acknowledged, and is therefore redirected to a Dead Letter Exchange (DLX) for special handling.

There are two common ways to handle a dead letter:

  1. Email it to the user if it carries important information.
  2. Discard it if it only concerns the current situation or is otherwise unimportant.

The following is an example of moving expired messages to a Dead Letter Exchange:

// Get a connection and a channel
Connection connection = MyHelper.getConnection();
Channel channel = connection.createChannel();

// Declare a fanout exchange
channel.exchangeDeclare("my_fanout_exchange", BuiltinExchangeType.FANOUT, true);
// Declare a queue with a message TTL and a dead letter exchange
Map<String, Object> arguments = Map.of(
    "x-message-ttl", 10000, // 10 seconds for testing
    "x-dead-letter-exchange", "my_dead_letter_exchange"
);
channel.queueDeclare("my_queue", true, false, false, arguments);
// Bind the queue to the exchange
channel.queueBind("my_queue", "my_fanout_exchange", "");

// Declare the dead letter exchange and queue for handling dead letters
channel.exchangeDeclare("my_dead_letter_exchange", BuiltinExchangeType.FANOUT, true);
channel.queueDeclare("my_dead_letter_queue", true, false, false, null);
channel.queueBind("my_dead_letter_queue", "my_dead_letter_exchange", "");

// Publish a message. After 10 seconds, the message will be moved to the dead letter exchange and queue
channel.basicPublish("my_fanout_exchange", "", null, "Hello, world!".getBytes(StandardCharsets.UTF_8));
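A consumer of my_dead_letter_queue often wants to know why a message was dead-lettered. RabbitMQ records this in the x-death message header, a list of tables with fields such as reason and queue. The sketch below reads the reason from the first entry, which the RabbitMQ documentation describes as the most recent event. The class name DeathInfo is made up for this example, and the helper works on a plain headers map (as returned by properties.getHeaders()) so it can be tried without a broker.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper that extracts the dead-lettering reason from message headers. */
final class DeathInfo {

    static Optional<String> firstReason(Map<String, Object> headers) {
        if (headers == null) {
            return Optional.empty();
        }
        Object xDeath = headers.get("x-death");
        if (!(xDeath instanceof List<?>)) {
            return Optional.empty(); // the message was never dead-lettered
        }
        List<?> deaths = (List<?>) xDeath;
        if (deaths.isEmpty() || !(deaths.get(0) instanceof Map<?, ?>)) {
            return Optional.empty();
        }
        Object reason = ((Map<?, ?>) deaths.get(0)).get("reason");
        // toString() because the client may deliver string values as LongString objects.
        return Optional.ofNullable(reason).map(Object::toString);
    }
}
```

For the TTL example above, the reason would be expected to read "expired".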

Delayed messages

Sometimes you want to delay sending messages. For example, a taxi user survey should be sent out to customers 5 minutes after a finished ride.

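The AMQP protocol doesn’t have a native delayed queue feature, but one can easily be emulated by combining message TTL with dead-lettering: messages are published to a holding queue that has no consumers, expire after the TTL, and are then dead-lettered to the queue that the workers actually consume from.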

The following is an example of sending delayed messages:

// Get a connection and a channel
Connection connection = MyHelper.getConnection();
Channel channel = connection.createChannel();

// Declare a queue with TTL and dead letter exchange
Map<String, Object> arguments = Map.of(
    "x-message-ttl", 300000, // 5 minutes
    "x-dead-letter-exchange", "", // use the RabbitMQ default direct exchange (an empty string)
    "x-dead-letter-routing-key", "work.now"
);
channel.queueDeclare("work.later", true, false, false, arguments);

// Declare the queue that will receive the messages after the delay
channel.queueDeclare("work.now", true, false, false, null);

// Publish a message to the delay queue
channel.basicPublish("", "work.later", null, "Hello, world!".getBytes(StandardCharsets.UTF_8));
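If several delay queues are needed, the arguments map can be built from a Duration instead of a hand-written millisecond value. This is a small sketch under that assumption. The class name DelayQueueArgs is hypothetical, and the TTL is kept as an int like in the example above, so delays longer than Integer.MAX_VALUE milliseconds (about 24 days) are rejected.

```java
import java.time.Duration;
import java.util.Map;

/** Hypothetical helper that builds the queue arguments for a TTL-based delay queue. */
final class DelayQueueArgs {

    static Map<String, Object> forDelay(Duration delay, String targetQueue) {
        if (delay.isNegative()) {
            throw new IllegalArgumentException("delay must not be negative");
        }
        return Map.of(
                // Throws ArithmeticException if the delay does not fit in an int.
                "x-message-ttl", Math.toIntExact(delay.toMillis()),
                // The default exchange routes by queue name.
                "x-dead-letter-exchange", "",
                "x-dead-letter-routing-key", targetQueue);
    }
}
```

The result can be passed as the last parameter of channel.queueDeclare, for example DelayQueueArgs.forDelay(Duration.ofMinutes(5), "work.now") for the survey scenario.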

Mandatory

By default, if no queue is bound to the exchange with a matching routing key, the message is silently dropped.

The mandatory flag is an option you can set when publishing a message, and it controls what happens if the message cannot be routed to any queue. The mandatory flag tells RabbitMQ: If you can’t route this message to at least one queue, return it to the sender instead of silently dropping it.

// Get a connection and a channel
Connection connection = MyHelper.getConnection();
Channel channel = connection.createChannel();

// Add a return listener to the channel to handle messages that cannot be routed to any queue
channel.addReturnListener(new MyReturnListener());

// Publish a message with mandatory flag set to true
channel.basicPublish("", "a-non-existing-queue", true, null, "Hello, world!".getBytes(StandardCharsets.UTF_8));

MyReturnListener

@Slf4j
public class MyReturnListener implements ReturnListener {
    @Override
    public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
                             AMQP.BasicProperties properties, byte[] body) throws IOException {
        log.info("Returned message:");
        log.info(" Reply Code: {}", replyCode);
        log.info(" Reply Text: {}", replyText);
        log.info(" Exchange: {}", exchange);
        log.info(" Routing Key: {}", routingKey);
        log.info(" Properties: {}", properties);
        log.info(" Body: {}", new String(body, StandardCharsets.UTF_8));
    }
}
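The effect of the mandatory flag can be summarised with a toy model of the default exchange, where the routing key is the queue name. This is only an illustration of the rule described in this section; the class MandatoryModel and its Outcome enum are invented for the example and are not part of the client library.

```java
import java.util.Set;

/** Toy model of publishing to the default exchange with and without the mandatory flag. */
final class MandatoryModel {

    enum Outcome { ROUTED, DROPPED, RETURNED }

    static Outcome publish(Set<String> existingQueues, String routingKey, boolean mandatory) {
        if (existingQueues.contains(routingKey)) {
            return Outcome.ROUTED;    // at least one queue receives the message
        }
        // Unroutable: returned to the publisher's ReturnListener only if mandatory is set.
        return mandatory ? Outcome.RETURNED : Outcome.DROPPED;
    }
}
```

In the example above, "a-non-existing-queue" is unroutable and mandatory is true, so the message comes back through MyReturnListener.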

Settings

Specifying a consumer prefetch count

The prefetch count caps the number of unacknowledged messages the broker will have outstanding for a consumer at any one time. It is tuned to get as much out of the consumers as possible. By default the prefetch count is unlimited, so RabbitMQ sends as many messages as the consumer can buffer without waiting for acknowledgments.

If the prefetch count is too small, it can hurt throughput, because the broker spends much of its time waiting for acknowledgments before it is allowed to send more messages.

If the prefetch count is large, it makes RabbitMQ send many messages from one queue to one consumer. If all messages are sent to a single consumer, it may be overwhelmed and leave the other consumers idle.

Setting the correct prefetch value:

  • Setting a high prefetch value: In a scenario of one or a few consumers who are quickly processing messages, it is recommended to prefetch many messages at once to keep the client as busy as possible.
  • Setting a low prefetch value: A low prefetch value is recommended in situations where there are many consumers and a short processing time.
  • Setting prefetch value to one: In scenarios where there are many consumers and/or a longer time to process messages, it is recommended that the prefetch value is set to one (1) to evenly distribute messages among all consumers.

Set the prefetch value on the consumer's channel before calling basicConsume:

channel.basicQos(prefetchCount);
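To get a feel for the trade-off, the sketch below simulates a queue being drained by consumers of different speeds. It is a deliberately simplified toy model (round-robin dispatch, fixed processing time per message, one tick per step), not RabbitMQ's actual scheduler, and the class PrefetchSimulation is invented for this post. A prefetch of 0 means unlimited here.

```java
import java.util.Arrays;

/** Toy simulation of how a prefetch limit affects the spread of work across consumers. */
final class PrefetchSimulation {

    static final class Result {
        final int totalTicks;   // ticks until every message is acknowledged
        final int[] handled;    // messages processed per consumer
        Result(int totalTicks, int[] handled) {
            this.totalTicks = totalTicks;
            this.handled = handled;
        }
    }

    /** @param ticksPerMessage processing time of each consumer, at least 1 tick each */
    static Result run(int messages, int prefetch, int[] ticksPerMessage) {
        int consumers = ticksPerMessage.length;
        if (consumers == 0 || Arrays.stream(ticksPerMessage).anyMatch(t -> t < 1)) {
            throw new IllegalArgumentException("need at least one consumer and ticks >= 1");
        }
        int limit = prefetch <= 0 ? Integer.MAX_VALUE : prefetch;
        int[] unacked = new int[consumers];
        int[] workLeft = new int[consumers];
        int[] handled = new int[consumers];
        int queued = messages;
        int next = 0;
        int ticks = 0;

        while (queued > 0 || Arrays.stream(unacked).sum() > 0) {
            // Dispatch round-robin to consumers that are below their prefetch limit.
            int skipped = 0;
            while (queued > 0 && skipped < consumers) {
                if (unacked[next] < limit) {
                    unacked[next]++;
                    queued--;
                    skipped = 0;
                } else {
                    skipped++;
                }
                next = (next + 1) % consumers;
            }
            // Advance one tick: each busy consumer works on its current message.
            ticks++;
            for (int c = 0; c < consumers; c++) {
                if (unacked[c] == 0) {
                    continue;
                }
                if (workLeft[c] == 0) {
                    workLeft[c] = ticksPerMessage[c]; // start the next buffered message
                }
                if (--workLeft[c] == 0) {
                    unacked[c]--;                     // acknowledged
                    handled[c]++;
                }
            }
        }
        return new Result(ticks, handled);
    }
}
```

With 12 messages, a fast consumer (1 tick per message) and a slow one (3 ticks per message), the model finishes in 18 ticks with an unlimited prefetch, because each consumer is handed 6 messages up front. With a prefetch of 1 it finishes in 9 ticks, with the fast consumer handling 9 messages and the slow one 3.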

References

[1] RabbitMQ Essentials (2nd) by Lovisa Johansson and David Dossot

[2] Java Client API Guide - RabbitMQ

[3] RabbitMQ Tutorials