Kafka Listener method could not be invoked with the incoming message

11,428

Can not deserialize instance of com.springboot.model.Student out of START_ARRAY

If using a json deserailizer, you have a list, not a single Student

@Payload List<Student> student

Or if using the string deserailizer, you have a String of JSON and you must parse it manually

receiveData(@Payload String student ... ) { 
    JsonNode data = new ObjectMapper().readTree(student); // for example, but should extract ObjectMapper to a field
}

Regarding your other output, please see How do I print my Java object without getting "SomeType@2f92e0f4"?

Share:
11,428
Vaibhav Shelar
Author by

Vaibhav Shelar

Updated on June 13, 2022

Comments

  • Vaibhav Shelar
    Vaibhav Shelar almost 2 years

    I am sending an array of JSON by converting it to toString() in Kafka Producer using Spring Boot app, but I am getting following error in Consumer:

    org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message Endpoint handler details: Method [public void com.springboot.service.KafkaReciever.recieveData(com.springboot.model.Student,java.lang.String) throws java.lang.Exception] Bean [com.springboot.service.KafkaReciever@5bb3d42d]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.springboot.model.Student] for GenericMessage [payload=[com.springboot.model.Student@5e40dc31, com.springboot.model.Student@235e68b8], headers={kafka_offset=45, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=myTopic-kafkasender}], failedMessage=GenericMessage [payload=[com.springboot.model.Student@5e40dc31, com.springboot.model.Student@235e68b8], headers={kafka_offset=45, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=myTopic-kafkasender}]

    Configuration File:

    @Configuration
    @EnableKafka
    public class KafkaConsumerConfig {
    
        @Value("${kafka.boot.server}")
        private String kafkaServer;
    
        @Value("${kafka.consumer.group.id}")
        private String kafkaGroupId;
    
        @Bean
        public ConsumerFactory<String, String> consumerConfig() {
    
             Properties props = new Properties();
    
             props.put("bootstrap.servers", "localhost:9092");
             props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
             props.put("message.assembler.buffer.capacity", 33554432);
             props.put("max.tracked.messages.per.partition", 24);
             props.put("exception.on.message.dropped", true);
             props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
             props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
             props.put("segment.deserializer.class", DefaultSegmentDeserializer.class.getName());
    
             return new DefaultKafkaConsumerFactory(props, null, new StringDeserializer());
        }
    
        @Bean
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> listener = new ConcurrentKafkaListenerContainerFactory<>();
            listener.setConsumerFactory(consumerConfig());
            return listener;
        }
    }
    

    Reciever File:

    @Service
    public class KafkaReciever {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(KafkaReciever.class);
    
        @KafkaListener(topics = "${kafka.topic.name}", group = "${kafka.consumer.group.id}")
        public void recieveData(@Payload Student student, @Header(KafkaHeaders.MESSAGE_KEY) String messageKey) throws Exception{
            LOGGER.info("Data - " + student + " recieved");
        }
    }
    

    POST json:

     [{
            "studentId": "Q45678123",
            "firstName": "Anderson",
            "lastName": "John",
            "age": "12",
            "address": {
              "apartment": "apt 123",
              "street": "street Info",
              "state": "state",
              "city": "city",
              "postCode": "12345"
            }
        },
        {
            "studentId": "Q45678123",
            "firstName": "abc",
            "lastName": "xyz",
            "age": "12",
            "address": {
              "apartment": "apt 123",
              "street": "street Info",
              "state": "state",
              "city": "city",
              "postCode": "12345"
            }
        }]
    

    I am getting the following consumer output:

    [com.springboot.model.Student@5e40dc31, com.springboot.model.Student@235e68b8]
    
    • aran
      aran over 4 years
      Your consumer waits for String (deserializer), but it's receiving a Student. Do you have access to the producer's serializer? You need something like this: stackoverflow.com/questions/40154086/…
    • Vaibhav Shelar
      Vaibhav Shelar over 4 years
      my Model data is: public class Student implements Serializable { private static final long serialVersionUID = 1L; private String studentId; private String firstName; private String lastName; private String age; private Address address; get/set }
    • Vaibhav Shelar
      Vaibhav Shelar over 4 years
      I changed VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer then i got following error: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition myTopic-kafkasender-0 at offset 47 Caused by: com.fasterxml.jackson.databind.JsonMappingException: Can not deserialize instance of com.springboot.model.Student out of START_ARRAY token at [Source: [B@304eaefb; line: 1, column: 1]
    • harkesh kumar
      harkesh kumar over 4 years
      ConsumerFactory<String, String> please try to user your payload object type like if you class name student then ConsumerFactory<String, Student> ____________________ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); You can user json desrialzer if you need proper ans ping me i will help you
    • Vaibhav Shelar
      Vaibhav Shelar over 4 years
      @harkesh thanks.. adding this getting same error.. but changing in Service as '@Payload List<Student> student ' error solved but data not converted to array of json.
    • harkesh kumar
      harkesh kumar over 4 years
      @VaibhavShelar if you able to fix it then make a sample project and share it i will update your code and fix issue
    • Vaibhav Shelar
      Vaibhav Shelar over 4 years
      @haresh how can i contact with you? I am unable to find any option to chat with you.
  • Vaibhav Shelar
    Vaibhav Shelar over 4 years
    Thanks. I have parsed JSON data in producer and have send it to consumer, after that i am getting proper json data in consumer. just wanted to check, if I am doing this in a correct way or not?
  • OneCricketeer
    OneCricketeer over 4 years
    As long as there are no exceptions, I would say nothing is wrong