Kafka Consumer Error

Solution 1

I believe you are missing that JsonDeserializer must be given an appropriate default type to deserialize into, and that this has to be configured on the ConsumerFactory itself rather than in the Kafka properties.

The details are in the docs: https://docs.spring.io/spring-kafka/docs/2.1.7.RELEASE/reference/html/_reference.html#serdes

Solution 2

Adding to the answer above, the following changes solved it for me.

I set

config.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);

and used

return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(String.class));

instead of

return new DefaultKafkaConsumerFactory<String, String>(config);

For reference, the deserialize method below looks for type information in the headers. When it finds none and no default type is set, the Assert.state(...) call throws the IllegalStateException:

@Override
public T deserialize(String topic, Headers headers, byte[] data) {
    JavaType javaType = this.typeMapper.toJavaType(headers);
    if (javaType == null) {
        Assert.state(this.targetType != null, "No type information in headers and no default type provided");
        return deserialize(topic, data);
    }
    else {
        try {
            return this.objectMapper.readerFor(javaType).readValue(data);
        }
        catch (IOException e) {
            throw new SerializationException("Can't deserialize data [" + Arrays.toString(data) +
                    "] from topic [" + topic + "]", e);
        }
    }
}
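
To make the decision in that method easier to follow, here is a minimal standalone sketch of the same branching, using only the JDK. It is a simplified model, not the library's code: the class TypeResolutionSketch, the method resolveType, and the type name com.example.FanEvent are hypothetical, and headers are modelled as a plain Map. The header key "__TypeId__" is assumed to be the default type header name.

```java
import java.util.Map;

// Simplified, hypothetical model of the type-resolution branch; not Spring Kafka's code.
class TypeResolutionSketch {

    // Returns the type name to deserialize into: a type header wins,
    // otherwise the configured default type is used.
    static String resolveType(Map<String, String> headers, String defaultType) {
        String fromHeader = headers.get("__TypeId__"); // assumed default header name
        if (fromHeader != null) {
            return fromHeader;
        }
        if (defaultType == null) {
            // Same message as the exception reported in the question.
            throw new IllegalStateException(
                    "No type information in headers and no default type provided");
        }
        return defaultType;
    }

    public static void main(String[] args) {
        // Producer sent a type header: the header decides.
        System.out.println(resolveType(Map.of("__TypeId__", "com.example.FanEvent"), null));

        // No header, but a default type is configured: the default decides.
        System.out.println(resolveType(Map.of(), "com.example.FanEvent"));

        // No header and no default type: this is the failing case.
        try {
            resolveType(Map.of(), null);
        } catch (IllegalStateException e) {
            System.out.println("Failed: " + e.getMessage());
        }
    }
}
```

The third call shows why a plain Kafka producer (which adds no type headers) combined with a JsonDeserializer that has no default type ends in the exception above.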
Author by

jenjen

Updated on June 04, 2022

Comments

  • jenjen
    jenjen almost 2 years

    I am using a Kafka producer and a Spring Kafka consumer, with a JSON serializer and deserializer. Whenever I try to read messages from the topic in the consumer, I get the following error:

    org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition fan_topic-0 at offset 154. If needed, please seek past the record to continue consumption.
    Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided
    

    I have not configured anything about headers in either the producer or the consumer. What am I missing here?

  • jenjen
    jenjen almost 6 years
    Adding JsonDeserializer.VALUE_DEFAULT_TYPE in properties fixed the issue.
  • WesternGun
    WesternGun over 4 years
    What if I use EmbeddedKafka for a simple test where all configuration lives in a YAML file? In that case the ConsumerFactory bean is built entirely from the YAML, not by me. I don't want the overhead of repeating everything from the YAML in a config class and passing it to the ConsumerFactory constructor just to provide the type info.
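
The JsonDeserializer.VALUE_DEFAULT_TYPE fix mentioned in the comments can be expressed purely as properties, which may also suit the YAML-only setup. The sketch below builds such a property map with JDK classes only. It assumes that the constant resolves to the key "spring.json.value.default.type" (as in recent Spring Kafka versions); the class ConsumerPropsSketch, the broker address, the group id, and com.example.FanEvent are hypothetical placeholders.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that assembles consumer properties as plain strings.
class ConsumerPropsSketch {

    static Map<String, Object> consumerProps(String bootstrapServers, String groupId, String valueType) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.springframework.kafka.support.serializer.JsonDeserializer");
        // Assumed value of JsonDeserializer.VALUE_DEFAULT_TYPE: the type to use
        // when a record carries no type headers.
        props.put("spring.json.value.default.type", valueType);
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values for illustration only.
        Map<String, Object> props =
                consumerProps("localhost:9092", "fan-group", "com.example.FanEvent");
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Because every entry is a plain key and string value, the same pairs should be expressible in a Spring Boot YAML file; the extra JSON key would typically go under spring.kafka.consumer.properties, though that placement is worth checking against the Boot version in use.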