Kafka Consumer Error
Solution 1
I believe you are missing that the JsonDeserializer
has to be configured on the ConsumerFactory
with an appropriate default type to deserialize to, not in the Kafka properties map.
All the info is presented in the Docs: https://docs.spring.io/spring-kafka/docs/2.1.7.RELEASE/reference/html/_reference.html#serdes
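A minimal sketch of what that looks like, assuming Spring for Apache Kafka; the `Foo` payload class, group id, and broker address are placeholders, not taken from the original question:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

// Placeholder payload type for the sketch.
class Foo {
}

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, Foo> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        // Pass the deserializer instances directly: JsonDeserializer then has a
        // default target type even when the record carries no type headers.
        return new DefaultKafkaConsumerFactory<>(config,
                new StringDeserializer(),
                new JsonDeserializer<>(Foo.class));
    }
}
```

The key point is the three-argument DefaultKafkaConsumerFactory constructor, which takes deserializer instances rather than class names from the properties map.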
Solution 2
Just adding to the answer above:
the changes below solved it for me.
config.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
and using
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(String.class));
instead of
return new DefaultKafkaConsumerFactory<String, String>(config);
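For context, the first change (ADD_TYPE_INFO_HEADERS) belongs on the producer side. A sketch of where it would go, assuming a Spring JsonSerializer-based producer; the broker address is a placeholder:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        // Stop JsonSerializer from writing __TypeId__ headers; the consumer
        // must then rely on its own default type (see the consumer change above).
        config.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
        return new DefaultKafkaProducerFactory<>(config);
    }
}
```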
For reference,
the deserialize method below expects type information in the headers;
when none is present and no default type was provided,
the Assert.state call throws the IllegalStateException.
@Override
public T deserialize(String topic, Headers headers, byte[] data) {
    JavaType javaType = this.typeMapper.toJavaType(headers);
    if (javaType == null) {
        Assert.state(this.targetType != null, "No type information in headers and no default type provided");
        return deserialize(topic, data);
    }
    else {
        try {
            return this.objectMapper.readerFor(javaType).readValue(data);
        }
        catch (IOException e) {
            throw new SerializationException("Can't deserialize data [" + Arrays.toString(data) +
                    "] from topic [" + topic + "]", e);
        }
    }
}
jenjen
Updated on June 04, 2022
Comments
-
jenjen, almost 2 years ago
I am using a Kafka producer and a Spring Kafka consumer, with a JSON serializer and deserializer. Whenever I try to read messages from the topic in the consumer, I get the following error:
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition fan_topic-0 at offset 154. If needed, please seek past the record to continue consumption. Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided
I have not configured anything about headers in either the producer or the consumer. What am I missing here?
-
jenjen, almost 6 years ago: Adding JsonDeserializer.VALUE_DEFAULT_TYPE to the properties fixed the issue.
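That property-based route can be sketched like this; the `com.example.Foo` class name and package are placeholders:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

class ConsumerProps {

    static Map<String, Object> consumerConfig() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        // Default type used when records carry no __TypeId__ header.
        config.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.Foo");
        // JsonDeserializer only deserializes into trusted packages.
        config.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example");
        return config;
    }
}
```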
-
WesternGun, over 4 years ago: What if I use EmbeddedKafka for a simple test where all configuration is in a yaml file? In that case the
ConsumerFactory
bean is constructed entirely from yaml, not by me; I don't want the overhead of repeating all the yaml info in config
and passing it to the constructor of the ConsumerFactory only to provide the type info.
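For the yaml-only case, Spring Boot can pass the type info through to JsonDeserializer without any hand-built factory. A sketch of an application.yml fragment, with `com.example.Foo` as a placeholder class:

```yaml
spring:
  kafka:
    consumer:
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.value.default.type: com.example.Foo
        spring.json.trusted.packages: com.example
```

`spring.json.value.default.type` is the property key behind the JsonDeserializer.VALUE_DEFAULT_TYPE constant mentioned above.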