Spring-boot-starter RabbitMQ global error handling
Override Boot's listener container factory bean, as described in Enable Listener Endpoint Annotations.
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setErrorHandler(myErrorHandler());
    ...
    return factory;
}
You can inject a custom implementation of ErrorHandler, which will be added to each listener container the factory creates.
void handleError(Throwable t);
The throwable will be a ListenerExecutionFailedException which, starting with version 1.6.7 (Boot 1.4.4), has the raw inbound message in its failedMessage property.
The default error handler considers causes such as MessageConversionException to be fatal (they will not be requeued).
If you wish to retain that behavior (normal for such problems), you should throw an AmqpRejectAndDontRequeueException after handling the error.
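The handle-then-reject pattern described above can be sketched without a broker. This is only a model, under loud assumptions: `RejectAndDontRequeue`, `ListenerFailure`, `outcome` and `handleThenReject` are hypothetical stand-ins (for AmqpRejectAndDontRequeueException, ListenerExecutionFailedException, the container's requeue decision and a custom ErrorHandler respectively), `IllegalArgumentException` plays the role of a conversion failure, and the container is assumed to requeue by default.

```java
import java.util.function.Consumer;

// Stand-in model only: these types are hypothetical and merely mimic the roles of
// Spring AMQP's ErrorHandler, ListenerExecutionFailedException and
// AmqpRejectAndDontRequeueException. No broker or Spring class is involved.
final class RejectAfterHandlingSketch {

    /** Stand-in for AmqpRejectAndDontRequeueException. */
    static final class RejectAndDontRequeue extends RuntimeException {
        RejectAndDontRequeue(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Stand-in for a listener failure that carries the raw inbound message. */
    static final class ListenerFailure extends RuntimeException {
        final String failedMessage;

        ListenerFailure(String failedMessage, Throwable cause) {
            super("Listener threw exception", cause);
            this.failedMessage = failedMessage;
        }
    }

    /** Simplified container decision: requeue unless the handler asks for rejection. */
    static String outcome(Consumer<Throwable> errorHandler, ListenerFailure failure) {
        try {
            errorHandler.accept(failure);
            return "requeue";
        } catch (RejectAndDontRequeue e) {
            return "reject";
        }
    }

    /** Handles (here: logs) the error, then rejects conversion-type failures. */
    static void handleThenReject(Throwable t) {
        System.err.println("Handling: " + t + ", cause: " + t.getCause());
        // IllegalArgumentException is the assumed stand-in for MessageConversionException.
        if (t.getCause() instanceof IllegalArgumentException) {
            throw new RejectAndDontRequeue("Unrecoverable, not requeueing", t);
        }
    }

    public static void main(String[] args) {
        ListenerFailure badJson = new ListenerFailure("some bad json",
                new IllegalArgumentException("cannot convert"));
        ListenerFailure transientFailure = new ListenerFailure("{\"foo\":\"bar\"}",
                new IllegalStateException("database unavailable"));

        System.out.println(outcome(RejectAfterHandlingSketch::handleThenReject, badJson));          // reject
        System.out.println(outcome(RejectAfterHandlingSketch::handleThenReject, transientFailure)); // requeue
        System.out.println(outcome(t -> { }, badJson));                                             // requeue
    }
}
```

The last call models a handler that only swallows or logs the error: without the explicit rejection, a message that can never be converted would keep coming back.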
By the way, you don't need that RabbitTemplate bean; if you have just one MessageConverter bean in the application, Boot will auto-wire it into the containers and template.
Since you will be overriding Boot's factory, you will have to wire in the converter there.
EDIT
You could use the default ConditionalRejectingErrorHandler, but inject it with a custom implementation of FatalExceptionStrategy. In fact, you could subclass its DefaultExceptionStrategy and override isFatal(Throwable t), then, after handling the error, return super.isFatal(t).
EDIT2
Full example; sends 1 good message and 1 bad one:
package com.example;

import org.slf4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler;
import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.util.ErrorHandler;

@SpringBootApplication
public class So42215050Application {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So42215050Application.class, args);
        context.getBean(So42215050Application.class).runDemo();
        context.close();
    }

    @Autowired
    private RabbitTemplate template;

    private void runDemo() throws Exception {
        this.template.convertAndSend(queue().getName(), new Foo("bar"));
        this.template.convertAndSend(queue().getName(), new Foo("bar"), m -> {
            return new Message("some bad json".getBytes(), m.getMessageProperties());
        });
        Thread.sleep(5000);
    }

    @RabbitListener(queues = "So42215050")
    public void handle(Foo in) {
        System.out.println("Received: " + in);
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(jsonConverter());
        factory.setErrorHandler(errorHandler());
        return factory;
    }

    @Bean
    public ErrorHandler errorHandler() {
        return new ConditionalRejectingErrorHandler(new MyFatalExceptionStrategy());
    }

    @Bean
    public Queue queue() {
        return new Queue("So42215050", false, false, true);
    }

    @Bean
    public MessageConverter jsonConverter() {
        return new Jackson2JsonMessageConverter();
    }

    public static class MyFatalExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {

        private final Logger logger = org.slf4j.LoggerFactory.getLogger(getClass());

        @Override
        public boolean isFatal(Throwable t) {
            if (t instanceof ListenerExecutionFailedException) {
                ListenerExecutionFailedException lefe = (ListenerExecutionFailedException) t;
                logger.error("Failed to process inbound message from queue "
                        + lefe.getFailedMessage().getMessageProperties().getConsumerQueue()
                        + "; failed message: " + lefe.getFailedMessage(), t);
            }
            return super.isFatal(t);
        }

    }

    public static class Foo {

        private String foo;

        public Foo() {
            super();
        }

        public Foo(String foo) {
            this.foo = foo;
        }

        public String getFoo() {
            return this.foo;
        }

        public void setFoo(String foo) {
            this.foo = foo;
        }

        @Override
        public String toString() {
            return "Foo [foo=" + this.foo + "]";
        }

    }

}
Result:
Received: Foo [foo=bar]
2017-02-14 09:42:50.972 ERROR 44868 --- [cTaskExecutor-1] 5050Application$MyFatalExceptionStrategy : Failed to process inbound message from queue So42215050; failed message: (Body:'some bad json' MessageProperties [headers={TypeId=com.example.So42215050Application$Foo}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=application/json, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=So42215050, receivedDelay=null, deliveryTag=2, messageCount=0, consumerTag=amq.ctag-P2QqY0PMD1ppX5NnkUPhFA, consumerQueue=So42215050])
EDIT3
JSON does not convey any type information. By default, the type to convert to will be inferred from the method parameter type. If you wish to reject anything that can't be converted to that type, you need to configure the message converter appropriately.
For example:
@Bean
public MessageConverter jsonConverter() {
    Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
    DefaultClassMapper mapper = new DefaultClassMapper();
    mapper.setDefaultType(Foo.class);
    converter.setClassMapper(mapper);
    return converter;
}
Now, when I change my example to send a Bar instead of a Foo...
public static class Bar {
...
}
and
this.template.convertAndSend(queue().getName(), new Bar("baz"));
I get...
Caused by: org.springframework.amqp.support.converter.MessageConversionException: Cannot handle message
... 13 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.example.So42215050Application$Bar] to [com.example.So42215050Application$Foo] for GenericMessage [payload=Bar [foo=baz], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=So42215050, amqp_contentEncoding=UTF-8, amqp_deliveryTag=3, amqp_consumerQueue=So42215050, amqp_redelivered=false, id=6d7e23a3-c2a7-2417-49c9-69e3335aa485, amqp_consumerTag=amq.ctag-6JIGkpmkrTKaG32KVpf8HQ, contentType=application/json, __TypeId__=com.example.So42215050Application$Bar, timestamp=1488489538017}]
But this only works if the sender sets the __TypeId__ header (which the template does if it's configured with the same converter).
EDIT4
// Same package and imports as the EDIT2 example, plus:
import java.util.HashMap;
import java.util.Map;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessagePropertiesBuilder;
import org.springframework.amqp.support.converter.DefaultClassMapper;

@SpringBootApplication
public class So42215050Application {

    private final Logger logger = org.slf4j.LoggerFactory.getLogger(getClass());

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So42215050Application.class, args);
        context.getBean(So42215050Application.class).runDemo();
        context.close();
    }

    @Autowired
    private RabbitTemplate template;

    private void runDemo() throws Exception {
        this.template.convertAndSend(queue().getName(), new Foo("bar")); // good - converter sets up type
        this.template.convertAndSend(queue().getName(), new Foo("bar"), m -> {
            return new Message("some bad json".getBytes(), m.getMessageProperties()); // fail bad json
        });
        Message message = MessageBuilder
                .withBody("{\"foo\":\"bar\"}".getBytes())
                .andProperties(
                        MessagePropertiesBuilder
                                .newInstance()
                                .setContentType("application/json")
                                .build())
                .build();
        this.template.send(queue().getName(), message); // Success - default Foo class when no header
        message.getMessageProperties().setHeader("__TypeId__", "foo");
        this.template.send(queue().getName(), message); // Success - foo is mapped to Foo
        message.getMessageProperties().setHeader("__TypeId__", "bar");
        this.template.send(queue().getName(), message); // fail - mapped to a Map
        Thread.sleep(5000);
    }

    @RabbitListener(queues = "So42215050")
    public void handle(Foo in) {
        logger.info("Received: " + in);
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(jsonConverter());
        factory.setErrorHandler(errorHandler());
        return factory;
    }

    @Bean
    public ErrorHandler errorHandler() {
        return new ConditionalRejectingErrorHandler(new MyFatalExceptionStrategy());
    }

    @Bean
    public Queue queue() {
        return new Queue("So42215050", false, false, true);
    }

    @Bean
    public MessageConverter jsonConverter() {
        Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
        DefaultClassMapper mapper = new DefaultClassMapper();
        mapper.setDefaultType(Foo.class);
        // Map the __TypeId__ header values used in runDemo() to classes.
        Map<String, Class<?>> mappings = new HashMap<>();
        mappings.put("foo", Foo.class);
        mappings.put("bar", Object.class);
        mapper.setIdClassMapping(mappings);
        converter.setClassMapper(mapper);
        return converter;
    }

    public static class MyFatalExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {

        private final Logger logger = org.slf4j.LoggerFactory.getLogger(getClass());

        @Override
        public boolean isFatal(Throwable t) {
            if (t instanceof ListenerExecutionFailedException) {
                ListenerExecutionFailedException lefe = (ListenerExecutionFailedException) t;
                logger.error("Failed to process inbound message from queue "
                        + lefe.getFailedMessage().getMessageProperties().getConsumerQueue()
                        + "; failed message: " + lefe.getFailedMessage(), t);
            }
            return super.isFatal(t);
        }

    }

    public static class Foo {

        private String foo;

        public Foo() {
            super();
        }

        public Foo(String foo) {
            this.foo = foo;
        }

        public String getFoo() {
            return this.foo;
        }

        public void setFoo(String foo) {
            this.foo = foo;
        }

        @Override
        public String toString() {
            return "Foo [foo=" + this.foo + "]";
        }

    }

    public static class Bar {

        private String foo;

        public Bar() {
            super();
        }

        public Bar(String foo) {
            this.foo = foo;
        }

        public String getFoo() {
            return this.foo;
        }

        public void setFoo(String foo) {
            this.foo = foo;
        }

        @Override
        public String toString() {
            return "Bar [foo=" + this.foo + "]";
        }

    }

}
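The three header cases exercised in runDemo() above (no __TypeId__ header, "foo", and "bar") can be modelled in plain Java. This is a rough sketch, not Spring AMQP's DefaultClassMapper: the class `TypeIdLookupSketch`, its methods `map`, `resolve` and `demoMapper`, and the nested `Foo` are hypothetical names introduced for illustration. The lookup order shown (default type when the header is absent, then the id mapping, then loading the header value as a class name) is an assumption based on the behaviour reported in this thread.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for the lookup a class mapper performs; it is not
// Spring AMQP's DefaultClassMapper, only a model of the cases in runDemo().
final class TypeIdLookupSketch {

    /** Placeholder payload type for the demo. */
    static final class Foo { }

    private final Class<?> defaultType;
    private final Map<String, Class<?>> idClassMapping = new HashMap<>();

    TypeIdLookupSketch(Class<?> defaultType) {
        this.defaultType = defaultType;
    }

    /** Registers a header value and the class it should resolve to. */
    void map(String id, Class<?> type) {
        this.idClassMapping.put(id, type);
    }

    /** Resolves the target class from the value of the type id header (may be null). */
    Class<?> resolve(String typeIdHeader) {
        if (typeIdHeader == null) {
            return this.defaultType;            // no header: fall back to the default type
        }
        Class<?> mapped = this.idClassMapping.get(typeIdHeader);
        if (mapped != null) {
            return mapped;                      // explicit id-to-class mapping
        }
        try {
            return Class.forName(typeIdHeader); // otherwise treat the id as a class name
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Unknown type id: " + typeIdHeader, e);
        }
    }

    /** Builds a mapper configured like jsonConverter() in the example above. */
    static TypeIdLookupSketch demoMapper() {
        TypeIdLookupSketch mapper = new TypeIdLookupSketch(Foo.class);
        mapper.map("foo", Foo.class);
        mapper.map("bar", Object.class);
        return mapper;
    }

    public static void main(String[] args) {
        TypeIdLookupSketch mapper = demoMapper();
        System.out.println(mapper.resolve(null).getSimpleName());  // Foo
        System.out.println(mapper.resolve("foo").getSimpleName()); // Foo
        System.out.println(mapper.resolve("bar").getSimpleName()); // Object
        try {
            mapper.resolve("bogus");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Resolving "bar" to Object is what makes the last send in runDemo() fail: the JSON is then deserialized to a generic map, which cannot be passed to a listener method that expects Foo.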
VelNaga
Updated on June 24, 2022
Comments
-
VelNaga almost 2 years
I am using spring-boot-starter-amqp 1.4.2. The producer and consumer work fine, but sometimes the incoming JSON messages have incorrect syntax. This results in the following (correct) exception:
org.springframework.amqp.rabbit.listener.ListenerExecutionFailedException: Listener threw exception
Caused by: org.springframework.amqp.support.converter.MessageConversionException: Failed to convert Message content
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Can not deserialize instance of java.lang.String out of START_ARRAY token...
In future I may face many more exceptions, so I want to configure a global error handler so that an exception in any of the consumers can be handled in one place.
Note: In this case the message never reaches the consumer. I want to handle these kinds of exceptions globally across the consumers.
Please find the code below:
RabbitConfiguration.java
@Configuration
@EnableRabbit
public class RabbitMqConfiguration {

    @Autowired
    private CachingConnectionFactory cachingConnectionFactory;

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    @Primary
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory);
        template.setMessageConverter(jsonMessageConverter());
        return template;
    }

}
Consumer
@RabbitListener(
        id = "book_queue",
        bindings = @QueueBinding(
                value = @Queue(value = "book.queue", durable = "true"),
                exchange = @Exchange(value = "book.exchange", durable = "true", delayed = "true"),
                key = "book.queue"
        )
)
public void handle(Message message) {
    // Business Logic
}
Could anyone please help me set up a global error handler? Your help would be appreciated.
Updated question as per Gary's comment
I am able to run your example and get the expected output, as you said. I want to try a few more negative cases based on your example, but there are a few things I couldn't understand.
this.template.convertAndSend(queue().getName(), new Foo("bar"));
output
Received: Foo [foo=bar]
The above code works fine. Now, instead of "Foo", I am sending some other bean:
this.template.convertAndSend(queue().getName(), new Differ("snack","Hihi","how are you"));
output
Received: Foo [foo=null]
The consumer shouldn't accept this message because it is a completely different bean (Differ.class, not Foo.class), so I expect it to go to the "ConditionalRejectingErrorHandler". Why does it accept the wrong payload and print null? Please correct me if I am wrong.
Edit 1 :
Gary, as you said, I have set the "__TypeId__" header while sending the message, but the consumer is still able to convert wrong messages and it does not throw any error. Please find the code below; I used your code samples and made only the following modifications:
1) Added "__TypeId__" while sending the message:
this.template.convertAndSend(queue().getName(), new Differ("snack","hihi","how are you"), m -> {
    m.getMessageProperties().setHeader("__TypeId__","foo");
    return m;
});
2) Added "DefaultClassMapper" to the "Jackson2JsonMessageConverter":
@Bean
public MessageConverter jsonConverter() {
    Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
    DefaultClassMapper mapper = new DefaultClassMapper();
    mapper.setDefaultType(Foo.class);
    converter.setClassMapper(mapper);
    return new Jackson2JsonMessageConverter();
}
-
VelNaga about 7 years
Thanks a lot for your response. I need a few more clarifications on things I don't understand from your answer. 1) To implement a global error handler, must we have a "SimpleRabbitListenerContainerFactory" bean (is there no other way)? 2) I can see "ErrorHandler" as a method. Is it possible to define a bean as an "ErrorHandler"? Could you please share a code sample for an effective way of writing an ErrorHandler, or at least a hint or link? 3) You mentioned that having one "MessageConverter" is redundant and Boot will auto-wire it into the containers. For me, Spring Boot is not doing that automatically; am I missing anything?
-
Gary Russell about 7 years
This is now available in spring-amqp-samples as spring-rabbit-global-errorhandler.
-
VelNaga about 7 years
Thanks a lot. Let me give it a try; once it works I'll accept the answer immediately.
-
VelNaga about 7 years
Sorry for the very late reply. Your example works fine, but if I pass "this.template.convertAndSend(queue().getName(), new Differ("snack","hihi","how are you"));", Rabbit accepts this payload. It should reject it, because my consumer expects "Foo" but I am passing "Differ", which is a different class with different properties. May I know why it does not reach the "ConditionalRejectingErrorHandler"?
-
Gary Russell about 7 years
Don't put code in comments, it's unreadable - edit your question instead. It's not clear what you are asking; you can't get an error on the template.send because the sending side doesn't know what the consumer wants. If you mean you are not seeing it on the consumer side either, attach a DEBUG log for the delivery.
-
VelNaga about 7 years
I have updated my question, so it should be clear now. I am not expecting the error from template.send, but I expect it to go to the ConditionalRejectingErrorHandler since it is the wrong payload.
-
Gary Russell about 7 years
That's because the destination type is inferred from the method parameter. JSON doesn't implicitly convey type information. See my next edit for how to configure the converter to only convert to a specific type.
-
VelNaga about 7 years
Thanks a lot for your answer. So we need to set the type id information from the sender, and you are also configuring it in the converter, which is global for all the consumers in a component. If my component contains multiple consumers and each consumer consumes a different payload, this solution will not work. Is there any way to configure this type information in the @RabbitListener annotation or while declaring the queue?
-
Gary Russell about 7 years
With JSON, you have to tell the converter what to convert the JSON to. We try to help (e.g. by inferring the type from the method or by the sender sending type information in headers), but you can always configure your own converter. If you need multiple types, then each @RabbitListener has to be created by a different SimpleRabbitListenerContainerFactory bean (each of which can be configured to inject a different converter). You can specify which factory to use with the containerFactory attribute on @RabbitListener.
-
Gary Russell about 7 years
Alternatively, you can access the containers (by id) from the RabbitListenerEndpointRegistry bean; and change the converter in each container before starting them.
-
VelNaga about 7 years
Again, thanks a lot for your reply. Let me experiment more with this. Meanwhile, with "ConditionalRejectingErrorHandler", apart from an invalid payload, are we able to catch any other exceptions? Also, could you please share a link for configuring a dead letter exchange for the "ConditionalRejectingErrorHandler"?
-
VelNaga about 7 years
I have set the "__TypeId__" header while sending the message and configured DefaultClassMapper on the receiving side, but my consumer still consumes the wrong payload. Am I setting the "__TypeId__" properly? Please guide me to resolve this issue.
-
Gary Russell about 7 years
When I set it to some bogus class name, I get Caused by: java.lang.ClassNotFoundException: foo. If I add mapper.setIdClassMapping(Collections.singletonMap("foo", Object.class)); to the class mapper, I get Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.util.LinkedHashMap] to [com.example.So42215050Application$Foo] for GenericMessage [payload={foo=baz},...
-
VelNaga about 7 years
@Gary, do we need to set the "__TypeId__" header in the sender? Also, do we need to set the "DefaultType" property in DefaultClassMapper?
-
VelNaga about 7 years
@Gary, unfortunately it's not working for me. I set "idClassMapping", but my consumer still receives the message and prints it as null.
-
Gary Russell about 7 years
All I can do is repost my sample that shows it working as I describe. See EDIT4.
-
VelNaga about 7 years
@Gary, sorry, it's my mistake. Instead of returning the modified Jackson2JsonMessageConverter I returned a new instance of Jackson2JsonMessageConverter; that's why it was not throwing any exception. Now it is working flawlessly.
-
Admin about 7 years
@GaryRussell In Edit 2, why are you creating a bean factory?
-
Gary Russell about 7 years
Spring Boot creates it in ConfigurableApplicationContext context = SpringApplication.run(So42215050Application.class, args);
-
Paramesh Korrakuti almost 7 years
public static class MyFatalExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy { gives a compilation error, since DefaultExceptionStrategy is a private inner class. How can I solve this?
-
Admin almost 6 years
How can I hide the exception that is used when a message is requeued? stackoverflow.com/questions/50350377/…