Spring RabbitMQ - using manual channel acknowledgement on a service with @RabbitListener configuration

38,426

Solution 1

Add the Channel to the @RabbitListener method...

@RabbitListener(queues = "${eventqueue}")
public void receiveMessage(Order order, Channel channel,
    @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
    ...
}

and use the tag in the basicAck, basicReject.

EDIT

@SpringBootApplication
@EnableRabbit
public class So38728668Application {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So38728668Application.class, args);
        context.getBean(RabbitTemplate.class).convertAndSend("", "so38728668", "foo");
        context.getBean(Listener.class).latch.await(60, TimeUnit.SECONDS);
        context.close();
    }

    @Bean
    public Queue so38728668() {
        return new Queue("so38728668");
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }

    public static class Listener {

        private final CountDownLatch latch = new CountDownLatch(1);

        @RabbitListener(queues = "so38728668")
        public void receive(String payload, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
                throws IOException {
            System.out.println(payload);
            channel.basicAck(tag, false);
            latch.countDown();
        }

    }

}

application.properties:

spring.rabbitmq.listener.acknowledge-mode=manual

Solution 2

Just in case you need to use #onMessage() from ChannelAwareMessageListener class. Then you can do it this way.

@Component
public class MyMessageListener implements ChannelAwareMessageListener {

    @Override
    public void onMessage(Message message, Channel channel) {
        log.info("Message received.");
        // do something with the message
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

And for the rabbitConfiguration

@Configuration
public class RabbitConfig {

    public static final String topicExchangeName = "exchange1";

    public static final String queueName = "queue1";

    public static final String routingKey = "queue1.route.#";

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
        connectionFactory.setUsername("xxxx");
        connectionFactory.setPassword("xxxxxxxxxx");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("vHost1");
        return connectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(connectionFactory());
    }

    @Bean
    Queue queue() {
        return new Queue(queueName, true);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange(topicExchangeName);
    }

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(routingKey);
    }


    @Bean
    public SimpleMessageListenerContainer listenerContainer(MyMessageListener myRabbitMessageListener) {
        SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory());
        listenerContainer.setQueueNames(queueName);
        listenerContainer.setMessageListener(myRabbitMessageListener);
        listenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        listenerContainer.setConcurrency("4");
        listenerContainer.setPrefetchCount(20);
        return listenerContainer;
    }
}
Share:
38,426
Guru
Author by

Guru

What ?

Updated on September 29, 2021

Comments

  • Guru
    Guru over 2 years

    How to acknowledge the messages manually without using auto acknowledgement. Is there a way to use this along with the @RabbitListener and @EnableRabbit style of configuration. Most of the documentation tells us to use SimpleMessageListenerContainer along with ChannelAwareMessageListener. However using that we lose the flexibility that is provided with the annotations. I have configured my service as below :

    @Service
    public class EventReceiver {
    
    @Autowired
    private MessageSender messageSender;
    
    @RabbitListener(queues = "${eventqueue}")
    public void receiveMessage(Order order) throws Exception {
    
      // code for processing order
    }
    

    My RabbitConfiguration is as below

    @EnableRabbit
    public class RabbitApplication implements RabbitListenerConfigurer {
    
    public static void main(String[] args) {
        SpringApplication.run(RabbitApplication.class, args);
    }
    
    @Bean
    
    
    public MappingJackson2MessageConverter jackson2Converter() {
            MappingJackson2MessageConverter converter = new  MappingJackson2MessageConverter();
            return converter;
        @Bean
    public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {
          SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
          factory.setConnectionFactory(rabbitConnectionFactory());
          factory.setMaxConcurrentConsumers(5);
          factory.setMessageConverter((MessageConverter) jackson2Converter());
          factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
          return factory;
        }
    
    @Bean
    public ConnectionFactory rabbitConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost("localhost");
        return connectionFactory;
    }
    
    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        registrar.setContainerFactory(myRabbitListenerContainerFactory());
    }
    
    @Autowired
    private EventReceiver receiver;
    }
    }
    

    Any help will be appreciated on how to adapt manual channel acknowledgement along with the above style of configuration. If we implement the ChannelAwareMessageListener then the onMessage signature will change. Can we implement ChannelAwareMessageListener on a service ?