RabbitMQ non-blocking consumer

13,093

Form FAQ:

Pika does not have any notion of threading in the code. If you want to use Pika with threading, make sure you have a Pika connection per thread, created in that thread. It is not safe to share one Pika connection across threads,

So lets create the connection inside the thread:

import pika


class PikaMassenger():

    exchange_name = '...'

    def __init__(self, *args, **kwargs):
        self.conn = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
        self.channel = self.conn.channel()
        self.channel.exchange_declare(
            exchange=self.exchange_name, 
            exchange_type='topic')

    def consume(self, keys, callback):
        result = self.channel.queue_declare('', exclusive=True)
        queue_name = result.method.queue
        for key in keys:
            self.channel.queue_bind(
                exchange=self.exchange_name, 
                queue=queue_name, 
                routing_key=key)

        self.channel.basic_consume(
            queue=queue_name, 
            on_message_callback=callback, 
            auto_ack=True)

        self.channel.start_consuming()


    def __enter__(self):
        return self


    def __exit__(self, exc_type, exc_value, traceback):
        self.conn.close()

def start_consumer():

    def callback(ch, method, properties, body):
        print(" [x] %r:%r consumed" % (method.routing_key, body))

    with PikaMassenger() as consumer:
        consumer.consume(keys=[...], callback=callback)


consumer_thread = threading.Thread(target=start_consumer)
consumer_thread.start()
Share:
13,093

Related videos on Youtube

Hugo Sousa
Author by

Hugo Sousa

Senior Software Engineer @ ING

Updated on September 14, 2022

Comments

  • Hugo Sousa
    Hugo Sousa over 1 year

    I'm using RabbitMQ in Python to manage several queues between a producer and multiple consumers. In the example in RabbitMQ website (routing model), the consumers are blocked. It means that they stop on start_consuming() and execute the callback function every time there is a new "task" in the queue.

    My question is: how can I implement my consumer in a way that he is still waiting for tasks (so, the callback function is called every time there is new things in the queue) but at the same time he can execute other work/code.

    Thank you

    • loopbackbee
      loopbackbee about 10 years
      why not run start_consuming() on a separate thread?
    • Hugo Sousa
      Hugo Sousa about 10 years
      Well, I got a solution pretty easy. Instead of using basic_consume, I can simple use the basic_get inside a function and call this function every X seconds. But there is a question: would the queue tasks be delivered with some order?
    • Andriy Drozdyuk
      Andriy Drozdyuk about 9 years
      @HugoSousa Would be great if you could post a full solution here - I am new to rabbitmq and it would really help others.
    • kwarunek
      kwarunek almost 9 years
      another option is to use pika with Tornado fully async (pika.readthedocs.org/en/latest/examples/tornado_consumer.ht‌​ml)
    • Incömplete
      Incömplete over 5 years
      @goncalopp according to FAQ, pika is not threadsafe
    • adnanmuttaleb
      adnanmuttaleb almost 5 years
      why this question is not answered yet!!
  • Robben_Ford_Fan_boy
    Robben_Ford_Fan_boy about 7 years
    Pika is not thread safe - the API does not support this functionality
  • m.raynal
    m.raynal about 6 years
    Robben_Ford is right, one should not use threads along with pika. "Pika does not have any notion of threading in the code. If you want to use Pika with threading, make sure you have a Pika connection per thread, created in that thread. It is not safe to share one Pika connection across threads." -> From pika doc FAQ