In Pika or RabbitMQ, How do I check if any consumers are currently consuming?

11,442

I was just looking into this as well. After reading through the source and docs I came across the following in channel.py:

@property
def consumer_tags(self):
    """Property method that returns a list of currently active consumers

    :rtype: list

    """
    return self._consumers.keys()

My own testing was successful. I used the following where my channel object is self._channel:

if len(self._channel.consumer_tags) == 0:
        LOGGER.info("Nobody is listening.  I'll come back in a couple of minutes.")
        ...
Share:
11,442
Optimus
Author by

Optimus

a noob programmer

Updated on June 05, 2022

Comments

  • Optimus
    Optimus almost 2 years

    I would like to check if a Consumer/Worker is present to consume a Message I am about to send.

    If there isn't any Worker, I would start some workers (both consumers and publishers are on a single machine) and then go about publishing Messages.

    If there is a function like connection.check_if_has_consumers, I would implement it somewhat like this -

    import pika
    import workers
    
    # code for publishing to worker queue
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    
    # if there are no consumers running (would be nice to have such a function)
    if not connection.check_if_has_consumers(queue="worker_queue", exchange=""):
        # start the workers in other processes, using python's `multiprocessing`
        workers.start_workers()
    
    # now, publish with no fear of your queues getting filled up
    channel.queue_declare(queue="worker_queue", auto_delete=False, durable=True)
    channel.basic_publish(exchange="", routing_key="worker_queue", body="rockin",
                                properties=pika.BasicProperties(delivery_mode=2))
    connection.close()
    

    But I am unable to find any function with check_if_has_consumers functionality in pika.

    Is there some way of accomplishing this, using pika? or maybe, by talking to The Rabbit directly?

    I am not completely sure, but I really think RabbitMQ would be aware of the number of consumers subscribed to different queues, since it does dispatch messages to them and accepts acks

    I just got started with RabbitMQ 3 hours ago... any help is welcome...

    here is the workers.py code I wrote, if its any help....

    import multiprocessing
    import pika
    
    
    def start_workers(num=3):
        """start workers as non-daemon processes"""
        for i in xrange(num):    
            process = WorkerProcess()
            process.start()
    
    
    class WorkerProcess(multiprocessing.Process):
        """
        worker process that waits infinitly for task msgs and calls
        the `callback` whenever it gets a msg
        """
        def __init__(self):
            multiprocessing.Process.__init__(self)
            self.stop_working = multiprocessing.Event()
    
        def run(self):
            """
            worker method, open a channel through a pika connection and
            start consuming
            """
            connection = pika.BlockingConnection(
                                  pika.ConnectionParameters(host='localhost')
                         )
            channel = connection.channel()
            channel.queue_declare(queue='worker_queue', auto_delete=False,
                                                        durable=True)
    
            # don't give work to one worker guy until he's finished
            channel.basic_qos(prefetch_count=1)
            channel.basic_consume(callback, queue='worker_queue')
    
            # do what `channel.start_consuming()` does but with stopping signal
            while len(channel._consumers) and not self.stop_working.is_set():
                channel.transport.connection.process_data_events()
    
            channel.stop_consuming()
            connection.close()
            return 0
    
        def signal_exit(self):
            """exit when finished with current loop"""
            self.stop_working.set()
    
        def exit(self):
            """exit worker, blocks until worker is finished and dead"""
            self.signal_exit()
            while self.is_alive(): # checking `is_alive()` on zombies kills them
                time.sleep(1)
    
        def kill(self):
            """kill now! should not use this, might create problems"""
            self.terminate()
            self.join()
    
    
    def callback(channel, method, properties, body):
        """pika basic consume callback"""
        print 'GOT:', body
        # do some heavy lifting here
        result = save_to_database(body)
        print 'DONE:', result
        channel.basic_ack(delivery_tag=method.delivery_tag)
    

    EDIT:

    I have to move forward so here is a workaround that I am going to take, unless a better approach comes along,

    So, RabbitMQ has these HTTP management apis, they work after you have turned on the management plugin and at middle of HTTP apis page there is

    /api/connections - A list of all open connections.

    /api/connections/name - An individual connection. DELETEing it will close the connection.

    So, if I connect my Workers and my Produces both by different Connection names / users, I'll be able to check if the Worker Connection is open... (there might be issues when worker dies...)

    will be waiting for a better solution...

    EDIT:

    just found this in the rabbitmq docs, but this would be hacky to do in python:

    shobhit@oracle:~$ sudo rabbitmqctl -p vhostname list_queues name consumers
    Listing queues ...
    worker_queue    0
    ...done.
    

    so i could do something like,

    subprocess.call("echo password|sudo -S rabbitmqctl -p vhostname list_queues name consumers | grep 'worker_queue'")
    

    hacky... still hope pika has some python function to do this...

    Thanks,