RabbitMQ pika.exceptions.ConnectionClosed

12,983

Solution 1

This happens because the callback keeps the main thread waiting, so pika cannot handle incoming frames; in particular, it cannot respond to heartbeats until the subprocess is done. RabbitMQ then assumes the client is dead and forces a disconnection.

If you want this to work with heartbeats (which is recommended), you need to call connection.process_data_events periodically. One way is to add a loop that checks whether the thread is done and calls process_data_events every 30 seconds or so until it is.
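A minimal sketch of that loop, adapted to the subprocess from the question rather than a thread. The names wait_with_heartbeats, make_callback and poll_interval are hypothetical; the sketch assumes a pika BlockingConnection and that process_data_events(time_limit=...) may be called from inside the consumer callback to keep servicing heartbeats.

```python
import subprocess


def wait_with_heartbeats(connection, proc, poll_interval=1.0):
    """Wait for a subprocess to exit while letting pika service the connection.

    connection is assumed to be a pika.BlockingConnection; proc is a
    subprocess.Popen object. Both names are illustrative.
    """
    while proc.poll() is None:
        # Hands control to pika for up to poll_interval seconds so it can
        # read and write socket data, including heartbeat frames.
        connection.process_data_events(time_limit=poll_interval)
    return proc.returncode


def make_callback(connection, command):
    """Build a consumer callback that runs command for each message."""
    def callback(ch, method, properties, body):
        proc = subprocess.Popen(command)
        # Replaces the blocking p.wait() from the question.
        wait_with_heartbeats(connection, proc)
    return callback
```

The callback returned by make_callback(connection, command) would be passed to channel.basic_consume in place of the original one. A poll interval well below the negotiated heartbeat timeout keeps the connection responsive.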

Solution 2

Look at this: https://github.com/mosquito/aio-pika

It's an asyncio wrapper and, if you understand the concept behind asynchronous programming, very easy to use :)

Solution 3

Here is the pika documentation on how to avoid the connection being dropped because of heartbeats.

https://pika.readthedocs.io/en/stable/examples/heartbeat_and_blocked_timeouts.html

In pika versions older than 0.11.2, adding the argument heartbeat_interval=600 to pika.ConnectionParameters does not help if the server side has a short heartbeat value such as 60s. It only works when the version is at least 0.11.2.
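As a hedged illustration of passing that argument: the helper below is hypothetical and only picks the keyword name, on the assumption that pika 1.0 and later accept heartbeat while the 0.x releases discussed above use heartbeat_interval.

```python
def heartbeat_kwargs(pika_version, seconds=600):
    """Return the heartbeat keyword argument for pika.ConnectionParameters.

    Assumption: pika >= 1.0 names the parameter `heartbeat`, while 0.x
    releases use `heartbeat_interval`. The helper name is illustrative.
    """
    major = int(pika_version.split(".")[0])
    name = "heartbeat" if major >= 1 else "heartbeat_interval"
    return {name: seconds}


# Intended use (requires pika to be installed):
#   import pika
#   params = pika.ConnectionParameters(
#       host='localhost', **heartbeat_kwargs(pika.__version__))
#   connection = pika.BlockingConnection(params)
```

Whether the client-side value actually wins over the broker's setting still depends on the pika version, as described above.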

Author by

Admin

Updated on July 24, 2022

Comments

  • Admin
    Admin almost 2 years

    I am trying to send and receive messages using RabbitMQ. I don't have a computer science background, so the terms I use may not be very accurate.

    I am trying to follow the tutorial. When my HTML form is submitted, my Python (CGI) script publishes the message to the queue:

    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='task_queue', durable=True)
    message = PN
    channel.basic_publish(exchange='',
                          routing_key='task_queue',
                          body=message,
                          properties=pika.BasicProperties(
                              delivery_mode=2,  # make message persistent
                          ))
    connection.close()
    

    My receiver is running:

    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='task_queue', durable=True)
    print(' [*] Waiting for messages. To exit press CTRL+C')
    
    def callback(ch, method, properties, body):
        print(" [x] Received Project %r" % body)
        #ch.basic_ack(delivery_tag = method.delivery_tag) 
        if not (os.path.isfile(js_path)):
            print (' [*] ERROR files missing ')
            #ch.basic_ack(delivery_tag = method.delivery_tag)
            return
        p= subprocess.Popen(run a subprocess here)
        p.wait()
    
        print (' [*] Temporary Files removed')
        print(" [*] Waiting for messages. To exit press CTRL+C")
    
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(callback,queue='task_queue',no_ack=True)
    channel.start_consuming()
    

    It works most of the time but randomly crashes with the following error:

    Traceback (most recent call last):
      File "Receive5.py", line 139, in <module>
        channel.start_consuming()
      File "C:\Python27\lib\site-packages\pika\adapters\blocking_connection.py", line 1681, in start_consuming
        self.connection.process_data_events(time_limit=None)
      File "C:\Python27\lib\site-packages\pika\adapters\blocking_connection.py", line 647, in process_data_events
        self._flush_output(common_terminator)
      File "C:\Python27\lib\site-packages\pika\adapters\blocking_connection.py", line 426, in _flush_output
        raise exceptions.ConnectionClosed()
    pika.exceptions.ConnectionClosed

  • Admin
    Admin almost 8 years
    Just curious: is the RabbitMQ tutorial missing this part, or am I misusing the tutorial? (rabbitmq.com/tutorials/tutorial-two-python.html)
  • eandersson
    eandersson almost 8 years
    I would say that this is largely undocumented. I went as far as creating my own AMQP library to avoid this problem.
  • Tomaski
    Tomaski about 5 years
    Downvoting because the answer is completely beside the point.