How can I recover unacknowledged AMQP messages from channels other than my connection's own?


Solution 1

Unacknowledged messages are those that have been delivered across the network to a consumer but have not yet been acked or rejected, while that consumer still holds open the channel or connection over which it received them. The broker therefore can't tell whether the consumer is just taking a long time to process those messages or has forgotten about them, so it leaves them in the unacknowledged state until either the consumer dies or the messages are acked or rejected.
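To make the per-channel bookkeeping concrete, here is a toy, in-memory model of a single queue. It is a hypothetical illustration (ToyBroker and plain-string channel ids are made up), not a real broker or client library. It shows that unacked deliveries belong to the channel that received them, and that they only return to the queue when that channel closes.

```python
from collections import deque


class ToyBroker:
    """Hypothetical in-memory model of one queue (not a real AMQP client).

    Like a real broker, it records unacked deliveries per channel, so only
    the channel that received a message can settle it, and closing that
    channel is what returns its unacked messages to the queue.
    """

    def __init__(self, messages):
        self.ready = deque(messages)   # messages waiting in the queue
        self.unacked = {}              # channel id -> {delivery_tag: message}
        self._last_tag = {}            # channel id -> last delivery tag issued

    def deliver(self, channel):
        """Send the next ready message to `channel`; it is now unacked."""
        message = self.ready.popleft()
        tag = self._last_tag.get(channel, 0) + 1   # delivery tags are per channel
        self._last_tag[channel] = tag
        self.unacked.setdefault(channel, {})[tag] = message
        return tag

    def ack(self, channel, tag):
        """Settle a delivery. Raises KeyError if `channel` never received `tag`
        (a real broker closes the channel with an "unknown delivery tag" error)."""
        del self.unacked[channel][tag]

    def close_channel(self, channel):
        """Closing a channel (e.g. the consumer dies) requeues its unacked messages.

        This toy appends them to the back of the queue; a real broker may
        put them back in their original position."""
        for _tag, message in sorted(self.unacked.pop(channel, {}).items()):
            self.ready.append(message)
        self._last_tag.pop(channel, None)   # a reopened channel starts at tag 1 again
```

In this model, calling ack("B", 1) after deliver("A") raises KeyError, mirroring the fact that a second connection cannot settle or recover another channel's deliveries.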

Since those messages could still be validly processed in the future by the still-alive consumer that originally consumed them, you can't (to my knowledge) insert another consumer into the mix and try to make external decisions about them. You need to fix your consumers to make decisions about each message as they get processed rather than leaving old messages unacknowledged.
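A minimal sketch of a consumer that settles every delivery as it processes it, written against the pika 1.x on_message_callback signature. process() and the queue name "tasks" are hypothetical placeholders. Rejecting with requeue=False (which drops the message, or dead-letters it if the queue has a dead-letter exchange) is just one possible policy; requeue=True would retry it instead.

```python
import logging

log = logging.getLogger(__name__)


def process(body):
    """Hypothetical placeholder for the real work; raises on failure."""
    if not body:
        raise ValueError("empty message")
    return body.upper()


def on_message(channel, method, properties, body):
    """pika 1.x style on_message_callback: settle every delivery promptly.

    Each message is either acked (done) or rejected without requeue,
    so none is left sitting in the unacknowledged state.
    """
    try:
        process(body)
    except Exception:
        log.exception("processing failed, rejecting delivery %s", method.delivery_tag)
        channel.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
    else:
        channel.basic_ack(delivery_tag=method.delivery_tag)


if __name__ == "__main__":
    import pika  # requires pika 1.x and a broker on localhost

    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    ch = connection.channel()
    # "tasks" is a hypothetical queue name; auto_ack defaults to False.
    ch.basic_consume(queue="tasks", on_message_callback=on_message)
    ch.start_consuming()
```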

Solution 2

If messages are unacked, there are only two ways to get them back into the queue:

  1. basic.nack (or basic.reject)

    With requeue set, this command places the message back into the queue so it can be redelivered.

  2. Disconnect from the broker

    Closing the channel (or the whole connection) forces all unacked messages delivered on that channel back into the queue.

NOTE: basic.recover asks the broker to redeliver the unacked messages of the channel it is issued on (with requeue=false, to the same consumer), which is sometimes the desired behaviour.
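The nack option can be sketched with pika 1.x's BlockingChannel, assuming a broker on localhost and a hypothetical "tasks" queue. Like basic.recover, it only reaches deliveries made on the channel it is called on.

```python
def requeue_outstanding(channel, last_delivery_tag):
    """Requeue this channel's unacked deliveries up to last_delivery_tag.

    basic.nack with multiple=True settles every outstanding delivery on
    *this* channel whose tag is <= last_delivery_tag; deliveries held by
    other channels or connections are untouched.
    """
    if last_delivery_tag > 0:  # 0 means nothing was fetched on this channel
        channel.basic_nack(delivery_tag=last_delivery_tag, multiple=True, requeue=True)


if __name__ == "__main__":
    import pika  # assumes pika 1.x and a broker on localhost

    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    last_tag = 0
    for _ in range(3):  # fetch up to three messages without acking them
        method, _properties, _body = channel.basic_get(queue="tasks", auto_ack=False)
        if method is not None:
            last_tag = method.delivery_tag
    requeue_outstanding(channel, last_tag)
    # Alternative: channel.basic_recover(requeue=True) also requeues them.
    connection.close()
```

Closing the connection at the end would requeue any messages still unacked anyway, which is option 2 above.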

RabbitMQ spec for basic.recover and basic.nack


The real question is: Why are the messages unacknowledged?

Possible causes of unacked messages:

  1. Consumer fetching too many messages, then not processing and acking them quickly enough.

    Solution: Prefetch only as many messages as the consumer can process and ack promptly.

  2. Buggy client library (I currently have this issue with pika 0.9.13: if the queue has a lot of messages, a certain number of them get stuck unacked, even hours later).

    Solution: I have to restart the consumer several times until all unacked messages are gone from the queue.
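Limiting prefetch in pika 1.x is one basic_qos call made before basic_consume. The sketch assumes a channel with pika's BlockingChannel methods and a hypothetical "tasks" queue; the prefetch value of 10 is an arbitrary example, not a recommendation.

```python
PREFETCH_COUNT = 10  # arbitrary example; tune to how fast one consumer works


def configure_consumer(channel, queue, on_message, prefetch_count=PREFETCH_COUNT):
    """Cap unacked deliveries for this consumer, then start consuming.

    basic_qos must come before basic_consume: once prefetch_count
    deliveries are outstanding, the broker stops pushing more to the
    consumer until some are acked or rejected.
    """
    channel.basic_qos(prefetch_count=prefetch_count)
    channel.basic_consume(queue=queue, on_message_callback=on_message)


if __name__ == "__main__":
    import pika  # assumes pika 1.x and a broker on localhost

    def on_message(ch, method, properties, body):
        print(body)
        ch.basic_ack(delivery_tag=method.delivery_tag)

    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    configure_consumer(channel, "tasks", on_message, prefetch_count=1)
    channel.start_consuming()
```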

Solution 3

All unacknowledged messages return to the ready state once all the workers/consumers holding them are stopped.

Make sure every worker really is stopped: grep the ps aux output and stop or kill any worker that is still running.

If you manage workers with supervisor, check for zombies even when it shows the worker as stopped: supervisor can report the worker as stopped while zombie processes still show up when you grep the ps aux output. Killing those zombie processes brings the messages back to the ready state.
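The ps aux check can also be scripted. This sketch assumes a Unix-like ps that prints the usual 11 aux columns (USER through COMMAND), and "celery" is only an example pattern. It lists matching processes with their STAT column, where a value starting with Z marks a defunct process.

```python
import subprocess


def lingering_workers(ps_output, pattern):
    """Return (pid, stat, command) for `ps aux` lines whose command contains `pattern`."""
    found = []
    for line in ps_output.splitlines()[1:]:   # skip the header row
        fields = line.split(None, 10)        # 11 columns; COMMAND may contain spaces
        if len(fields) < 11:
            continue
        pid, stat, command = int(fields[1]), fields[7], fields[10]
        if pattern in command:
            found.append((pid, stat, command))
    return found


if __name__ == "__main__":
    output = subprocess.run(["ps", "aux"], capture_output=True, text=True, check=True).stdout
    for pid, stat, command in lingering_workers(output, "celery"):
        print(pid, stat, command)
```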

Author: Will Olbrys

I'm an open source web developer in NYC. One of only two or three I'm sure...

Updated on July 31, 2020

Comments

  • Will Olbrys
    Will Olbrys almost 4 years

    It seems the longer I keep my RabbitMQ server running, the more trouble I have with unacknowledged messages. I would love to requeue them. In fact there seems to be an AMQP command to do this, but it only applies to the channel that your connection is using. I built a little pika script to at least try it out, but either I am missing something or it cannot be done this way (how about with rabbitmqctl?)

    import pika
    
    credentials = pika.PlainCredentials('***', '***')
    parameters = pika.ConnectionParameters(host='localhost', port=5672,
        credentials=credentials, virtual_host='***')
    
    def on_recover_ok(frame):
        """Called once when the broker answers basic.recover (Basic.RecoverOk).
        Redelivered messages go to this channel's consumers, not here."""
        print(frame)
    
    def on_connected(connection):
        """Called when we are fully connected to RabbitMQ"""
        connection.channel(on_open_callback=on_channel_open)
    
    def on_channel_open(new_channel):
        """Called when our channel has opened"""
        global channel
        channel = new_channel
        # basic.recover acts on the unacked messages of this channel only
        channel.basic_recover(requeue=True, callback=on_recover_ok)
    
    def on_closed(connection, reason):
        """Stop the IOLoop once the connection has fully closed"""
        connection.ioloop.stop()
    
    connection = pika.SelectConnection(parameters=parameters,
        on_open_callback=on_connected, on_close_callback=on_closed)
    try:
        # Loop so we can communicate with RabbitMQ
        connection.ioloop.start()
    except KeyboardInterrupt:
        # Gracefully close the connection
        connection.close()
        # Loop until on_closed stops the IOLoop
        connection.ioloop.start()
    
  • Will Olbrys
    Will Olbrys almost 13 years
    So basic.recover must be called by the consumer? I'm using celeryd to manage connections. It might be possible to send that recover command to poorly responsive queues with celeryctl (if you're familiar with that...)
  • Michael Dillon
    Michael Dillon almost 13 years
    @will my condolences that you are using Celery. The celery developers simply do not understand AMQP and have created a badly broken implementation. You need to make a choice, either get rid of celery and do AMQP right, or stop using AMQP with celery and use something simple like Redis instead. I chose to drop celery and stay with AMQP.
  • Will Olbrys
    Will Olbrys almost 13 years
    That's quite an indictment. If you don't mind me asking, what about Celery's AMQP implementation is not executed properly?
  • Jeremy Dunck
    Jeremy Dunck over 12 years
    I agree, I'd like to hear why you think Celery is so badly broken. It's very widely used and this is the first time I've heard this complaint.
  • istepaniuk
    istepaniuk over 9 years
    Has your issue with pika been reported? Can you provide a link?
  • IvanD
    IvanD over 9 years
    It's a Python recursion limit that kicked in: something about not being able to recurse more than 1000 times, which is apparently what was happening with pika 0.9.13. I'm not seeing it with 0.9.14.
  • IvanD
    IvanD over 9 years
    Finally found where the issue was reported: github.com/pika/pika/issues/286
  • Shlomi Uziel
    Shlomi Uziel about 7 years
    You can also determine whether a RabbitMQ connection is being held up by a zombie process by using the RabbitMQ management console, as I described here: stackoverflow.com/questions/11926077/…
  • Rakesh Sharma
    Rakesh Sharma over 2 years
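    rabbitmq.com/consumers.html#acknowledgement-timeout: newer RabbitMQ versions have a delivery acknowledgement timeout for this. A consumer that does not ack a delivery in time has its channel closed, which returns its unacked messages to the queue.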