In Pika or RabbitMQ, How do I check if any consumers are currently consuming?
I was just looking into this as well. After reading through the source and docs I came across the following in channel.py:
@property
def consumer_tags(self):
"""Property method that returns a list of currently active consumers
:rtype: list
"""
return self._consumers.keys()
My own testing was successful. I used the following where my channel object is self._channel:
if len(self._channel.consumer_tags) == 0:
LOGGER.info("Nobody is listening. I'll come back in a couple of minutes.")
...
Comments
-
Optimus almost 2 years
I would like to check if a Consumer/Worker is present to consume a Message I am about to send.
If there isn't any Worker, I would start some workers (both consumers and publishers are on a single machine) and then go about publishing Messages.
If there is a function like
connection.check_if_has_consumers
, I would implement it somewhat like this -import pika import workers # code for publishing to worker queue connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() # if there are no consumers running (would be nice to have such a function) if not connection.check_if_has_consumers(queue="worker_queue", exchange=""): # start the workers in other processes, using python's `multiprocessing` workers.start_workers() # now, publish with no fear of your queues getting filled up channel.queue_declare(queue="worker_queue", auto_delete=False, durable=True) channel.basic_publish(exchange="", routing_key="worker_queue", body="rockin", properties=pika.BasicProperties(delivery_mode=2)) connection.close()
But I am unable to find any function with
check_if_has_consumers
functionality in pika.Is there some way of accomplishing this, using pika? or maybe, by talking to The Rabbit directly?
I am not completely sure, but I really think RabbitMQ would be aware of the number of consumers subscribed to different queues, since it does dispatch messages to them and accepts acks
I just got started with RabbitMQ 3 hours ago... any help is welcome...
here is the workers.py code I wrote, if its any help....
import multiprocessing import pika def start_workers(num=3): """start workers as non-daemon processes""" for i in xrange(num): process = WorkerProcess() process.start() class WorkerProcess(multiprocessing.Process): """ worker process that waits infinitly for task msgs and calls the `callback` whenever it gets a msg """ def __init__(self): multiprocessing.Process.__init__(self) self.stop_working = multiprocessing.Event() def run(self): """ worker method, open a channel through a pika connection and start consuming """ connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost') ) channel = connection.channel() channel.queue_declare(queue='worker_queue', auto_delete=False, durable=True) # don't give work to one worker guy until he's finished channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='worker_queue') # do what `channel.start_consuming()` does but with stopping signal while len(channel._consumers) and not self.stop_working.is_set(): channel.transport.connection.process_data_events() channel.stop_consuming() connection.close() return 0 def signal_exit(self): """exit when finished with current loop""" self.stop_working.set() def exit(self): """exit worker, blocks until worker is finished and dead""" self.signal_exit() while self.is_alive(): # checking `is_alive()` on zombies kills them time.sleep(1) def kill(self): """kill now! should not use this, might create problems""" self.terminate() self.join() def callback(channel, method, properties, body): """pika basic consume callback""" print 'GOT:', body # do some heavy lifting here result = save_to_database(body) print 'DONE:', result channel.basic_ack(delivery_tag=method.delivery_tag)
EDIT:
I have to move forward so here is a workaround that I am going to take, unless a better approach comes along,
So, RabbitMQ has these HTTP management apis, they work after you have turned on the management plugin and at middle of HTTP apis page there is
/api/connections - A list of all open connections.
/api/connections/name - An individual connection. DELETEing it will close the connection.
So, if I connect my Workers and my Produces both by different Connection names / users, I'll be able to check if the Worker Connection is open... (there might be issues when worker dies...)
will be waiting for a better solution...
EDIT:
just found this in the rabbitmq docs, but this would be hacky to do in python:
shobhit@oracle:~$ sudo rabbitmqctl -p vhostname list_queues name consumers Listing queues ... worker_queue 0 ...done.
so i could do something like,
subprocess.call("echo password|sudo -S rabbitmqctl -p vhostname list_queues name consumers | grep 'worker_queue'")
hacky... still hope pika has some python function to do this...
Thanks,