How to skip corrupt (non-serializable) messages in Spring Kafka Consumer?


Solution 1

You need ErrorHandlingDeserializer:

If you can't move to that 2.2 version, consider to implement your own and return null for those records which can't be deserialized properly.

The source code is here:

Solution 2

In case you are using older version of kafka, in a @KafkaListener set the following Consumer factory configurations.

 Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CustomDeserializer.class);

Here is the code for CustomDeserializer:

 import java.util.Map;
    import org.apache.kafka.common.serialization.Deserializer;
    import com.fasterxml.jackson.databind.ObjectMapper;
    public class CustomDeserializer implements Deserializer<Object>
        public void configure( Map<String, ?> configs, boolean isKey )

        public Object deserialize( String topic, byte[] data )
            ObjectMapper mapper = new ObjectMapper();
            Object object = null;
                object = mapper.readValue(data, Object.class);
            catch ( Exception exception )
                System.out.println("Error in deserializing bytes " + exception);
            return object;

        public void close()

Since I want my code to be generic enough to read any kind of json, object = mapper.readValue(data, Object.class); I am converting it to Object.class. And as we are catching exception here, it won't be retried once read.


Related videos on Youtube

Shankar P S
Author by

Shankar P S

Java developer. Now working with Spring frameworks. Worked with Struts, JSF and EE servers.

Updated on September 16, 2022


  • Shankar P S
    Shankar P S over 1 year

    This question is for Spring Kafka, related to Apache Kafka with High Level Consumer: Skip corrupted messages

    Is there a way to configure Spring Kafka consumer to skip a record that cannot be read/processed (is corrupt)?

    I am seeing a situation where the consumer gets stuck on the same record if it cannot be deserialized. This is the error the consumer throws.

    Caused by: com.fasterxml.jackson.databind.JsonMappingException: Can not construct instance of java.time.LocalDate: no long/Long-argument constructor/factory method to deserialize from Number value 

    The consumer polls the topic and just keeps printing the same error in a loop till program is killed.

    In a @KafkaListener that has the following Consumer factory configurations,

    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
  • Gary Russell
    Gary Russell over 5 years
    You can also use the to advance past the bad the record. See the --shift-by option. But the ErrorHandlingDeserializer is intended for handling this in a production application.
  • Raghava Dhanya
    Raghava Dhanya over 4 years
    ErrorHandlingDeserializer has been deprecated you can now use ErrorHandlingDeserializer2 the same way
  • ttt
    ttt over 4 years
    How could this be used with Avro which is KafkaAvroDeserializer? Thanks.
  • Artem Bilan
    Artem Bilan over 4 years
    Please, raise a new SO question