Pattern to continuously listen to AWS SQS messages

29,924

Solution 1

To my knowledge, there is no way in Amazon SQS to support an active listener model where Amazon SQS would "push" messages to your listener, or would invoke your message listener when there are messages.

So, you would always have to poll for messages. There are two polling mechanisms supported for polling - Short Polling and Long Polling. Each has its own pros and cons, but Long Polling is the one you would typically end up using in most cases, although the default one is Short Polling. Long Polling mechanism is definitely more efficient in terms of network traffic, is more cost efficient (because Amazon charges you by the number of requests made), and is also the preferred mechanism when you want your messages to be processed in a time sensitive manner (~= process as soon as possible).

There are more intricacies around Long Polling and Short Polling that are worth knowing, and its somewhat difficult to paraphrase all of that here, but if you like, you can read a lot more details about this through the following blog. It has a few code examples as well that should be helpful.

http://pragmaticnotes.com/2017/11/20/amazon-sqs-long-polling-versus-short-polling/

In terms of a while(true) loop, I would say it depends. If you are using Long Polling, and you can set the wait time to be (max) 20 seconds, that way you do not poll SQS more often than 20 seconds if there are no messages. If there are messages, you can decide whether to poll frequently (to process messages as soon as they arrive) or whether to always process them in time intervals (say every n seconds).

Another point to note would be that you could read upto 10 messages in a single receiveMessages request, so that would also reduce the number of calls you make to SQS, thereby reducing costs. And as the above blog explains in details, you may request to read 10 messages, but it may not return you 10 even if there are that many messages in the queue.

In general though, I would say you need to build appropriate hooks and exception handling to turn off the polling if you wish to at runtime, in case you are using a while(true) kind of a structure.

Another aspect to consider is whether you would like to poll SQS in your main application thread or you would like to spawn another thread. So another option could be to create a ScheduledThreadPoolExecutor with a single thread in the main to schedule a thread to poll the SQS periodically (every few seconds), and you may not need a while(true) structure.

Solution 2

There are a few things that you're missing:

  • Use the receiveMessages(ReceiveMessageRequest) and set a wait time to enable long polling.
  • Wrap your AWS calls in try/catch blocks. In particular, pay attention to OverLimitException, which can be thrown from receiveMessages() if you would have too many in-flight messages.
  • Wrap the entire body of the while loop in its own try/catch block, logging any exceptions that are caught (there shouldn't be -- this is here to ensure that your application doesn't crash because AWS changed their API or you neglected to handle an expected exception).

See doc for more information about long polling and possible exceptions.

As for using the async client: do you have any particular reason to use it? If not, then don't: a single receiver thread is much easier to manage.

Share:
29,924

Related videos on Youtube

Francisco Hanna
Author by

Francisco Hanna

All-terrain programmer. Trying to make a proffesion out of my passion <3

Updated on July 29, 2022

Comments

  • Francisco Hanna
    Francisco Hanna almost 2 years

    I have a simple class named QueueService with some methods that wrap the methods from the AWS SQS SDK for Java. For example:

    public ArrayList<Hashtable<String, String>> receiveMessages(String queueURL) {
            List<Message> messages = this.sqsClient.receiveMessage(queueURL).getMessages();
    
            ArrayList<Hashtable<String, String>> resultList = new ArrayList<Hashtable<String, String>>();
            for(Message message : messages) {
                Hashtable<String, String> resultItem = new Hashtable<String, String>();
                resultItem.put("MessageId", message.getMessageId());
                resultItem.put("ReceiptHandle", message.getReceiptHandle());
                resultItem.put("Body", message.getBody());
                resultList.add(resultItem);
            }
            return resultList;
        }
    

    I have another another class named App that has a main and creates an instace of the QueueService.

    I looking for a "pattern" to make the main in App to listen for new messages in the queue. Right now I have a while(true) loop where I call the receiveMessagesmethod:

    while(true) {
                messages = queueService.receiveMessages(queueURL); 
                for(Hashtable<String, String> message: messages) {
                    String receiptHandle = message.get("ReceiptHandle");
                    String messageBody = message.get("MessageBody");
                    System.out.println(messageBody);
                    queueService.deleteMessage(queueURL, receiptHandle);
                }
            }
    

    Is this the correct way? Should I use the async message receive method in SQS SDK?

  • Francisco Hanna
    Francisco Hanna over 6 years
    The reason to use the async client is because, in the future, an app (.jar) hosted in AWS Lambda will use the SQS service wrapper to check if there is new messages in the queue to process. Since Lambda charges by event firing or function call, I think it will be cheaper to have a listener (using sqs + jws) that executes only when there are new messages. Please, could you correct me if I'am wrong in this last point?
  • kdgregory
    kdgregory over 6 years
    The only thing that the async client does is wrap each non-async request in a Callable then run it on an internal thread pool. I'm not sure what the "SQS service wrapper" is, but SQS isn't currently a supported event source. Most people use SNS instead, some use Kinesis.
  • Francisco Hanna
    Francisco Hanna over 6 years
    With "SQS service wrapper" I refer to the class that I've created that wraps the some methods of the SQS SDK. I've found a way of setting a listener lo that triggers when a new message is in the queue. I should have dig deeper in the documentation.
  • Francisco Hanna
    Francisco Hanna over 6 years
    Thank you. I've read that post and it uses a time schedule with Cloudwatch that triggers a Lambda function that polls the queue. I was trying to avoid the time scheduler.
  • Chazt3n
    Chazt3n over 4 years
    this year SQS can trigger lambdas
  • pzrq
    pzrq almost 4 years
    This was true in 2017, though on 28 June 2018 it became possible to trigger a Lambda from SQS: cabbagetech.blog/2018/08/09/…
  • Adépòjù Olúwáségun
    Adépòjù Olúwáségun over 3 years
    @pzrq it is also worthy of note that you can not trigger a Lambda from SQS If you’re using SQS FIFO queues.
  • César
    César over 3 years
    Since nov 2019, there is also support to trigger a Lambda from SQS FIFO however they don't guarantee only once delivery aws.amazon.com/blogs/compute/…
  • Francisco Hanna
    Francisco Hanna over 2 years
    @Chazt3n If you want to add an answer I could accept it because SQS triggering a Lambda is what I needed at the moment :)