Processing SQS queues with boto

14,173

Solution 1

You shouldn't rely on the count for a queue as it's only meant to provide an approximate count and is not guaranteed to be accurate.

If you want to just keep polling forever, just do this:

while 1:
    messages = q.get_messages()
    # do something with messages
    time.sleep(N)

I have added the call to time.sleep to introduce a delay in the loop. The value of N should be at least one second and could be considerably more, depending on how quickly you expect new messages to appear in your queue. If you don't put some sort of delay in the loop you will probably start getting throttled by the service.

To avoid a message getting read multiple times, you should try to adjust the visibility timeout of the queue to be a value greater than the time it takes you to process a message and then make sure you delete the message when processing has completed.

Solution 2

Example:

# wait_time_seconds count only 1 request in x seconds (0 - 20)
# num_messages get x messages in same request (1 - 10)
while 1:
    logger.info("... waiting messages ...")
    messages = queue_in.get_messages(wait_time_seconds=20, num_messages=10)
    for message in messages:
        logger.info('message: %s' % (message,))
        queue_in.delete_message(message)

Solution 3

  1. When you pull a message from SQS, the message becomes invisible and unreachable by other queue queries (edit - invisibility can be set between 0 and 12 hrs).
  2. You will have to get the queue again each time new messages are added, but this should not be a problem - that's why the queuing service exists in the first place.

If you want to constantly poll the queue, try what's called Long Polling - you can have a continuous poll for up to 20 seconds that returns when the queue is populated.

Hope that's helpful, otherwise poke around the boto sqs documentation.

Share:
14,173

Related videos on Youtube

waigani
Author by

waigani

I use SO to ask all my stupid questions. A public record of everything I didn't know, but now do :)

Updated on June 04, 2022

Comments

  • waigani
    waigani almost 2 years

    I have a python script using the boto library on ec2 instance which is part of an autoscaling group. The script processes messages from a SQS queue:

    import boto
    from boto.sqs.message import Message
    
    conn = boto.connect_sqs()
    q = conn.create_queue('queue-name')
    
    while (qin.count() > 0):
        m = q.get_messages()
        #do something with the message
    

    Does using the while statement make sense? Does count() update in real time as:

    1. other instances take messages off the queue (or am I going to double up)
    2. new messages are added to the queue (or will I miss them?)

    How do I make this script constantly listen for new additions to the queue, even while the queue is empty?

    In this question Processing items in SQS queue with a php script it was mentioned that 'sqs ruby client library has a method "poll" which continuously polls the queue and on receiving a message in the queue passes it on to a block'. Is there an equivalent in Python?

    It has also been suggested that the SNS could be used to notify the scripts of the message queue status but I do not see how you could configure a responsive system with SNS as the metric alarms are not fine grained enough.

  • Jan Vlcinsky
    Jan Vlcinsky over 10 years
    Message invisibility is by default 30 seconds, not 4 days. And this period can be modified.