Pika + RabbitMQ: setting basic_qos to prefetch=1 still appears to consume all messages in the queue
11,296
I appear to have solved this by moving where basic_qos
is called.
Placing it just after channel = connection.channel()
appears to alter the behaviour to what I'd expect.
Related videos on Youtube
Author by
growse
Updated on June 04, 2022Comments
-
growse almost 2 years
I've got a python worker client that spins up a 10 workers which each hook onto a RabbitMQ queue. A bit like this:
#!/usr/bin/python worker_count=10 def mqworker(queue, configurer): connection = pika.BlockingConnection(pika.ConnectionParameters(host='mqhost')) channel = connection.channel() channel.queue_declare(queue=qname, durable=True) channel.basic_consume(callback,queue=qname,no_ack=False) channel.basic_qos(prefetch_count=1) channel.start_consuming() def callback(ch, method, properties, body): doSomeWork(); ch.basic_ack(delivery_tag = method.delivery_tag) if __name__ == '__main__': for i in range(worker_count): worker = multiprocessing.Process(target=mqworker) worker.start()
The issue I have is that despite setting basic_qos on the channel, the first worker to start accepts all the messages off the queue, whilst the others sit there idle. I can see this in the rabbitmq interface, that even when I set
worker_count
to be 1 and dump 50 messages on the queue, all 50 go into the 'unacknowledged' bucket, whereas I'd expect 1 to become unacknowledged and the other 49 to be ready.Why isn't this working?
-
Sajuuk about 7 yearsthank you! that did solve the issue. and btw this is very hard to debug..
-
Jordan about 7 years@Hiagara yeah just ran into this today myself. Amazing that almost 5 years later this is still not clear or documented in the API.
-
rborodinov almost 6 yearsI think that we should to declarate
basic_qos
beforebasic_consume
. Because basic_consume use this setting when initialized. -
Highstaker almost 6 yearsagreed with @rborodinov. I had
basic_qos
right afterbasic_consume
and it didn't work. Switched them, now it works fine. -
Tobias over 4 yearsI also had to set
auto_ack=False
when setting up thebasic_consume
for it to work. Otherwise it still consumed more messages than expected. -
jkulak about 2 yearsMy
.ack()
was in the loop inside the callback, so it was trying to call it more than once for everydelivery_tag
thus resulting in a RabbitMQ 406 PRECONDITION_FAILED - unknown delivery tag.