How to consume one message?

14,227

Solution 1

You have to declare basicQos setting to get one message at a time from ACK to NACK status and disable auto ACK to give acknowledgement explicitly.

ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.basicQos(1);
    channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    System.out.println("[*] waiting for messages. To exit press CTRL+C");

    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(QUEUE_NAME, consumer);
    while(true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        int n = channel.queueDeclarePassive(QUEUE_NAME).getMessageCount();
        System.out.println(n);
        if(delivery != null) {
            byte[] bs = delivery.getBody();
            System.out.println(new String(bs));
            //String message= new String(delivery.getBody());
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            //System.out.println("[x] Received '"+message);
        }
    }

Hope it helps!

Solution 2

Use AMQP 0.9.1 basic.get to synchronously get just one message.

ConnectionFactory factory = new ConnectionFactory();
factory.setUri(uri);

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME, true, false, false, null);

GetResponse response = channel.basicGet(QUEUE_NAME, true);
if (response != null) {
    String message = new String(response.getBody(), "UTF-8");
}

channel.close();
connection.close();
Share:
14,227
jBee
Author by

jBee

Updated on June 14, 2022

Comments

  • jBee
    jBee almost 2 years

    With example in rabbitmq, consumer get all messages from queue at one time. How to consume one message and exit?

    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(QUEUE_NAME, true, consumer);
    
    while (true) {
      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      String message = new String(delivery.getBody());
      System.out.println(" [x] Received '" + message + "'");
    }
    
  • Nav
    Nav almost 8 years
    Also, as a word of caution to the OP: RabbitMQ team does not recommend reading only one at a time. Reason: rabbitmq.com/blog/2011/09/24/sizing-your-rabbits
  • Nav
    Nav over 7 years
    I don't think setting BasicQOS to 1 solves it. "This is done by setting a "prefetch count" value using the basic.qos method. The value defines the max number of unacknowledged deliveries that are permitted on a channel. Once the number reaches the configured count, RabbitMQ will stop delivering more messages on the channel unless at least one of the outstanding ones is acknowledged. ". From rabbitmq.com/confirms.html
  • Nyamiou The Galeanthrope
    Nyamiou The Galeanthrope about 4 years
    That isn't Java, the question was tagged as Java.
  • user3681304
    user3681304 over 3 years
    Perfect, exactly what I was looking for for a quick integration test.