Spring-Kafka Consumer KafkaListener cannot convert GenericMessage to Java Object

See your stack trace:

Method [public void com.psl.kafka.spring.InventoryEventReceiver.listenWithHeaders(java.lang.String,java.lang.String,java.lang.Integer,int,java.lang.String)]

The method that is actually invoked has the signature listenWithHeaders(String, String, Integer, int, String).

But the code you show us is completely different. Please make sure that the code you posted is really the code that runs at runtime.
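One way to confirm which signature is actually loaded is to print it reflectively and compare it with the stack trace. The following is a minimal sketch, not part of the original project: `Receiver` is a hypothetical stand-in for the listener bean, and `SignatureCheck.signaturesOf` is a helper name invented for illustration.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for the listener bean; in the real application this
// would be the InventoryEventReceiver class found on the runtime classpath.
class Receiver {
    public void listenWithHeaders(String payload, String topic, Integer key,
                                  int partition, String offset) {
        // no-op: only the signature matters for this check
    }
}

class SignatureCheck {
    // Returns "name(type1,type2,...)" for every declared method with the given name.
    static List<String> signaturesOf(Class<?> type, String methodName) {
        List<String> result = new ArrayList<>();
        for (Method m : type.getDeclaredMethods()) {
            if (m.getName().equals(methodName)) {
                String params = Arrays.stream(m.getParameterTypes())
                        .map(Class::getName)
                        .collect(Collectors.joining(","));
                result.add(m.getName() + "(" + params + ")");
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Prints the parameter types of the class that was actually loaded.
        System.out.println(signaturesOf(Receiver.class, "listenWithHeaders"));
    }
}
```

If the printed parameter list starts with java.lang.String rather than the event type, the running class is not the one compiled from the source shown in the question, which usually points to a stale build or an old jar on the classpath.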

If you use JsonDeserializer, you really don't need StringJsonMessageConverter, but the proper listener method must actually be the one in use.
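The message in the log, "Cannot convert from [InventoryEvent] to [String]", is consistent with the deserializer having already produced an InventoryEvent while the invoked method expects a String as its first argument. The sketch below is a deliberately simplified model of that mismatch, not Spring's actual argument resolution: `InventoryEventStub`, `StaleListener`, `FixedListener` and `PayloadCheck` are all hypothetical names introduced for illustration.

```java
import java.lang.reflect.Method;

// Hypothetical stand-in for the InventoryEvent type from the question.
class InventoryEventStub {
    final int id;

    InventoryEventStub(int id) {
        this.id = id;
    }
}

// Hypothetical listener whose payload parameter is a String,
// like the signature reported in the stack trace.
class StaleListener {
    public void listen(String payload) { }
}

// Hypothetical listener whose payload parameter matches the deserialized type.
class FixedListener {
    public void listen(InventoryEventStub payload) { }
}

class PayloadCheck {
    // Simplified model (an assumption, not Spring's code): the payload fits
    // only if it is an instance of the method's first parameter type.
    static boolean accepts(Class<?> listenerType, String methodName, Object payload) {
        for (Method m : listenerType.getDeclaredMethods()) {
            if (m.getName().equals(methodName) && m.getParameterCount() > 0) {
                return m.getParameterTypes()[0].isInstance(payload);
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Object payload = new InventoryEventStub(1231);
        System.out.println(accepts(StaleListener.class, "listen", payload)); // false
        System.out.println(accepts(FixedListener.class, "listen", payload)); // true
    }
}
```

Under this model, no extra message converter is needed once the payload parameter has the same type that the value deserializer produces.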

Author by

somnathchakrabarti

Updated on June 04, 2022

Comments

  • somnathchakrabarti
    somnathchakrabarti almost 2 years

    I am posting events of a custom Java type, InventoryEvent, through the kafka-rest service running on the Confluent 3.3.0 platform on a CentOS 7 instance, using the two steps below:

    Command to POST JSON events into kafka-rest

    curl -X POST -H "Content-Type:application/vnd.kafka.json.v2+json" --data '{"records" : [{"value" : {"id":1231, "eventType": "inventory.transaction", "qtyLevel" : 2223, "qtyReq" : 2345}}]}' "http://localhost:8082/topics/inventory"
    

    Subscribe the consumer instance to the topic

    curl -X POST -H "Content-Type:application/vnd.kafka.v2+json" --data '{"topics" : ["inventory"]}' http://localhost:8082/consumers/inventory_consumers/instances/consumer_1/subscription
    

    Next, I consume the events sent to the Kafka broker with a Spring-Kafka application, which should consume the JSON and convert it back to the Java type in the consumer listener method annotated with @KafkaListener, as shown below:

    public class InventoryEventReceiver {
    
        private static final Logger log = LoggerFactory.getLogger(InventoryEventReceiver.class);
    
        private CountDownLatch latch = new CountDownLatch(1);
    
        public CountDownLatch getLatch() {
            return latch;
        }
    
        @KafkaListener(topics="inventory", containerFactory="kafkaListenerContainerFactory")
        public void listenWithHeaders(
                @Payload InventoryEvent event,
                @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
                @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                @Header(KafkaHeaders.OFFSET) String offset
                ) {
    
            System.out.println("EVENT HAS BEEN RECEIVED ");
            System.out.println(event.toString());
    
    
            ObjectMapper objectMapper = new ObjectMapper();
            String invEventInString = null;
            try {
                invEventInString = objectMapper.writeValueAsString(event);
                System.out.println(invEventInString);
    
            } catch (IOException e) {
                e.printStackTrace();
            } 
    
            latch.countDown();
        }
    }
    

    But I get the error log shown below in the KafkaListenerContainer when trying to consume the messages with the above receiver code.

    The other listener method definitions that I tried, which produced the same error, are:

    Listening with InventoryEvent object

    @KafkaListener(topics="inventory", containerFactory="kafkaListenerContainerFactory")
    public void listenWithHeaders(InventoryEvent event)
    

    Listening with ConsumerRecord (taking cue from the error log)

    @KafkaListener(topics="inventory", containerFactory="kafkaListenerContainerFactory")
    public void listen(ConsumerRecord<?,?> record)
    

    My receiver config below uses InventoryEvent as the value type. I changed that to String and added a StringJsonMessageConverter through

    containerFactory.setMessageConverter(new StringJsonMessageConverter());
    

    but it gave the same error.

    Am I missing some basic Spring-Kafka configuration, such as a MessageConverter or MessageListener, or do I have to implement a custom MessageConverter to deserialize the JSON to the Java type InventoryEvent?

    @EnableKafka
    @Configuration
    public class InventoryReceiverConfig {
    
        @Bean
        public static ConsumerFactory<String, InventoryEvent> consumerFactory() { 
            return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), 
                    new JsonDeserializer<>(InventoryEvent.class));
        }
    
        @Bean
        public static ConcurrentKafkaListenerContainerFactory<String, InventoryEvent> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, InventoryEvent> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();
            containerFactory.setConsumerFactory(consumerFactory());
            containerFactory.setConcurrency(3); 
            containerFactory.getContainerProperties().setPollTimeout(3000);
            return containerFactory;
        }
    
        @Bean
        public static Map<String, Object> consumerConfigs() {
            Map<String, Object> consumerProps = new HashMap<>();
            consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG,"inventory_consumers");
            consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
            consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class);
            return consumerProps;
        }
    
        @Bean
        public InventoryEventReceiver receiver() {
            return new InventoryEventReceiver();
        }
    
    }
    

    THE ERROR LOG:

    2017-12-19 13:49:08.671 ERROR 16965 --- [fka-listener-23] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: ConsumerRecord(topic = inventory, partition = 0, offset = 48, CreateTime = 1513691348668, checksum = 537414172, serialized key size = -1, serialized value size = 77, key = null, value = {id=1231, eventType='inventory.transaction', qtyReq='2345', qtyLevel='2223'})
    
    org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
    Endpoint handler details:
    Method [public void com.psl.kafka.spring.InventoryEventReceiver.listenWithHeaders(java.lang.String,java.lang.String,java.lang.Integer,int,java.lang.String)]
    Bean [com.psl.kafka.spring.InventoryEventReceiver@3ecc1b0b]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.psl.kafka.spring.InventoryEvent] to [java.lang.String] for GenericMessage [payload={id=1231, eventType='inventory.transaction', qtyReq='2345', qtyLevel='2223'}, headers={kafka_offset=48, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=inventory}], failedMessage=GenericMessage [payload={id=1231, eventType='inventory.transaction', qtyReq='2345', qtyLevel='2223'}, headers={kafka_offset=48, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=inventory}]
            at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:156) ~[spring-kafka-1.1.1.RELEASE.jar:na]
            at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:72) ~[spring-kafka-1.1.1.RELEASE.jar:na]
            at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:47) ~[spring-kafka-1.1.1.RELEASE.jar:na]
            at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:764) [spring-kafka-1.1.1.RELEASE.jar:na]
            at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:708) [spring-kafka-1.1.1.RELEASE.jar:na]
            at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$2500(KafkaMessageListenerContainer.java:230) [spring-kafka-1.1.1.RELEASE.jar:na]
            at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:981) [spring-kafka-1.1.1.RELEASE.jar:na]
            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_151]
            at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_151]
            at java.lang.Thread.run(Thread.java:748) [na:1.8.0_151]
    Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.psl.kafka.spring.InventoryEvent] to [java.lang.String] for GenericMessage [payload={id=1231, eventType='inventory.transaction', qtyReq='2345', qtyLevel='2223'}, headers={kafka_offset=48, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=inventory}], failedMessage=GenericMessage [payload={id=1231, eventType='inventory.transaction', qtyReq='2345', qtyLevel='2223'}, headers={kafka_offset=48, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=inventory}]
            ... 10 common frames omitted
    Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.psl.kafka.spring.InventoryEvent] to [java.lang.String] for GenericMessage [payload={id=1231, eventType='inventory.transaction', qtyReq='2345', qtyLevel='2223'}, headers={kafka_offset=48, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=inventory}]
            at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:142) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
            at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:112) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
            at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:135) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
            at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:107) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
            at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-1.1.1.RELEASE.jar:na]
            at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:152) ~[spring-kafka-1.1.1.RELEASE.jar:na]
            ... 9 common frames omitted
    
    2017-12-19 13:49:28.869  INFO 16965 --- [o-8080-exec-113] o.s.web.servlet.DispatcherServlet        : FrameworkServlet 'dispatcherServlet': initialization started
    2017-12-19 13:49:28.889  INFO 16965 --- [o-8080-exec-113] o.s.web.servlet.DispatcherServlet        : FrameworkServlet 'dispatcherServlet': initialization completed in 20 ms
    
  • somnathchakrabarti
    somnathchakrabarti over 6 years
    I have shown the listener method signature as it appears in the stack trace. In the InventoryEventReceiver class, I have annotated listenWithHeaders with @KafkaListener, and its arguments (String, String, Integer, int, String) are: InventoryEvent event, (KafkaHeaders.RECEIVED_TOPIC) String topic, (KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key, (KafkaHeaders.RECEIVED_PARTITION_ID) int partition, (KafkaHeaders.OFFSET) String offset.
  • somnathchakrabarti
    somnathchakrabarti over 6 years
    In addition, I have given two other listener method signatures that I tried, with the same error. I am not using StringJsonMessageConverter, as I am doing JSON serialization/deserialization for the value part of the message.
  • somnathchakrabarti
    somnathchakrabarti over 6 years
    By "the proper method", do you mean a custom MessageConverter for JsonDeserializer? If so, can you give a short example of that?
  • somnathchakrabarti
    somnathchakrabarti over 6 years
    I checked the article at baeldung.com/spring-kafka, but it does not mention any converter. It only covers handling custom messages.
  • Artem Bilan
    Artem Bilan over 6 years
    Well, that’s not true: you show the method with the InventoryEvent, but at runtime the call is to a method with a String instead. I’m not sure how to convince you that there is something wrong with your app, but I won’t mind if you share a simple Spring Boot project so that we can try to reproduce the problem on our side.
  • somnathchakrabarti
    somnathchakrabarti over 6 years
    Yes, I can share the Spring Boot project. Please provide an ID to send it to.
  • Artem Bilan
    Artem Bilan over 6 years
    GitHub is a good public place to share. But bear in mind that the project should be as simple as possible and focused on the problem only. It would be too hard for us to try to understand your business logic. Thanks