How to skip corrupt (non-serializable) messages in Spring Kafka Consumer?
Solution 1
You need ErrorHandlingDeserializer: https://docs.spring.io/spring-kafka/docs/2.2.0.RELEASE/reference/html/_reference.html#error-handling-deserializer
If you can't move to version 2.2, consider implementing your own deserializer that returns null for records which can't be deserialized properly.
The source code is here: https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/ErrorHandlingDeserializer2.java
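As a rough sketch of how the wrapper is usually wired in: the error-handling deserializer is registered as the value deserializer, and the real deserializer is named as its delegate. The example below uses plain string property keys so that it compiles without any Kafka jars. The class name ErrorHandlingConsumerProps and the broker address are made up for illustration, and the wrapper's class name is an assumption that depends on your Spring Kafka release (some releases call it ErrorHandlingDeserializer2).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Spring Kafka.
public class ErrorHandlingConsumerProps {

    // Builds consumer properties with plain string keys (no Kafka dependency needed to compile).
    public static Map<String, Object> build(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Assumption: adjust the class name to the one shipped with your release.
        props.put("value.deserializer",
                "org.springframework.kafka.support.serializer.ErrorHandlingDeserializer");
        // The real deserializer becomes the delegate that the wrapper calls.
        props.put("spring.deserializer.value.delegate.class",
                "org.springframework.kafka.support.serializer.JsonDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" is a placeholder broker address.
        build("localhost:9092").forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

The resulting map can be passed to a consumer factory in the same way as the props map shown in the question.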
Solution 2
If you are using an older version of Spring Kafka, set the following consumer factory configuration for your @KafkaListener.
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CustomDeserializer.class);
Here is the code for CustomDeserializer:
import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;

import com.fasterxml.jackson.databind.ObjectMapper;

public class CustomDeserializer implements Deserializer<Object>
{
    // ObjectMapper is thread-safe once configured, so create it once and reuse it.
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public void configure( Map<String, ?> configs, boolean isKey )
    {
        // No configuration needed.
    }

    @Override
    public Object deserialize( String topic, byte[] data )
    {
        Object object = null;
        try
        {
            object = mapper.readValue(data, Object.class);
        }
        catch ( Exception exception )
        {
            // Swallow the error so the consumer can move past the bad record.
            System.out.println("Error in deserializing bytes " + exception);
        }
        // null signals a record that could not be deserialized.
        return object;
    }

    @Override
    public void close()
    {
        // Nothing to release.
    }
}
Since I want my code to be generic enough to read any kind of JSON, I deserialize to Object.class: object = mapper.readValue(data, Object.class); Because the exception is caught here, the deserializer returns null for the bad record and the consumer moves on instead of retrying it.
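One consequence of returning null is that the listener then receives null payloads and has to skip them itself. The sketch below illustrates that idea with plain Java only, with no Kafka classes involved. SkipCorruptSketch, nullOnFailure and consume are hypothetical names, and integer parsing stands in for JSON deserialization.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for the deserializer plus listener pair.
public class SkipCorruptSketch {

    // Wraps a parser so that a failure yields null instead of an exception.
    public static <T> Function<byte[], T> nullOnFailure(Function<byte[], T> parser) {
        return data -> {
            try {
                return parser.apply(data);
            } catch (RuntimeException e) {
                return null;
            }
        };
    }

    // Plays the role of the listener: keeps good payloads and skips null ones.
    public static <T> List<T> consume(List<byte[]> records, Function<byte[], T> deserializer) {
        List<T> processed = new ArrayList<>();
        for (byte[] record : records) {
            T value = deserializer.apply(record);
            if (value == null) {
                continue; // corrupt record, skip it
            }
            processed.add(value);
        }
        return processed;
    }

    public static void main(String[] args) {
        Function<byte[], Integer> parseInt =
                bytes -> Integer.parseInt(new String(bytes, StandardCharsets.UTF_8));
        List<byte[]> records = Arrays.asList(
                "1".getBytes(StandardCharsets.UTF_8),
                "not-a-number".getBytes(StandardCharsets.UTF_8),
                "3".getBytes(StandardCharsets.UTF_8));
        System.out.println(consume(records, nullOnFailure(parseInt))); // prints [1, 3]
    }
}
```

In a real @KafkaListener the same null check would go at the top of the listener method.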
Shankar P S
Java developer. Now working with Spring frameworks. Worked with Struts, JSF and EE servers.
Updated on September 16, 2022
Comments
-
Shankar P S over 1 year
This question is for Spring Kafka, related to Apache Kafka with High Level Consumer: Skip corrupted messages
Is there a way to configure Spring Kafka consumer to skip a record that cannot be read/processed (is corrupt)?
I am seeing a situation where the consumer gets stuck on the same record if it cannot be deserialized. This is the error the consumer throws.
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Can not construct instance of java.time.LocalDate: no long/Long-argument constructor/factory method to deserialize from Number value
The consumer polls the topic and keeps printing the same error in a loop until the program is killed.
This happens in a @KafkaListener that has the following consumer factory configuration:
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
-
Gary Russell over 5 years
You can also use the kafka-consumer-groups.sh tool to advance past the bad record. See the --shift-by option. But the ErrorHandlingDeserializer is intended for handling this in a production application.
-
Raghava Dhanya over 4 years
ErrorHandlingDeserializer has been deprecated; you can now use ErrorHandlingDeserializer2 the same way.
-
ttt over 4 years
How could this be used with Avro, which uses KafkaAvroDeserializer? Thanks.
-
Artem Bilan over 4 years
Please raise a new SO question.