How to configure and receiveAndConvert JSON payload into domain object in Spring Boot and RabbitMQ

Solution 1

I had the same problem, and after some research and testing I learned that there is more than one way to configure a RabbitMQ receiver in Spring Boot. The important thing is to choose one approach and stick with it.

If you decide to go with the annotation-driven listener endpoint, which I infer from your use of @EnableRabbit and @RabbitListener, then be aware that the configuration you posted didn't work for me. What worked is the following:

Have your configuration class implement the interface org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer and override the method configureRabbitListeners as follows:

@Override
public void configureRabbitListeners(
        RabbitListenerEndpointRegistrar registrar) {
    registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
}

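and add a MessageHandlerMethodFactory bean (both DefaultMessageHandlerMethodFactory and MappingJackson2MessageConverter come from spring-messaging, i.e. the org.springframework.messaging packages):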

@Bean
public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
    DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
    factory.setMessageConverter(new MappingJackson2MessageConverter());
    return factory;
}

Additionally, you need to define a SimpleRabbitListenerContainerFactory (as you already did) and autowire the corresponding ConnectionFactory:

@Autowired
public ConnectionFactory connectionFactory;

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setConcurrentConsumers(3);
    factory.setMaxConcurrentConsumers(10);
    return factory;
}

To finish the configuration, define the bean that handles your messages and carries the @RabbitListener annotations. I named mine EventResultHandler (you named yours TestService). If the class is also picked up by component scanning through @Component, you need only one of the two registrations:

@Bean
public EventResultHandler eventResultHandler() {
    return new EventResultHandler();
}

Then, in your EventResultHandler (or TestService), define the @RabbitListener methods with their corresponding queues and the payload (the POJO that your JSON message is deserialized into):

@Component
public class EventResultHandler {

    @RabbitListener(queues=Queues.QUEUE_NAME_PRESENTATION_SERVICE)
    public void handleMessage(@Payload Event event) {
        System.out.println("Event received");
        System.out.println("EventType: " + event.getType().getText());
    }
}
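
The answer does not show the Event class itself. As a rough sketch, a payload type that Jackson can populate with its default settings might look like the following. Only getType() and getText() appear in the listener above; the EventType class name, the field names, and the setters are assumptions made for illustration.

```java
// Hypothetical payload classes: only getType() and getText() are taken from the answer.
// With Jackson's defaults, JSON such as {"type": {"text": "CREATED"}} would map onto them.
class EventType {
    private String text;

    // No-argument constructor so Jackson can instantiate the class
    public EventType() { }

    public String getText() { return text; }
    public void setText(String text) { this.text = text; }
}

class Event {
    private EventType type;

    // No-argument constructor so Jackson can instantiate the class
    public Event() { }

    public EventType getType() { return type; }
    public void setType(EventType type) { this.type = type; }
}
```

The property names in the JSON sent by the producer have to match the bean properties (or be mapped with Jackson annotations), otherwise the fields stay null or conversion fails, depending on the ObjectMapper settings.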

I omitted the required definition and binding of queues and exchanges. You can do that in either one of the microservices, or manually on the RabbitMQ server, but you do have to do it somewhere.
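
For completeness, a minimal sketch of declaring the queue, exchange, and binding as beans is shown here. The class name, queue name, exchange name, and routing key are all placeholders and not taken from the answer; in a Spring Boot application with the AMQP starter, such beans are normally declared on the broker by the auto-configured AmqpAdmin when a connection is opened.

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MessagingTopologyConfig {

    // Hypothetical names: substitute the queue, exchange, and routing key your services agree on.
    static final String QUEUE_NAME = "presentation-service";
    static final String EXCHANGE_NAME = "events";
    static final String ROUTING_KEY = "event.#";

    @Bean
    public Queue presentationQueue() {
        // second argument: durable, so the queue definition survives a broker restart
        return new Queue(QUEUE_NAME, true);
    }

    @Bean
    public TopicExchange eventsExchange() {
        return new TopicExchange(EXCHANGE_NAME);
    }

    @Bean
    public Binding presentationBinding(Queue presentationQueue, TopicExchange eventsExchange) {
        // route messages whose routing key matches ROUTING_KEY from the exchange to the queue
        return BindingBuilder.bind(presentationQueue).to(eventsExchange).with(ROUTING_KEY);
    }
}
```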

Solution 2

Create a Jackson message converter and set it with MessageListenerAdapter#setMessageConverter:

@Bean
public MessageConverter jsonMessageConverter() {
    return new Jackson2JsonMessageConverter();
}

Where does the MessageListenerAdapter come from?
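
The container bean in the question takes a MessageListenerAdapter parameter, so one plausible way to supply it is to declare the adapter as a bean and hand it the converter. This is a sketch under assumptions, not the answerer's code: the configuration class name is made up, TestService and its method name are taken from the question and assumed to be in the same package, and the converter is assumed to be the jsonMessageConverter bean defined above.

```java
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ListenerAdapterConfig {

    @Bean
    public MessageListenerAdapter listenerAdapter(TestService testService,
                                                  MessageConverter jsonMessageConverter) {
        // Delegate each converted message to the named method on TestService
        MessageListenerAdapter adapter =
                new MessageListenerAdapter(testService, "testReceiveNewUserNotificationMessage");
        // Use the Jackson-based converter instead of the adapter's default converter
        adapter.setMessageConverter(jsonMessageConverter);
        return adapter;
    }
}
```

Two caveats apply. If the method is wired through the adapter and the SimpleMessageListenerContainer, it probably should not also carry @RabbitListener, or two consumers end up listening on the same queue. The converter also needs to know the target type, which usually comes from a __TypeId__ header set by the sender or from a class mapper configured on the converter.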

Author: OchiengOlanga

Updated on June 05, 2022

Comments

  • OchiengOlanga almost 2 years

    Recently I have taken a keen interest in microservice architecture using Spring Boot. My implementation has two Spring Boot applications:

    Application One receives requests from a RESTful API, converts them, and sends a JSON payload to a RabbitMQ queue, queueA.

    Application Two has subscribed to queueA, receives the JSON payload (the domain object User), and is supposed to trigger a service within Application Two, e.g. sending an email to a user.

    Using no XML in my Application Two configuration, how do I configure a converter that will convert the JSON payload received from RabbitMQ into a domain object User?

    Below are snippets from the Spring Boot configuration of Application Two:

    Application.class

    @SpringBootApplication
    @EnableRabbit
    public class ApplicationInitializer implements CommandLineRunner {
    
        final static String queueName = "user-registration";
    
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        @Autowired
        AnnotationConfigApplicationContext context;
    
        @Bean
        Queue queue() {
            return new Queue(queueName, false);
        }
    
        @Bean
        TopicExchange topicExchange() {
            return new TopicExchange("user-registrations");
        }
    
        @Bean
        Binding binding(Queue queue, TopicExchange exchange) {
            return BindingBuilder.bind(queue).to(exchange).with(queueName);
        }
    
        @Bean
        SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.setQueueNames(queueName);
            container.setMessageListener(listenerAdapter);
            return container;
        }
    
        public static void main(String[] args) {
            SpringApplication.run(ApplicationInitializer.class, args);
        }
    
        @Override
        public void run(String... args) throws Exception {
            System.out.println("Waiting for messages...");
        }
    
    }
    

    TestService.java

    @Component
    public class TestService {
    
        /**
         * This test verifies whether this consumer receives message off the user-registration queue
         */
        @RabbitListener(queues = "user-registration")
        public void testReceiveNewUserNotificationMessage(User user) {
            // do something like, convert payload to domain object user and send email to this user
        }
    
    }
    
  • OchiengOlanga over 8 years
    You, sir, deserve a medal. For some funny reason, after updating my project parent to Spring Boot 1.3.0.RELEASE a few days ago, my previous implementation broke. Thank you.
  • afaulconbridge almost 8 years
    Make sure that you are importing the right class in order to get this to work: you need an org.springframework.amqp.support.converter.MessageConverter and not an org.springframework.messaging.converter.MessageConverter.