Spring-boot-starter RabbitMQ global error handling


Override Boot's listener container factory bean, as described in the "Enable Listener Endpoint Annotations" section of the Spring AMQP reference documentation.

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setErrorHandler(myErrorHandler());
    ...
    return factory;
}

You can inject a custom implementation of ErrorHandler which will be added to each listener container the factory creates.

void handleError(Throwable t);

The throwable will be a ListenerExecutionFailedException which, starting with version 1.6.7 (boot 1.4.4), has the raw inbound message in its failedMessage property.

The default error handler considers causes such as MessageConversionException to be fatal (they will not be requeued).

If you wish to retain that behavior (normal for such problems), you should throw an AmqpRejectAndDontRequeueException after handling the error.
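
As a rough illustration of the points above, a custom handler could be declared as a bean along these lines. This is only a sketch: the class name ErrorHandlingConfig and the log/exception messages are made up for the example, and the bean name myErrorHandler simply matches the call in the factory snippet above. It logs every failure and, for conversion problems, throws AmqpRejectAndDontRequeueException so that the message is not requeued.

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.ErrorHandler;

// Hypothetical configuration class; names and messages are illustrative only.
@Configuration
public class ErrorHandlingConfig {

    private static final Logger logger = LoggerFactory.getLogger(ErrorHandlingConfig.class);

    @Bean
    public ErrorHandler myErrorHandler() {
        // ErrorHandler has a single method, handleError(Throwable), so a lambda is enough.
        return t -> {
            logger.error("Listener failed", t);
            if (t instanceof ListenerExecutionFailedException
                    && t.getCause() instanceof MessageConversionException) {
                // A message that cannot be converted will fail again on redelivery,
                // so reject it without requeueing (what the default handler does).
                throw new AmqpRejectAndDontRequeueException("Conversion failed; not requeueing", t);
            }
            // Other failures are only logged here; the container then applies
            // its normal handling to the original exception.
        };
    }
}
```

Checking only the direct cause is a simplification made for this sketch; adjust the condition to whatever you consider fatal in your application.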

By the way, you don't need that RabbitTemplate bean; if you have just one MessageConverter bean in the application, boot will auto-wire it into the containers and template.

Since you will be overriding boot's factory, you will have to wire in the converter there.

EDIT

You could use the default ConditionalRejectingErrorHandler, but inject it with a custom implementation of FatalExceptionStrategy. In fact, you could subclass its DefaultExceptionStrategy and override isFatal(Throwable t), then, after handling the error, return super.isFatal(t).
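
To make the "handle, then delegate" idea concrete without needing a broker, here is a small JDK-only model of it. Everything in it is a stand-in invented for the illustration: ListenerFailure and ConversionFailure play the roles of ListenerExecutionFailedException and MessageConversionException, and BaseStrategy is a deliberately simplified substitute for the default strategy, not the library's actual logic.

```java
import java.util.ArrayList;
import java.util.List;

// JDK-only sketch; all types here are hypothetical stand-ins, not Spring AMQP classes.
class FatalStrategySketch {

    // Stand-in for MessageConversionException.
    static class ConversionFailure extends RuntimeException {
        ConversionFailure(String message) { super(message); }
    }

    // Stand-in for ListenerExecutionFailedException.
    static class ListenerFailure extends RuntimeException {
        ListenerFailure(String message, Throwable cause) { super(message, cause); }
    }

    // Simplified default decision: fatal only when the cause is a conversion failure.
    static class BaseStrategy {
        boolean isFatal(Throwable t) {
            return t instanceof ListenerFailure && t.getCause() instanceof ConversionFailure;
        }
    }

    // The pattern from the answer: handle the error first, then keep the default decision.
    static class LoggingStrategy extends BaseStrategy {
        final List<String> handled = new ArrayList<>();

        @Override
        boolean isFatal(Throwable t) {
            handled.add(t.getMessage());   // "handling" is just recording the message here
            return super.isFatal(t);       // delegate so fatal/non-fatal behavior is unchanged
        }
    }

    public static void main(String[] args) {
        LoggingStrategy strategy = new LoggingStrategy();
        Throwable badJson = new ListenerFailure("bad json", new ConversionFailure("not JSON"));
        Throwable business = new ListenerFailure("business error", new IllegalStateException("boom"));
        System.out.println(strategy.isFatal(badJson));   // true
        System.out.println(strategy.isFatal(business));  // false
        System.out.println(strategy.handled);            // [bad json, business error]
    }
}
```

The point of delegating to super is that the subclass adds a side effect (logging, metrics, alerting) without changing which failures are rejected and which are requeued.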

EDIT2

Full example; sends 1 good message and 1 bad one:

package com.example;

import org.slf4j.Logger;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler;
import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.util.ErrorHandler;

@SpringBootApplication
public class So42215050Application {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So42215050Application.class, args);
        context.getBean(So42215050Application.class).runDemo();
        context.close();
    }

    @Autowired
    private RabbitTemplate template;

    private void runDemo() throws Exception {
        this.template.convertAndSend(queue().getName(), new Foo("bar"));
        this.template.convertAndSend(queue().getName(), new Foo("bar"), m -> {
            return new Message("some bad json".getBytes(), m.getMessageProperties());
        });
        Thread.sleep(5000);
    }

    @RabbitListener(queues = "So42215050")
    public void handle(Foo in) {
        System.out.println("Received: " + in);
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(jsonConverter());
        factory.setErrorHandler(errorHandler());
        return factory;
    }

    @Bean
    public ErrorHandler errorHandler() {
        return new ConditionalRejectingErrorHandler(new MyFatalExceptionStrategy());
    }

    @Bean
    public Queue queue() {
        return new Queue("So42215050", false, false, true);
    }

    @Bean
    public MessageConverter jsonConverter() {
        return new Jackson2JsonMessageConverter();
    }

    public static class MyFatalExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {

        private final Logger logger = org.slf4j.LoggerFactory.getLogger(getClass());

        @Override
        public boolean isFatal(Throwable t) {
            if (t instanceof ListenerExecutionFailedException) {
                ListenerExecutionFailedException lefe = (ListenerExecutionFailedException) t;
                logger.error("Failed to process inbound message from queue "
                        + lefe.getFailedMessage().getMessageProperties().getConsumerQueue()
                        + "; failed message: " + lefe.getFailedMessage(), t);
            }
            return super.isFatal(t);
        }

    }

    public static class Foo {

        private String foo;

        public Foo() {
            super();
        }

        public Foo(String foo) {
            this.foo = foo;
        }

        public String getFoo() {
            return this.foo;
        }

        public void setFoo(String foo) {
            this.foo = foo;
        }

        @Override
        public String toString() {
            return "Foo [foo=" + this.foo + "]";
        }

    }
}

Result:

Received: Foo [foo=bar]

2017-02-14 09:42:50.972 ERROR 44868 --- [cTaskExecutor-1] 5050Application$MyFatalExceptionStrategy : Failed to process inbound message from queue So42215050; failed message: (Body:'some bad json' MessageProperties [headers={__TypeId__=com.example.So42215050Application$Foo}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=application/json, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=So42215050, receivedDelay=null, deliveryTag=2, messageCount=0, consumerTag=amq.ctag-P2QqY0PMD1ppX5NnkUPhFA, consumerQueue=So42215050])

EDIT3

JSON does not convey any type information. By default, the type to convert to will be inferred from the method parameter type. If you wish to reject anything that can't be converted to that type, you need to configure the message converter appropriately.

For example:

@Bean
public MessageConverter jsonConverter() {
    Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
    DefaultClassMapper mapper = new DefaultClassMapper();
    mapper.setDefaultType(Foo.class);
    converter.setClassMapper(mapper);
    return converter;
}

Now, when I change my example to send a Bar instead of a Foo...

public static class Bar {

   ...

}

and

this.template.convertAndSend(queue().getName(), new Bar("baz"));

I get...

Caused by: org.springframework.amqp.support.converter.MessageConversionException: Cannot handle message
... 13 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.example.So42215050Application$Bar] to [com.example.So42215050Application$Foo] for GenericMessage [payload=Bar [foo=baz], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=So42215050, amqp_contentEncoding=UTF-8, amqp_deliveryTag=3, amqp_consumerQueue=So42215050, amqp_redelivered=false, id=6d7e23a3-c2a7-2417-49c9-69e3335aa485, amqp_consumerTag=amq.ctag-6JIGkpmkrTKaG32KVpf8HQ, contentType=application/json, __TypeId__=com.example.So42215050Application$Bar, timestamp=1488489538017}]

But this only works if the sender sets the __TypeId__ header (which the template does if it's configured with the same converter).

EDIT4

Complete example, showing the default type and the id-to-class mappings in action:

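// Package and imports as in the EDIT2 listing, plus these additional imports:
import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessagePropertiesBuilder;
import org.springframework.amqp.support.converter.DefaultClassMapper;

@SpringBootApplication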
public class So42215050Application {

    private final Logger logger = org.slf4j.LoggerFactory.getLogger(getClass());

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So42215050Application.class, args);
        context.getBean(So42215050Application.class).runDemo();
        context.close();
    }

    @Autowired
    private RabbitTemplate template;

    private void runDemo() throws Exception {
        this.template.convertAndSend(queue().getName(), new Foo("bar")); // good - converter sets up type
        this.template.convertAndSend(queue().getName(), new Foo("bar"), m -> {
            return new Message("some bad json".getBytes(), m.getMessageProperties()); // fail bad json
        });
        Message message = MessageBuilder
                .withBody("{\"foo\":\"bar\"}".getBytes())
                .andProperties(
                        MessagePropertiesBuilder
                            .newInstance()
                            .setContentType("application/json")
                            .build())
                .build();
        this.template.send(queue().getName(), message); // Success - default Foo class when no header
        message.getMessageProperties().setHeader("__TypeId__", "foo");
        this.template.send(queue().getName(), message); // Success - foo is mapped to Foo
        message.getMessageProperties().setHeader("__TypeId__", "bar");
        this.template.send(queue().getName(), message); // fail - mapped to a Map
        Thread.sleep(5000);
    }

    @RabbitListener(queues = "So42215050")
    public void handle(Foo in) {
        logger.info("Received: " + in);
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(jsonConverter());
        factory.setErrorHandler(errorHandler());
        return factory;
    }

    @Bean
    public ErrorHandler errorHandler() {
        return new ConditionalRejectingErrorHandler(new MyFatalExceptionStrategy());
    }

    @Bean
    public Queue queue() {
        return new Queue("So42215050", false, false, true);
    }

    @Bean
    public MessageConverter jsonConverter() {
        Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
        DefaultClassMapper mapper = new DefaultClassMapper();
        mapper.setDefaultType(Foo.class);
        Map<String, Class<?>> mappings = new HashMap<>();
        mappings.put("foo", Foo.class);
        mappings.put("bar", Object.class);
        mapper.setIdClassMapping(mappings);
        converter.setClassMapper(mapper);
        return converter;
    }

    public static class MyFatalExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {

        private final Logger logger = org.slf4j.LoggerFactory.getLogger(getClass());

        @Override
        public boolean isFatal(Throwable t) {
            if (t instanceof ListenerExecutionFailedException) {
                ListenerExecutionFailedException lefe = (ListenerExecutionFailedException) t;
                logger.error("Failed to process inbound message from queue "
                        + lefe.getFailedMessage().getMessageProperties().getConsumerQueue()
                        + "; failed message: " + lefe.getFailedMessage(), t);
            }
            return super.isFatal(t);
        }

    }

    public static class Foo {

        private String foo;

        public Foo() {
            super();
        }

        public Foo(String foo) {
            this.foo = foo;
        }

        public String getFoo() {
            return this.foo;
        }

        public void setFoo(String foo) {
            this.foo = foo;
        }

        @Override
        public String toString() {
            return "Foo [foo=" + this.foo + "]";
        }

    }

    public static class Bar {

        private String foo;

        public Bar() {
            super();
        }

        public Bar(String foo) {
            this.foo = foo;
        }

        public String getFoo() {
            return this.foo;
        }

        public void setFoo(String foo) {
            this.foo = foo;
        }

        @Override
        public String toString() {
            return "Bar [foo=" + this.foo + "]";
        }

    }

}
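
The outcomes noted in the runDemo() comments come down to how the type id header is resolved to a target class. The following JDK-only sketch models that lookup as I understand it from the behavior described in this answer; TypeIdResolver and its methods are hypothetical names for the illustration, not the DefaultClassMapper API, and the real mapper reports an unknown id with its own exception type.

```java
import java.util.HashMap;
import java.util.Map;

// JDK-only model of the lookup order; a hypothetical class, not Spring AMQP code.
class TypeIdResolver {

    private final Class<?> defaultType;
    private final Map<String, Class<?>> idClassMapping = new HashMap<>();

    TypeIdResolver(Class<?> defaultType) {
        this.defaultType = defaultType;
    }

    void map(String id, Class<?> type) {
        idClassMapping.put(id, type);
    }

    // header == null models a message sent without a type id header
    Class<?> resolve(String typeIdHeader) {
        if (typeIdHeader == null) {
            return defaultType;                        // no header: fall back to the default type
        }
        Class<?> mapped = idClassMapping.get(typeIdHeader);
        if (mapped != null) {
            return mapped;                             // explicit id-to-class mapping wins
        }
        try {
            return Class.forName(typeIdHeader);        // otherwise treat the id as a class name
        } catch (ClassNotFoundException e) {
            // Stand-in for the conversion failure the listener would see
            throw new IllegalArgumentException("Cannot resolve type id: " + typeIdHeader, e);
        }
    }

    public static void main(String[] args) {
        TypeIdResolver resolver = new TypeIdResolver(String.class);
        resolver.map("foo", String.class);
        resolver.map("bar", Object.class);
        System.out.println(resolver.resolve(null));    // class java.lang.String
        System.out.println(resolver.resolve("foo"));   // class java.lang.String
        System.out.println(resolver.resolve("bar"));   // class java.lang.Object
    }
}
```

In the sample above, the "bar" case fails later rather than at lookup time: the JSON is read as a generic Map, which cannot then be passed to a listener method that expects a Foo.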
Author: VelNaga


Updated on June 24, 2022

Comments

  • VelNaga
    VelNaga almost 2 years

    I am using spring-boot-starter-amqp 1.4.2. The producer and consumer work fine, but sometimes the incoming JSON messages have incorrect syntax. This results in the following (correct) exception:

    org.springframework.amqp.rabbit.listener.ListenerExecutionFailedException: Listener threw exception
    Caused by:  org.springframework.amqp.support.converter.MessageConversionException: Failed to convert Message content
    Caused by: com.fasterxml.jackson.databind.JsonMappingException: Can not deserialize instance of java.lang.String out of START_ARRAY token...
    

    In the future I may face many more exceptions, so I want to configure a global error handler: if an exception occurs in any of the consumers, I can handle it in one place.

    Note: in this case the message never reaches the consumer. I want to handle these kinds of exceptions globally across all consumers.

    Please find the code below:

    RabbitMqConfiguration.java

    @Configuration
    @EnableRabbit
    public class RabbitMqConfiguration {
    
        @Autowired
        private CachingConnectionFactory cachingConnectionFactory;
    
        @Bean
        public MessageConverter jsonMessageConverter()
        {
            return new Jackson2JsonMessageConverter();
        }
    
        @Bean
        @Primary
        public RabbitTemplate rabbitTemplate()
        {
            RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory);
            template.setMessageConverter(jsonMessageConverter());
            return template;
        }
    
    }
    

    Consumer

    @RabbitListener(
            id = "book_queue",
            bindings = @QueueBinding(
                    value = @Queue(value = "book.queue", durable = "true"),
                    exchange = @Exchange(value = "book.exchange", durable = "true", delayed = "true"),
                    key = "book.queue"
            )
        )
    public void handle(Message message) {
    //Business Logic
    }
    

    Could anyone please help me set up a global error handler? Any help would be appreciated.

    Updated question as per Gary's comment

    I am able to run your example and get the expected output. I want to try a few more negative cases based on your example, but there are a few things I don't understand:

    this.template.convertAndSend(queue().getName(), new Foo("bar"));
    

    output

    Received: Foo [foo=bar]

    The above code works fine. Now, instead of "Foo", I am sending a different bean:

    this.template.convertAndSend(queue().getName(), new Differ("snack","Hihi","how are you"));
    

    output

    Received: Foo [foo=null]

    The consumer shouldn't accept this message because it is a completely different bean (Differ.class, not Foo.class), so I expected it to go to "ConditionalRejectingErrorHandler". Why does it accept the wrong payload and print null? Please correct me if I am wrong.

    Edit 1 :

    Gary, as you said, I have set the "__TypeId__" header while sending the message, but the consumer is still able to convert wrong messages and does not throw any error. Please find the code below; I used your code samples with only the following modifications:

    1) Added "__TypeId__" while sending the message,

    this.template.convertAndSend(queue().getName(), new Differ("snack","hihi","how are you"),m -> {
            m.getMessageProperties().setHeader("__TypeId__","foo");
            return m;
        }); 
    

    2) Added "DefaultClassMapper" in the "Jackson2JsonMessageConverter"

    @Bean
    public MessageConverter jsonConverter() {
        Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
        DefaultClassMapper mapper = new DefaultClassMapper();
        mapper.setDefaultType(Foo.class);
        converter.setClassMapper(mapper);
        return new Jackson2JsonMessageConverter();
    }    
    
  • VelNaga
    VelNaga about 7 years
    Thanks a lot for your response. I need a few more clarifications on things I don't understand from your answer. 1) To implement a global error handler, must we define a "SimpleRabbitListenerContainerFactory" bean (is there no other way)? 2) I can see "ErrorHandler" used as a method. Is it possible to define a bean as an "ErrorHandler"? Could you please share a code sample showing an effective way of writing an ErrorHandler, or at least a hint or link? 3) You mentioned that with just one "MessageConverter" bean, Boot will auto-wire it into the containers. For me Spring Boot is not doing that automatically; am I missing anything?
  • Gary Russell
    Gary Russell about 7 years
    This is now available in spring-amqp-samples as spring-rabbit-global-errorhandler.
  • VelNaga
    VelNaga about 7 years
    Thanks a lot. Let me give it a try; once it works I'll accept the answer immediately.
  • VelNaga
    VelNaga about 7 years
    Sorry for the very late reply. Your example works fine, but if I pass "this.template.convertAndSend(queue().getName(), new Differ("snack","hihi","how are you"));", Rabbit accepts this payload. It should be rejected, because my consumer expects "Foo" but I am passing "Differ", which is a different class with different properties. May I know why it does not reach "ConditionalRejectingErrorHandler"?
  • Gary Russell
    Gary Russell about 7 years
    Don't put code in comments, it's unreadable - edit your question instead. It's not clear what you are asking; you can't get an error on the template.send because the sending side doesn't know what the consumer wants. If you mean you are not seeing it on the consumer side either, attach a DEBUG log for the delivery.
  • VelNaga
    VelNaga about 7 years
    I have updated my question; it should be clear now. I am not expecting an error from template.send, but I expected the message to go to ConditionalRejectingErrorHandler since it is the wrong payload.
  • Gary Russell
    Gary Russell about 7 years
    That's because the destination type is inferred from the method parameter. JSON doesn't implicitly convey type information. See my next edit for how to configure the converter to only convert to a specific type.
  • VelNaga
    VelNaga about 7 years
    Thanks a lot for your answer. So we need to set the type id information from the sender. Also, you are configuring it in the converter, which is global for all the consumers in a component. If my component contains multiple consumers and each consumer consumes a different payload, this solution will not work. Is there any way to configure this type information in the @RabbitListener annotation or while declaring the queue?
  • Gary Russell
    Gary Russell about 7 years
    With JSON, you have to tell the converter what to convert the json to. We try to help (e.g. by inferring the type from the method or by the sender sending type information in headers), but you can always configure your own converter. If you need multiple types, then each @RabbitListener has to be created by a different SimpleRabbitListenerContainerFactory bean (each of which can be configured to inject a different converter). You can specify which factory to use with the containerFactory attribute on @RabbitListener.
  • Gary Russell
    Gary Russell about 7 years
    Alternatively, you can access the containers (by id) from the RabbitListenerEndpointRegistry bean; and change the converter in each container before starting them.
  • VelNaga
    VelNaga about 7 years
    Again, thanks a lot for your reply. Let me experiment more with this. Meanwhile, with "ConditionalRejectingErrorHandler", are we able to catch any exceptions other than an invalid payload? Also, could you please share a link on configuring a dead letter exchange for "ConditionalRejectingErrorHandler"?
  • VelNaga
    VelNaga about 7 years
    I have set the "__TypeId__" header while sending the message and configured DefaultClassMapper on the receiving side, but my consumer still consumes the wrong payload. Am I setting "__TypeId__" properly? Please guide me to resolve this issue.
  • Gary Russell
    Gary Russell about 7 years
    When I set it to some bogus class name, I get Caused by: java.lang.ClassNotFoundException: foo. If I add mapper.setIdClassMapping(Collections.singletonMap("foo", Object.class)); to the class mapper, I get Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.util.LinkedHashMap] to [com.example.So42215050Application$Foo] for GenericMessage [payload={foo=baz},...
  • VelNaga
    VelNaga about 7 years
    @Gary, do we need to set the "__TypeId__" header in the sender? Also, do we need to set the "defaultType" property in DefaultClassMapper?
  • VelNaga
    VelNaga about 7 years
    @Gary, unfortunately it's not working for me. I set "idClassMapping", but my consumer still receives the message and prints it as null.
  • Gary Russell
    Gary Russell about 7 years
    All I can do is repost my sample that shows it working as I describe. See EDIT4.
  • VelNaga
    VelNaga about 7 years
    @Gary, sorry, it was my mistake: instead of returning the modified Jackson2JsonMessageConverter, I returned a new instance of Jackson2JsonMessageConverter, which is why no exception was thrown. Now it is working flawlessly.
  • Admin
    Admin about 7 years
    @GaryRussell In Edit 2, why are you creating a bean factory?
  • Gary Russell
    Gary Russell about 7 years
    Spring Boot creates it in ConfigurableApplicationContext context = SpringApplication.run(So42215050Application.class, args);
  • Paramesh Korrakuti
    Paramesh Korrakuti almost 7 years
    public static class MyFatalExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy { gives a compilation error, since DefaultExceptionStrategy is a private inner class. How can I solve this?
  • Admin
    Admin almost 6 years
    How can I hide the exception that is used when a message is requeued? stackoverflow.com/questions/50350377/…