Spring-Kafka Consumer KafkaListener cannot convert GenericMessage to Java Object
See your stack trace:
Method [public void com.psl.kafka.spring.InventoryEventReceiver.listenWithHeaders(java.lang.String,java.lang.String,java.lang.Integer,int,java.lang.String)]
So the method signature at runtime is listenWithHeaders(String, String, Integer, int, String).
But you show us an absolutely different one. Please make sure that the code you show is really the code that runs.
If you have JsonDeserializer, you really don't need StringJsonMessageConverter, but the proper method must be used, indeed...
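One way to check which listener signature is actually loaded at runtime is to print it reflectively and compare it with the "Method [...]" line in the stack trace. The following is only a minimal sketch: `ListenerSignatureCheck`, `signaturesOf`, and the nested stand-in classes are hypothetical names used for illustration, and in a real project you would pass the deployed `InventoryEventReceiver` class instead.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

public class ListenerSignatureCheck {

    // Hypothetical stand-in for the payload type from the question.
    public static class InventoryEvent { }

    // Hypothetical stand-in for the receiver; annotations are omitted
    // because only the parameter types matter for this check.
    public static class InventoryEventReceiver {
        public void listenWithHeaders(InventoryEvent event, String topic,
                                      Integer key, int partition, String offset) { }
    }

    /** Collects the full signatures of all methods with the given name declared on the type. */
    public static List<String> signaturesOf(Class<?> type, String methodName) {
        List<String> result = new ArrayList<>();
        for (Method m : type.getDeclaredMethods()) {
            if (m.getName().equals(methodName)) {
                // Method.toString() lists the fully qualified parameter types,
                // in a form similar to the signature shown in the stack trace.
                result.add(m.toString());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        signaturesOf(InventoryEventReceiver.class, "listenWithHeaders")
                .forEach(System.out::println);
    }
}
```

If the first parameter printed here is `java.lang.String` rather than `InventoryEvent`, the class on the runtime classpath is not the one shown in the source, for example because of a stale build.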
somnathchakrabarti
Updated on June 04, 2022
Comments
-
somnathchakrabarti almost 2 years
I am posting events of a custom Java type 'InventoryEvent' through the kafka-rest service running on the Confluent-3.3.0 platform on a Centos7 instance, using the two steps below:
Command to POST JSON events into kafka-rest
curl -X POST -H "Content-Type:application/vnd.kafka.json.v2+json" --data '{"records" : [{"value" : {"id":1231, "eventType": "inventory.transaction", "qtyLevel" : 2223, "qtyReq" : 2345}}]}' "http://localhost:8082/topics/inventory"
Subscribe the consumer instance to the topic
curl -X POST -H "Content-Type:application/vnd.kafka.v2+json" --data '{"topics" : ["inventory"]}' http://localhost:8082/consumers/inventory_consumers/instances/consumer_1/subscription
Next I am consuming the events sent to the Kafka broker via a Spring-Kafka application, which should consume the JSON and convert it back to the Java type through the consumer listener method annotated with @KafkaListener, as below:
    public class InventoryEventReceiver {

        private static final Logger log = LoggerFactory.getLogger(InventoryEventReceiver.class);

        private CountDownLatch latch = new CountDownLatch(1);

        public CountDownLatch getLatch() {
            return latch;
        }

        @KafkaListener(topics="inventory", containerFactory="kafkaListenerContainerFactory")
        public void listenWithHeaders(
                @Payload InventoryEvent event,
                @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
                @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                @Header(KafkaHeaders.OFFSET) String offset
        ) {
            System.out.println("EVENT HAS BEEN RECEIVED ");
            System.out.println(event.toString());
            ObjectMapper objectMapper = new ObjectMapper();
            String invEventInString = null;
            try {
                invEventInString = objectMapper.writeValueAsString(event);
                System.out.println(invEventInString);
            } catch (IOException e) {
                e.printStackTrace();
            }
            latch.countDown();
        }
    }
But I am getting the error log shown below (under THE ERROR LOG) in the KafkaListenerContainer while trying to consume the messages via the above receiver code.
The other listener method definitions that I tried, with the same error, are:
Listening with InventoryEvent object
    @KafkaListener(topics="inventory", containerFactory="kafkaListenerContainerFactory")
    public void listenWithHeaders( InventoryEvent event )
Listening with ConsumerRecord (taking cue from the error log)
    @KafkaListener(topics="inventory", containerFactory="kafkaListenerContainerFactory")
    public void listen(ConsumerRecord<?,?> record)
My Receiver Config below takes InventoryEvent as value placeholder. I changed that to String and added StringJsonMessageConverter through
containerFactory.setMessageConverter(new StringJsonMessageConverter());
but it gave the same error.
Am I missing some basic Spring-Kafka configuration, such as a MessageConverter or MessageListener, or do I have to implement a custom MessageConverter altogether to deserialize the JSON to the Java type InventoryEvent?
    @EnableKafka
    @Configuration
    public class InventoryReceiverConfig {

        @Bean
        public static ConsumerFactory<String, InventoryEvent> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
                    new JsonDeserializer<>(InventoryEvent.class));
        }

        @Bean
        public static ConcurrentKafkaListenerContainerFactory<String, InventoryEvent> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, InventoryEvent> containerFactory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            containerFactory.setConsumerFactory(consumerFactory());
            containerFactory.setConcurrency(3);
            containerFactory.getContainerProperties().setPollTimeout(3000);
            return containerFactory;
        }

        @Bean
        public static Map<String, Object> consumerConfigs() {
            Map<String, Object> consumerProps = new HashMap<>();
            consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG,"inventory_consumers");
            consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
            consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class);
            return consumerProps;
        }

        @Bean
        public InventoryEventReceiver receiver() {
            return new InventoryEventReceiver();
        }
    }
THE ERROR LOG:
2017-12-19 13:49:08.671 ERROR 16965 --- [fka-listener-23] o.s.kafka.listener.LoggingErrorHandler : Error while processing: ConsumerRecord(topic = inventory, partition = 0, offset = 48, CreateTime = 1513691348668, checksum = 537414172, serialized key size = -1, serialized value size = 77, key = null, value = {id=1231, eventType='inventory.transaction', qtyReq='2345', qtyLevel='2223'})
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.psl.kafka.spring.InventoryEventReceiver.listenWithHeaders(java.lang.String,java.lang.String,java.lang.Integer,int,java.lang.String)]
Bean [com.psl.kafka.spring.InventoryEventReceiver@3ecc1b0b]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.psl.kafka.spring.InventoryEvent] to [java.lang.String] for GenericMessage [payload={id=1231, eventType='inventory.transaction', qtyReq='2345', qtyLevel='2223'}, headers={kafka_offset=48, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=inventory}], failedMessage=GenericMessage [payload={id=1231, eventType='inventory.transaction', qtyReq='2345', qtyLevel='2223'}, headers={kafka_offset=48, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=inventory}]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:156) ~[spring-kafka-1.1.1.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:72) ~[spring-kafka-1.1.1.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:47) ~[spring-kafka-1.1.1.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:764) [spring-kafka-1.1.1.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:708) [spring-kafka-1.1.1.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$2500(KafkaMessageListenerContainer.java:230) [spring-kafka-1.1.1.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:981) [spring-kafka-1.1.1.RELEASE.jar:na]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_151]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_151]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_151]
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.psl.kafka.spring.InventoryEvent] to [java.lang.String] for GenericMessage [payload={id=1231, eventType='inventory.transaction', qtyReq='2345', qtyLevel='2223'}, headers={kafka_offset=48, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=inventory}], failedMessage=GenericMessage [payload={id=1231, eventType='inventory.transaction', qtyReq='2345', qtyLevel='2223'}, headers={kafka_offset=48, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=inventory}]
    ... 10 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.psl.kafka.spring.InventoryEvent] to [java.lang.String] for GenericMessage [payload={id=1231, eventType='inventory.transaction', qtyReq='2345', qtyLevel='2223'}, headers={kafka_offset=48, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=inventory}]
    at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:142) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:112) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:135) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:107) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-1.1.1.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:152) ~[spring-kafka-1.1.1.RELEASE.jar:na]
    ... 9 common frames omitted
2017-12-19 13:49:28.869 INFO 16965 --- [o-8080-exec-113] o.s.web.servlet.DispatcherServlet : FrameworkServlet 'dispatcherServlet': initialization started
2017-12-19 13:49:28.889 INFO 16965 --- [o-8080-exec-113] o.s.web.servlet.DispatcherServlet : FrameworkServlet 'dispatcherServlet': initialization completed in 20 ms
-
somnathchakrabarti over 6 years
I have shown the listener method signature as it appears in the stack trace. In the InventoryEventReceiver class, I have annotated listenWithHeaders with KafkaListener; its arguments (String, String, Integer, int, String) are: InventoryEvent event, (KafkaHeaders.RECEIVED_TOPIC) String topic, (KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key, (KafkaHeaders.RECEIVED_PARTITION_ID) int partition, (KafkaHeaders.OFFSET) String offset.
-
somnathchakrabarti over 6 years
In addition, I have given two other listener method signatures that I tried, with the same error. I am not using StringJsonMessageConverter, as I am doing JsonSerialize/JsonDeserialize for the value part of the message.
-
somnathchakrabarti over 6 years
By the proper method, do you mean a custom MessageConverter for JsonDeserializer? If so, can you give a short example for that?
-
somnathchakrabarti over 6 years
I checked the article at baeldung.com/spring-kafka, but it does not mention any converter. It only covers handling custom messages.
-
Artem Bilan over 6 years
Well, that’s not true: you show the method with the InventoryEvent, but at runtime the call is about a method with the String instead. I’m not sure how to convince you that there is something wrong with your app, but I won’t mind if you share a simple Spring Boot project so that we can reproduce the problem on our side.
-
somnathchakrabarti over 6 years
Yes, I can share the Spring Boot project. Please provide an ID to send it to.
-
Artem Bilan over 6 years
GitHub is a good public place to share. But bear in mind: the project should be as simple as possible and concentrated on the problem only. It would be too hard for us to try to understand your business logic. Thanks