Pika + RabbitMQ: setting basic_qos to prefetch=1 still appears to consume all messages in the queue

I appear to have solved this by moving where basic_qos is called.

Placing it just after channel = connection.channel(), before basic_consume is called, appears to change the behaviour to what I'd expect.
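A minimal sketch of that ordering, assuming the pika 1.x keyword names (on_message_callback, auto_ack). The helper names configure_consumer and run_worker and the queue name task_queue are made up for illustration and do not come from the original post:

```python
def configure_consumer(channel, queue_name, on_message, prefetch_count=1):
    """Set up a consumer with the prefetch limit applied before consuming starts."""
    channel.queue_declare(queue=queue_name, durable=True)
    # Set the prefetch limit first, so it is already in effect when the
    # consumer is registered.
    channel.basic_qos(prefetch_count=prefetch_count)
    # pika 1.x keyword names; older releases used no_ack instead of auto_ack.
    channel.basic_consume(
        queue=queue_name,
        on_message_callback=on_message,
        auto_ack=False,
    )


def run_worker(host="mqhost", queue_name="task_queue"):
    """Connect to RabbitMQ and consume with at most one unacknowledged message."""
    import pika  # imported here so configure_consumer can be tried without a broker

    def on_message(ch, method, properties, body):
        print("received", body)
        # Acknowledge so the broker can deliver the next message.
        ch.basic_ack(delivery_tag=method.delivery_tag)

    connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
    channel = connection.channel()
    configure_consumer(channel, queue_name, on_message)
    channel.start_consuming()
```

Calling run_worker() in each worker process should then leave at most one message per worker in the 'unacknowledged' state, with the rest remaining ready.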


Author: growse

Updated on June 04, 2022

Comments

  • growse
    growse almost 2 years

    I've got a Python worker client that spins up 10 workers, each of which hooks onto a RabbitMQ queue. A bit like this:

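    #!/usr/bin/python
    import multiprocessing
    
    import pika
    
    worker_count = 10
    qname = 'task_queue'  # placeholder; the queue name is not shown in the post
    
    def mqworker():
        connection = pika.BlockingConnection(pika.ConnectionParameters(host='mqhost'))
        channel = connection.channel()
        channel.queue_declare(queue=qname, durable=True)
        # Call signature as posted (older pika releases, with no_ack)
        channel.basic_consume(callback, queue=qname, no_ack=False)
        # basic_qos is called after basic_consume here; this ordering is what the question is about
        channel.basic_qos(prefetch_count=1)
        channel.start_consuming()
    
    
    def callback(ch, method, properties, body):
        doSomeWork()  # application-specific work, not shown in the post
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    if __name__ == '__main__':
        # Start one consumer process per worker
        for i in range(worker_count):
            worker = multiprocessing.Process(target=mqworker)
            worker.start()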
    

    The issue I have is that despite setting basic_qos on the channel, the first worker to start accepts all the messages off the queue, whilst the others sit idle. I can see this in the RabbitMQ management interface: even when I set worker_count to 1 and dump 50 messages on the queue, all 50 go into the 'unacknowledged' bucket, whereas I'd expect 1 to become unacknowledged and the other 49 to stay ready.

    Why isn't this working?

  • Sajuuk
    Sajuuk about 7 years
    Thank you! That did solve the issue. And by the way, this is very hard to debug.
  • Jordan
    Jordan about 7 years
    @Hiagara yeah just ran into this today myself. Amazing that almost 5 years later this is still not clear or documented in the API.
  • rborodinov
    rborodinov almost 6 years
    I think we should call basic_qos before basic_consume, because basic_consume uses this setting when it is initialized.
  • Highstaker
    Highstaker almost 6 years
    Agreed with @rborodinov. I had basic_qos right after basic_consume and it didn't work. I switched them, and now it works fine.
  • Tobias
    Tobias over 4 years
    I also had to set auto_ack=False when setting up the basic_consume for it to work. Otherwise it still consumed more messages than expected.
  • jkulak
    jkulak about 2 years
    My .ack() was in the loop inside the callback, so it was called more than once per delivery_tag, which resulted in a RabbitMQ 406 PRECONDITION_FAILED - unknown delivery tag error.
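
The last comment's pitfall can be avoided by acknowledging once per delivery, after any per-item loop has finished. The following is only a sketch: make_callback, handle_item, and the one-item-per-line payload format are hypothetical and not taken from the thread.

```python
def make_callback(handle_item):
    """Build a pika-style callback that acks each delivery exactly once."""

    def callback(ch, method, properties, body):
        # Hypothetical payload format: one work item per line.
        for item in body.decode("utf-8").splitlines():
            handle_item(item)
        # Ack once, after the loop. Acking inside the loop would reuse the
        # same delivery_tag, which the broker rejects as unknown.
        ch.basic_ack(delivery_tag=method.delivery_tag)

    return callback
```

The returned function has the (ch, method, properties, body) shape that the callback in the question uses, so it could be passed to basic_consume in its place.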