AWS SQS Java. Not all messages are retrieved from the SQS queue

Solution 1

When receiving messages from an SQS queue, you need to repeatedly call sqs:ReceiveMessage.

On each call to sqs:ReceiveMessage, you will get 0 or more messages from the queue (at most 10 per call), which you need to iterate through. For each message, you also need to call sqs:DeleteMessage to remove it from the queue once you have finished processing it.

Add a loop around your "Long Polling" sample to receive all messages:

for (;;) {
    // Long polling: wait up to 20 seconds (the maximum) for up to 10 messages per call.
    ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl)
            .withMaxNumberOfMessages(10)
            .withWaitTimeSeconds(20);
    List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
    for (Message message : messages) {
        System.out.println(" Message");
        System.out.println(" MessageId: " + message.getMessageId());
        System.out.println(" ReceiptHandle: " + message.getReceiptHandle());
        System.out.println(" MD5OfBody: " + message.getMD5OfBody());
        System.out.println(" Body: " + message.getBody());
        for (Entry<String, String> entry : message.getAttributes().entrySet()) {
            System.out.println(" Attribute");
            System.out.println(" Name: " + entry.getKey());
            System.out.println(" Value: " + entry.getValue());
        }
        // Remove the message from the queue once it has been processed.
        sqs.deleteMessage(myQueueUrl, message.getReceiptHandle());
    }
    System.out.println();
}

Also note that you may receive the same message more than once. Either make your processing safe to "reprocess" the same message, or detect repeated messages and skip them.
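One way to detect a repeated message is to remember the IDs that were already handled. The following is only a minimal sketch under assumptions: `DuplicateFilter` and `processOnce` are hypothetical names (not part of the AWS SDK), the ID passed in is assumed to be the value of `message.getMessageId()`, and the in-memory set is unbounded and local to one process, so a real consumer would likely need a bounded or shared store.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

/** Hypothetical helper (not part of the AWS SDK) that skips already-processed messages. */
final class DuplicateFilter {
    // IDs of messages that were handled successfully. Assumption: kept in memory
    // and never evicted, which is only acceptable for a short-lived sketch.
    private final Set<String> seenIds = new HashSet<>();

    /**
     * Runs the handler only the first time a given message ID is seen.
     *
     * @return true if the handler ran, false if the message was a repeat
     */
    boolean processOnce(String messageId, String body, Consumer<String> handler) {
        if (seenIds.contains(messageId)) {
            return false; // repeat delivery: skip the work
        }
        handler.accept(body);
        // Record the ID only after the handler finished without throwing,
        // so a failed message can be retried on redelivery.
        seenIds.add(messageId);
        return true;
    }
}
```

Inside the receive loop this could be called as `filter.processOnce(message.getMessageId(), message.getBody(), body -> ...)`. A repeated message should still be deleted from the queue, otherwise it will keep coming back.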

Solution 2

I was facing the same issue: only one message was being returned. I then tried receiveMessageRequest.setMaxNumberOfMessages(10), which lets each call in the loop retrieve up to 10 messages.

Since my queue has more than 500 records, this is what I did:

    List<String> messagelist = new ArrayList<>();

    try
    {
        AmazonSQS sqs = new AmazonSQSClient(credentials);
        Region usWest2 = Region.getRegion(Regions.US_WEST_2);
        sqs.setRegion(usWest2);
        boolean flag = true;

        while (flag)
        {
            // Request up to number_of_message_ messages (SQS allows at most 10 per call)
            // and long-poll for up to wait_time_second_ seconds (at most 20).
            ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queuename)
                    .withMaxNumberOfMessages(number_of_message_)
                    .withWaitTimeSeconds(wait_time_second_);
            List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();

            for (Message message : messages)
            {
                messagelist.add(message.getBody());

                // Delete the message so that it is not delivered again.
                String messageReceiptHandle = message.getReceiptHandle();
                sqs.deleteMessage(new DeleteMessageRequest().withQueueUrl(queuename).withReceiptHandle(messageReceiptHandle));
            }

            // An empty response ends the loop.
            if (messages.isEmpty())
            {
                flag = false;
            }
        }
    }
    catch (AmazonServiceException ase) {
        ase.printStackTrace();
    } catch (AmazonClientException ace) {
        ace.printStackTrace();
    }

    // Return after the try/catch: a return inside finally would swallow any other exception.
    return messagelist;

I read the records from SQS, save each one into a String list, and then delete the record from the queue.

So in the end I have all the data from the queue in a list.

Solution 3

An SQS queue is not a database. You can't read all the messages into a list like you are trying to do. There is no beginning and no end to the queue. You poll the queue and ask for some messages, it returns you some messages if they exist.

If you want a method that can return the entire dataset, then SQS is not the right tool; a traditional database might be better in that case.

Solution 4

Long polling waits when there is no message in the queue, up to the configured wait time (at most 20 seconds). Because a long-poll request queries all of the SQS servers, calling ReceiveMessage with long polling in a loop will return every available message. When a response contains 0 messages, no visible messages were left in the queue during that wait.

You mentioned that you also used the web console. The web console works the same way as calling the API through the SDK. This means that when you receive and view messages in the console, those messages are invisible to other clients until the visibility timeout expires. That is probably why you don't see the messages.

See more information about visibility timeout:

http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/AboutVT.html
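To make the visibility timeout effect concrete, here is a toy in-memory model. It is a sketch under stated assumptions, not how SQS is implemented: `ToyQueue` is a hypothetical class, time is passed in explicitly as milliseconds, and messages are keyed by their body for brevity (so two messages with the same body would collide).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a visibility timeout; hypothetical, NOT the SQS implementation. */
final class ToyQueue {
    private final long visibilityTimeoutMillis;
    // Message body -> time (in ms) at which the message becomes visible again.
    private final Map<String, Long> visibleAt = new LinkedHashMap<>();

    ToyQueue(long visibilityTimeoutMillis) {
        this.visibilityTimeoutMillis = visibilityTimeoutMillis;
    }

    /** Adds a message that is visible immediately. */
    void send(String body) {
        visibleAt.put(body, 0L);
    }

    /** Returns every currently visible message and hides each one until the timeout expires. */
    List<String> receive(long nowMillis) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Long> entry : visibleAt.entrySet()) {
            if (entry.getValue() <= nowMillis) {
                result.add(entry.getKey());
                // The message stays in the queue but is hidden from other receivers.
                entry.setValue(nowMillis + visibilityTimeoutMillis);
            }
        }
        return result;
    }

    /** Deleting is the only way a message leaves the queue for good. */
    void delete(String body) {
        visibleAt.remove(body);
    }
}
```

In this model, a second receiver (for example your Java client right after the console has fetched the messages) gets an empty list until the timeout has passed, and a message that was received but never deleted shows up again afterwards.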

Author by

lk7777

Updated on July 07, 2022

Comments

  • lk7777
    lk7777 almost 2 years

    I have been trying several approaches to retrieve all messages from an SQS queue using the AWS SDK for Java, to no avail. I have read about the distributed nature of AWS SQS and that messages are stored on different servers. But what I do not understand is why this architecture is not hidden from the end user. What tricks do I have to apply in Java code to retrieve all messages and be 100% sure that none were missed?

    I tried this with the "Long Polling":

    ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl);
    List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
    for (Message message : messages) {
        System.out.println(" Message");
        System.out.println(" MessageId: " + message.getMessageId());
        System.out.println(" ReceiptHandle: " + message.getReceiptHandle());
        System.out.println(" MD5OfBody: " + message.getMD5OfBody());
        System.out.println(" Body: " + message.getBody());
        for (Entry<String, String> entry : message.getAttributes().entrySet()) {
            System.out.println(" Attribute");
            System.out.println(" Name: " + entry.getKey());
            System.out.println(" Value: " + entry.getValue());
        }
    }
    System.out.println();
    

    And this with Request Batching / Client-Side Buffering:

        // Create the basic Amazon SQS async client
        AmazonSQSAsync sqsAsync = new AmazonSQSAsyncClient();
    
        // Create the buffered client
        AmazonSQSAsync bufferedSqs = new AmazonSQSBufferedAsyncClient(sqsAsync);
    
        CreateQueueRequest createRequest = new CreateQueueRequest().withQueueName("MyTestQueue");
    
        CreateQueueResult res = bufferedSqs.createQueue(createRequest);
    
        SendMessageRequest request = new SendMessageRequest();
        String body = "test message_" + System.currentTimeMillis();
        request.setMessageBody( body );
        request.setQueueUrl(res.getQueueUrl());
    
        SendMessageResult sendResult = bufferedSqs.sendMessage(request);
    
        ReceiveMessageRequest receiveRq = new ReceiveMessageRequest()
                .withMaxNumberOfMessages(10)
                .withQueueUrl(queueUrl);
        ReceiveMessageResult rx = bufferedSqs.receiveMessage(receiveRq);
    
        List<Message> messages = rx.getMessages();
        for (Message message : messages) {
            System.out.println(" Message");
            System.out.println(" MessageId: " + message.getMessageId());
            System.out.println(" ReceiptHandle: " + message.getReceiptHandle());
            System.out.println(" MD5OfBody: " + message.getMD5OfBody());
            System.out.println(" Body: " + message.getBody());
            for (Entry<String, String> entry : message.getAttributes().entrySet()) {
                System.out.println(" Attribute");
                System.out.println(" Name: " + entry.getKey());
                System.out.println(" Value: " + entry.getValue());
            }
        }
    

    But I am still unable to retrieve all messages.

    Any idea?

    The AWS forum has been silent on my post.

  • lk7777
    lk7777 almost 9 years
    List<Message> is a collection of messages in the SQS queue, and for (Message ....) iterates over this collection. At some particular moment it is supposed to list all messages in the queue. I realize that a loop is necessary to poll the queue (this is only a fragment of code), but that wasn't my question. My question is: why doesn't this collection contain all the messages that I see in the AWS console?
  • Matt Houser
    Matt Houser almost 9 years
    List<Message> is a collection of messages returned from the queue. It does not contain all messages from the queue. When you call sqs.receiveMessage(), it goes to the queue and pulls down some messages, but not necessarily all messages. getMessages() then returns to you the collection of messages received from the last call to receiveMessage. Once you've exhausted those messages, you need to call sqs.receiveMessage() again to get more.
  • Matt Houser
    Matt Houser almost 9 years
    Long polling will wait for at most the queue-configured timeout or the request-configured timeout. The maximum value you can set for this timeout is 20 seconds. So even with long polling, the request will return an empty list if nothing arrives within 20 seconds.
  • E.J. Brennan
    E.J. Brennan almost 9 years
    "At some particular moment it is supposed to list all messages in the queue" - that is a false statement. That is not how SQS works.
  • lk7777
    lk7777 almost 9 years
    @Matt Houser. Thank you, Matt, for the clarification. Now I am able to receive and parse all S3 (put notification)-SNS-SQS JSON messages.
  • Andy Dufresne
    Andy Dufresne over 5 years
    What wait time should be configured for long polling? How do we know that the chosen interval is long enough for SQS to guarantee that there are no more messages?
  • Dave Moten
    Dave Moten almost 3 years
    This approach does not work reliably if you don't delete messages.
  • johncurrier
    johncurrier over 2 years
    The max for long polling is 20 seconds.
  • johncurrier
    johncurrier over 2 years
    Note that you don't have to create a new ReceiveMessageRequest each iteration (you can reuse it). Also note that your for loop might never exit if the queue is being populated frequently enough or faster than you're reading it.
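The last comment's concern, that a drain loop might never exit when the queue fills faster than it is read, can be addressed by bounding the loop. The sketch below is an illustration under assumptions, not the SDK's API: `BoundedDrain` and `drain` are hypothetical names, and `receiveBatch` stands in for a call such as `sqs.receiveMessage(request).getMessages()` mapped to message bodies (with a single request object reused across iterations).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical helper: drains a queue but gives up after a message cap or a deadline. */
final class BoundedDrain {
    /**
     * Calls receiveBatch repeatedly until it returns an empty batch, at least
     * maxMessages have been collected, or maxDurationMillis has elapsed.
     * The result can exceed maxMessages by part of the final batch.
     */
    static List<String> drain(Supplier<List<String>> receiveBatch,
                              int maxMessages,
                              long maxDurationMillis) {
        List<String> collected = new ArrayList<>();
        long deadline = System.nanoTime() + maxDurationMillis * 1_000_000L;
        while (collected.size() < maxMessages && System.nanoTime() - deadline < 0) {
            List<String> batch = receiveBatch.get();
            if (batch.isEmpty()) {
                break; // nothing was available during this poll
            }
            collected.addAll(batch);
        }
        return collected;
    }
}
```

The deadline is only checked between calls, so with long polling the method can overrun it by up to one wait time (at most 20 seconds).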