How to create a delayed queue in RabbitMQ?


Solution 1

I found this extremely useful when developing my applications, as it gives you an alternative to simply re-queuing your messages. This can easily reduce the complexity of your code, and it is one of many powerful hidden features in RabbitMQ.

Steps

First we need to set up two basic channels, one for the main queue, and one for the delay queue. In my example at the end, I include a couple of additional flags that are not required but make the code more reliable, such as confirm delivery, delivery_mode and durable. You can find more information on these in the RabbitMQ manual.

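After we have set up the channels, we bind the main queue to the amq.direct exchange. This binding is what lets messages from the delay queue reach our main queue. No routing key is passed here, in which case Pika binds with the queue name as the routing key.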

channel.queue_bind(exchange='amq.direct',
                   queue='hello')

Next we need to configure our delay queue to forward messages to the main queue once they have expired.

delay_channel.queue_declare(queue='hello_delay', durable=True,  arguments={
  'x-message-ttl' : 5000,
  'x-dead-letter-exchange' : 'amq.direct',
  'x-dead-letter-routing-key' : 'hello'
})
  • x-message-ttl (Message - Time To Live)

    This is normally used to automatically remove old messages in the queue after a specific duration, but by adding two optional arguments we can change this behaviour, and instead have this parameter determine in milliseconds how long messages will stay in the delay queue.

  • x-dead-letter-routing-key

    This argument sets the routing key applied to messages when they are dead-lettered after expiring. Here it is the name of the queue we want them delivered to.

  • x-dead-letter-exchange

    This argument determines which exchange expired messages are republished to, instead of the default behaviour of removing them completely. In this example it transfers the message from the hello_delay queue to the hello queue.
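The three arguments above can be collected in a small helper. This is only a sketch: `delay_queue_arguments` is a hypothetical name, not part of Pika or RabbitMQ, and the validation simply reflects that the TTL is given as a non-negative number of milliseconds.

```python
def delay_queue_arguments(ttl_ms, dead_letter_exchange, dead_letter_routing_key):
    """Build the `arguments` dict for a fixed-delay queue.

    Hypothetical helper for illustration; not part of Pika or RabbitMQ.
    """
    # Reject booleans, floats, strings and negative values.
    if isinstance(ttl_ms, bool) or not isinstance(ttl_ms, int) or ttl_ms < 0:
        raise ValueError("ttl_ms must be a non-negative integer (milliseconds)")
    return {
        # How long a message waits in the delay queue.
        "x-message-ttl": ttl_ms,
        # Exchange that expired messages are republished to.
        "x-dead-letter-exchange": dead_letter_exchange,
        # Routing key used when republishing (here: the main queue name).
        "x-dead-letter-routing-key": dead_letter_routing_key,
    }


args = delay_queue_arguments(5000, "amq.direct", "hello")
print(args)
# With Pika this dict would be passed as:
# delay_channel.queue_declare(queue="hello_delay", durable=True, arguments=args)
```

Keeping the arguments in one place makes it harder to declare the same queue twice with different values, which RabbitMQ rejects.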

Publishing to the delay queue

Once all the basic Pika parameters are set up, you simply send a message to the delay queue using basic_publish.

delay_channel.basic_publish(exchange='',
                      routing_key='hello_delay',
                      body="test",
                      properties=pika.BasicProperties(delivery_mode=2))

Once you have executed the script, you should see the hello and hello_delay queues created in your RabbitMQ management module.

Example.

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))

# Create normal 'Hello World' type channel.
channel = connection.channel()
channel.confirm_delivery()
channel.queue_declare(queue='hello', durable=True)

# We need to bind this channel to an exchange, that will be used to transfer 
# messages from our delay queue.
channel.queue_bind(exchange='amq.direct',
                   queue='hello')

# Create our delay channel.
delay_channel = connection.channel()
delay_channel.confirm_delivery()

# This is where we declare the delay, and routing for our delay channel.
delay_channel.queue_declare(queue='hello_delay', durable=True,  arguments={
  'x-message-ttl' : 5000, # Delay until the message is transferred in milliseconds.
  'x-dead-letter-exchange' : 'amq.direct', # Exchange used to transfer the message from A to B.
  'x-dead-letter-routing-key' : 'hello' # Name of the queue we want the message transferred to.
})

delay_channel.basic_publish(exchange='',
                      routing_key='hello_delay',
                      body="test",
                      properties=pika.BasicProperties(delivery_mode=2))

print(" [x] Sent")
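On the consuming side, the delay queue can replace an immediate re-queue. The following is a minimal sketch, assuming Pika 1.x callback conventions; `make_retry_callback` and `process` are hypothetical names, and there is no retry limit, so a message that always fails would keep cycling.

```python
def make_retry_callback(process, delay_queue="hello_delay"):
    """Return a Pika-style consumer callback that parks failed messages.

    `process` is a hypothetical function supplied by the application; it
    should raise an exception when a message cannot be handled right now.
    """
    def on_message(channel, method, properties, body):
        try:
            process(body)
        except Exception:
            # Instead of re-queuing immediately, publish a copy to the delay
            # queue via the default exchange. It returns to the main queue
            # once the delay queue's TTL has elapsed.
            channel.basic_publish(exchange="",
                                  routing_key=delay_queue,
                                  body=body,
                                  properties=properties)
        # Acknowledge the original delivery in both cases so that it is not
        # redelivered straight away.
        channel.basic_ack(delivery_tag=method.delivery_tag)

    return on_message
```

With a real connection this would be registered with something like `channel.basic_consume(queue='hello', on_message_callback=make_retry_callback(handle))`, where `handle` is your own function, followed by `channel.start_consuming()`.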

Solution 2

You can use the official RabbitMQ plugin rabbitmq_delayed_message_exchange, which adds the x-delayed-message exchange type.

First, download the .ez file and copy it into Your_rabbitmq_root_path/plugins

Second, enable the plugin (no server restart is needed):

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

Finally, publish your message to an exchange declared with the type x-delayed-message, setting an "x-delay" header (in milliseconds), like:

headers.put("x-delay", 5000);
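The line above is Java. A rough Python equivalent of the two pieces the plugin needs (the exchange declaration and the per-message header) might look like this. The helper names and the exchange name `my_delayed_exchange` are assumptions for illustration; the `x-delayed-message` type, the `x-delayed-type` argument and the `x-delay` header come from the plugin.

```python
DELAYED_EXCHANGE_TYPE = "x-delayed-message"


def delayed_exchange_kwargs(name, routing_type="direct"):
    """Keyword arguments for declaring a delayed exchange (hypothetical helper).

    `routing_type` is the ordinary exchange type (direct, topic, fanout, ...)
    that the plugin should apply once the delay has elapsed.
    """
    return {
        "exchange": name,
        "exchange_type": DELAYED_EXCHANGE_TYPE,
        "durable": True,
        "arguments": {"x-delayed-type": routing_type},
    }


def delay_headers(delay_ms, extra=None):
    """Message headers carrying the delay in milliseconds (hypothetical helper)."""
    headers = dict(extra or {})
    headers["x-delay"] = int(delay_ms)
    return headers


print(delayed_exchange_kwargs("my_delayed_exchange"))
print(delay_headers(5000))
# With Pika, assuming an open channel and a queue bound to the exchange:
# channel.exchange_declare(**delayed_exchange_kwargs("my_delayed_exchange"))
# channel.basic_publish(exchange="my_delayed_exchange", routing_key="hello",
#                       body="test",
#                       properties=pika.BasicProperties(headers=delay_headers(5000)))
```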

Notice:

It does not guarantee your message's safety: if a message's delay expires while your rabbitmq-server is down, the message is unfortunately lost. So be careful when you use this scheme.

More information is available in rabbitmq-delayed-message-exchange

Solution 3

FYI, here is how to do this in Spring 3.2.x.

<rabbit:queue name="delayQueue" durable="true" queue-arguments="delayQueueArguments"/>

<rabbit:queue-arguments id="delayQueueArguments">
  <entry key="x-message-ttl">
    <value type="java.lang.Long">10000</value>
  </entry>
  <entry key="x-dead-letter-exchange" value="finalDestinationTopic"/>
  <entry key="x-dead-letter-routing-key" value="finalDestinationQueue"/>
</rabbit:queue-arguments>


<rabbit:fanout-exchange name="finalDestinationTopic">
  <rabbit:bindings>
    <rabbit:binding queue="finalDestinationQueue"/>
  </rabbit:bindings>
</rabbit:fanout-exchange>

Solution 4

NodeJS implementation.

Everything is pretty clear from the code. Hope it will save somebody's time.

var ch = channel;
ch.assertExchange("my_intermediate_exchange", 'fanout', {durable: false});
ch.assertExchange("my_final_delayed_exchange", 'fanout', {durable: false});

// set up an intermediate queue that no consumer ever listens to.
// all messages are TTLed so when they are "dead", they come to another exchange
ch.assertQueue("my_intermediate_queue", {
      deadLetterExchange: "my_final_delayed_exchange",
      messageTtl: 5000, // 5sec
}, function (err, q) {
      ch.bindQueue(q.queue, "my_intermediate_exchange", '');
});

ch.assertQueue("my_final_delayed_queue", {}, function (err, q) {
      ch.bindQueue(q.queue, "my_final_delayed_exchange", '');

      ch.consume(q.queue, function (msg) {
          console.log("delayed - [x] %s", msg.content.toString());
      }, {noAck: true});
});

Solution 5

A message in a RabbitMQ queue can be delayed in two ways:

  • using a queue TTL
  • using a message TTL

If all messages in the queue are to be delayed for a fixed time, use the queue TTL. If each message has to be delayed by a different time, use the message TTL. I have explained it using python3 and the pika module. The 'expiration' argument of pika's BasicProperties, given in milliseconds, has to be set to delay a message in the delay queue. After setting the expiration time, publish the message to a delayed_queue (not the actual queue where consumers are waiting to consume). Once the message in delayed_queue expires, it will be routed to the actual queue using the exchange 'amq.direct'.

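import json
import logging

import pika

# RABBIT_MQ_USER, RABBIT_MQ_PASS, RABBIT_MQ_PORT and RABBIT_MQ_VHOST are
# assumed to be module-level settings defined elsewhere.


def delay_publish(self, messages, queue, headers=None, expiration=0):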
    """
    Connect to RabbitMQ and publish messages to the queue
    Args:
        queue (string): queue name
        messages (list or single item): messages to publish to rabbit queue
        expiration(int): TTL in milliseconds for message
    """
    delay_queue = "".join([queue, "_delay"])
    logging.info('Publishing To Queue: {queue}'.format(queue=delay_queue))
    logging.info('Connecting to RabbitMQ: {host}'.format(
        host=self.rabbit_host))
    credentials = pika.PlainCredentials(
        RABBIT_MQ_USER, RABBIT_MQ_PASS)
    # Pika 1.x takes `heartbeat`; releases before 1.0 also accepted
    # `heartbeat_interval`.
    parameters = pika.ConnectionParameters(
        self.rabbit_host, RABBIT_MQ_PORT,
        RABBIT_MQ_VHOST, credentials, heartbeat=0)
    connection = pika.BlockingConnection(parameters)

    channel = connection.channel()
    channel.queue_declare(queue=queue, durable=True)

    channel.queue_bind(exchange='amq.direct',
                       queue=queue)
    delay_channel = connection.channel()
    delay_channel.queue_declare(queue=delay_queue, durable=True,
                                arguments={
                                    'x-dead-letter-exchange': 'amq.direct',
                                    'x-dead-letter-routing-key': queue
                                })

    properties = pika.BasicProperties(
        delivery_mode=2, headers=headers, expiration=str(expiration))

    if type(messages) not in (list, tuple):
        messages = [messages]

    try:
        for message in messages:
            try:
                json_data = json.dumps(message)
            except Exception as err:
                logging.error(
                    'Error Jsonify Payload: {err}, {payload}'.format(
                        err=err, payload=repr(message)), exc_info=True
                )
                if (type(message) is dict) and ('data' in message):
                    message['data'] = {}
                    message['error'] = 'Payload Invalid For JSON'
                    json_data = json.dumps(message)
                else:
                    raise

            try:
                delay_channel.basic_publish(
                    exchange='', routing_key=delay_queue,
                    body=json_data, properties=properties)
            except Exception as err:
                logging.error(
                    'Error Publishing Data: {err}, {payload}'.format(
                        err=err, payload=json_data), exc_info=True
                )
                raise

    except Exception:
        raise

    finally:
        logging.info(
            'Done Publishing. Closing Connection to {queue}'.format(
                queue=delay_queue
            )
        )
        connection.close()
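One caveat with per-message TTL is worth illustrating: the RabbitMQ TTL documentation notes that expired messages are only removed once they reach the head of the queue. The sketch below is a deliberately simplified model of that rule, not broker code; `expiration_property` and `simulate_delay_queue` are hypothetical names.

```python
from collections import deque


def expiration_property(delay_ms):
    """The AMQP `expiration` property is a string holding milliseconds."""
    return str(int(delay_ms))


def simulate_delay_queue(messages):
    """Return (name, dead_letter_time_ms) pairs for a FIFO delay queue.

    `messages` is a list of (name, publish_time_ms, ttl_ms) tuples in publish
    order. The model applies a single rule: a message is dead-lettered only
    when it is both expired and at the head of the queue.
    """
    queue = deque(messages)
    released = []
    head_free_at = 0  # time at which the previous head left the queue
    while queue:
        name, published, ttl = queue.popleft()
        expires = published + ttl
        # A message cannot leave before the one in front of it has left.
        release = max(expires, head_free_at)
        released.append((name, release))
        head_free_at = release
    return released


print(expiration_property(5000))
# A 1 second message published behind a 10 second message waits 10 seconds.
print(simulate_delay_queue([("slow", 0, 10000), ("fast", 0, 1000)]))
# Published in the opposite order, each message leaves at its own expiry.
print(simulate_delay_queue([("fast", 0, 1000), ("slow", 0, 10000)]))
```

If the delays vary a lot, one delay queue per distinct delay value (each with a fixed queue TTL) avoids this blocking, at the cost of more queues.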
Author: eandersson

Updated on August 15, 2020

Comments

  • eandersson
    eandersson almost 4 years

    What is the easiest way to create a delay (or parking) queue with Python, Pika and RabbitMQ? I have seen similar questions, but none for Python.

    I find this a useful idea when designing applications, as it allows us to throttle messages that need to be re-queued.

    There is always the possibility that you will receive more messages than you can handle; maybe the HTTP server is slow, or the database is under too much stress.

    I also found it very useful when something went wrong in scenarios where there is zero tolerance for losing messages. Re-queuing messages that could not be handled may solve that, but it can also cause problems where the message is queued over and over again, potentially causing performance issues and log spam.

  • Marconi
    Marconi about 10 years
    what happens when each message to be published has varying ttl? how do I do that?
  • eandersson
    eandersson about 10 years
    There shouldn't be much difference. Simply move the TTL to the Message properties instead. Feel free to open a new question and link it here and I'll answer it.
  • Marconi
    Marconi about 10 years
    Thanks, I think you've answered it already, but after some reading I found that it's not reliable, since a dead message can be stuck behind healthy ones, so even though they expire they still can't move on.
  • eandersson
    eandersson about 10 years
    You could probably combine it with a high queue TTL, and then set the per message TTL to a lower value, but like you mentioned it might not work as intended. I have never tried this with a per message TTL.
  • pancakes
    pancakes almost 10 years
    I was looking for a similar solution for hours. Thanks for that! Works perfectly. "X-" arguments should be better documented.
  • Manuel Zubieta
    Manuel Zubieta over 9 years
    @Marconi , could you please link to the articles that you read to come to that conclusion or further explain the downside?
  • Marconi
    Marconi over 9 years
    @ManuelZubieta sorry, it's been some time now so I don't remember exactly where I read it.
  • Aydin Gerek
    Aydin Gerek about 9 years
    @ManuelZubieta The caveats sub-section of the Per-Message TTL section in the RabbitMQ TTL docs linked to above (rabbitmq.com/ttl.html) explains how expired messages are only expired from the head of the queue. That appears to kill this answer as a viable solution for per message TTL.
  • Admin
    Admin over 6 years
    @Caolite: is there an issue in creating a queue for each set of delayed messages?
  • pymat
    pymat over 4 years
    @eandersson I like this solution, but what if the topology is such that there are dozens of main queues, and dozens of delay queues?