How to catch deserialization error in Kafka-Spring?


Solution 1

ErrorHandlingDeserializer

When a deserializer fails to deserialize a message, Spring has no way to handle the problem because it occurs before the poll() returns. To solve this problem, version 2.2 introduced the ErrorHandlingDeserializer. This deserializer delegates to a real deserializer (key or value). If the delegate fails to deserialize the record content, the ErrorHandlingDeserializer returns a DeserializationException instead, containing the cause and raw bytes. When using a record-level MessageListener, if either the key or value contains a DeserializationException, the container’s ErrorHandler is called with the failed ConsumerRecord. When using a BatchMessageListener, the failed record is passed to the application along with the remaining records in the batch, so it is the responsibility of the application listener to check whether the key or value in a particular record is a DeserializationException.

So, according to your code, you are using a record-level MessageListener; just add an ErrorHandler to the container.

Handling Exceptions

If your error handler implements the ConsumerAwareListenerErrorHandler interface you can, for example, adjust the offsets accordingly. For example, to reset the offset so the failed message is replayed, you could do something like the following. Note, however, that these are simplistic implementations and you would probably want more checking in the error handler.

@Bean
public ConsumerAwareListenerErrorHandler listen3ErrorHandler() {
    return (m, e, c) -> {
        this.listen3Exception = e;
        MessageHeaders headers = m.getHeaders();
        // seek back to the failed record's offset so it is redelivered
        c.seek(new org.apache.kafka.common.TopicPartition(
                headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class),
                headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)),
                headers.get(KafkaHeaders.OFFSET, Long.class));
        return null;
    };
}
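
Note that a ConsumerAwareListenerErrorHandler is a listener-level error handler: it is referenced by bean name from the @KafkaListener annotation rather than set on the container factory. A minimal sketch of the wiring (the listener id, topic, and body are hypothetical):

@KafkaListener(id = "listen3", topics = "myTopic", errorHandler = "listen3ErrorHandler")
public void listen3(String in) {
    // any exception thrown here is passed to listen3ErrorHandler,
    // along with the failed spring-messaging Message and the Consumer
    System.out.println(in);
}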

Alternatively, you can provide a custom ErrorHandler implementation on the container factory, as in this example:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, GenericRecord> kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory
            = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setErrorHandler(new ErrorHandler() {
        @Override
        public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
            // parse the topic, partition and offset out of the exception message
            String s = thrownException.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];
            String topics = s.split("-")[0];
            int offset = Integer.valueOf(s.split("offset ")[1]);
            int partition = Integer.valueOf(s.split("-")[1].split(" at")[0]);

            TopicPartition topicPartition = new TopicPartition(topics, partition);
            // seek past the bad record so consumption can continue
            //log.info("Skipping " + topics + "-" + partition + " offset " + offset);
            consumer.seek(topicPartition, offset + 1);
            System.out.println("OKKKKK");
        }

        @Override
        public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord) {
        }

        @Override
        public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord, Consumer<?, ?> consumer) {
            String s = e.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];
            String topics = s.split("-")[0];
            int offset = Integer.valueOf(s.split("offset ")[1]);
            int partition = Integer.valueOf(s.split("-")[1].split(" at")[0]);

            TopicPartition topicPartition = new TopicPartition(topics, partition);
            //log.info("Skipping " + topics + "-" + partition + " offset " + offset);
            consumer.seek(topicPartition, offset + 1);
            System.out.println("OKKKKK");
        }
    });

    return factory;
}

Solution 2

Use ErrorHandlingDeserializer.

When a deserializer fails to deserialize a message, Spring has no way to handle the problem because it occurs before the poll() returns. To solve this problem, version 2.2 introduced the ErrorHandlingDeserializer. This deserializer delegates to a real deserializer (key or value). If the delegate fails to deserialize the record content, the ErrorHandlingDeserializer returns a DeserializationException instead, containing the cause and raw bytes. When using a record-level MessageListener, if either the key or value contains a DeserializationException, the container’s ErrorHandler is called with the failed ConsumerRecord. When using a BatchMessageListener, the failed record is passed to the application along with the remaining records in the batch, so it is the responsibility of the application listener to check whether the key or value in a particular record is a DeserializationException.

You can use the DefaultKafkaConsumerFactory constructor that takes key and value Deserializer objects and wire in appropriate ErrorHandlingDeserializer instances configured with the proper delegates. Alternatively, you can use consumer configuration properties, which the ErrorHandlingDeserializer uses to instantiate the delegates. The property names are ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS and ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS; the property value can be a class or class name.
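
For the first option, a minimal sketch of wiring the delegates programmatically (the KafkaEvent type comes from the example below; the bean contents are otherwise an assumption):

@Bean
public ConsumerFactory<String, KafkaEvent> consumerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
    config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

    // each ErrorHandlingDeserializer wraps the real (delegate) deserializer
    return new DefaultKafkaConsumerFactory<>(config,
            new ErrorHandlingDeserializer<>(new StringDeserializer()),
            new ErrorHandlingDeserializer<>(new JsonDeserializer<>(KafkaEvent.class)));
}

The configuration class below uses the second, property-based approach instead: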

package com.mypackage.app.config;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import com.mypackage.app.model.kafka.message.KafkaEvent;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

import lombok.extern.slf4j.Slf4j;

@EnableKafka
@Configuration
@Slf4j
public class KafkaConsumerConfig {

    @Value("${kafka.bootstrap-servers}")
    private String servers;

    @Value("${listener.group-id}")
    private String groupId;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, KafkaEvent> ListenerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, KafkaEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        factory.setRetryTemplate(retryTemplate());
        factory.setErrorHandler((exception, data) -> {
            /*
             * Here you can do your custom handling; this just logs the record, the
             * same as the default error handler does. If you only want to log, you
             * need not configure an error handler here at all - the default handler
             * does it for you. Generally, you would persist the failed records to a
             * DB to track them.
             */
            log.error("Error in process with Exception {} and the record is {}", exception, data);
        });

        return factory;

    }

    @Bean
    public ConsumerFactory<String, KafkaEvent> consumerFactory() {
        Map<String, Object> config = new HashMap<>();

        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        config.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
        config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
        config.put(JsonDeserializer.VALUE_DEFAULT_TYPE,
                "com.mypackage.app.model.kafka.message.KafkaEvent");
        config.put(JsonDeserializer.TRUSTED_PACKAGES, "com.mypackage.app");

        return new DefaultKafkaConsumerFactory<>(config);
    }

    private RetryTemplate retryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();

        /*
         * The retry policy sets the number of retry attempts and which
         * exceptions should be retried and which should not.
         */
        retryTemplate.setRetryPolicy(retryPolicy());

        return retryTemplate;
    }

    private SimpleRetryPolicy retryPolicy() {
        Map<Class<? extends Throwable>, Boolean> exceptionMap = new HashMap<>();

        // the boolean value in the map determines whether the exception should be retried
        exceptionMap.put(IllegalArgumentException.class, false);
        exceptionMap.put(TimeoutException.class, true);
        exceptionMap.put(ListenerExecutionFailedException.class, true);

        return new SimpleRetryPolicy(3, exceptionMap, true);
    }
}
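
As the documentation quoted above notes, with a batch listener the failed record reaches the application, so the listener has to detect it. A minimal sketch of that check, assuming the factory above is switched to batch mode with factory.setBatchListener(true) (the topic name and listener body are hypothetical):

@KafkaListener(topics = "my-topic", containerFactory = "ListenerFactory")
public void listen(List<ConsumerRecord<String, KafkaEvent>> records) {
    for (ConsumerRecord<String, KafkaEvent> record : records) {
        // on failure, ErrorHandlingDeserializer leaves the value null and stores
        // the serialized DeserializationException in this header
        if (record.headers().lastHeader(
                ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER) != null) {
            log.error("Skipping record that failed to deserialize at offset {}", record.offset());
            continue;
        }
        // ... process record.value() as usual
    }
}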

Solution 3

The above answer may have a problem if the topic name contains a '-' character, so I have modified the same logic with a regex.

    import java.util.List;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.SerializationException;
    import org.springframework.kafka.listener.ErrorHandler;
    import org.springframework.kafka.listener.MessageListenerContainer;
    
    import lombok.extern.slf4j.Slf4j;
    
    @Slf4j
    public class KafkaErrHandler implements ErrorHandler {
    
        /**
         * Prevents the consumer from freezing on a serialization error by
         * seeking past the failed record.
         *
         * @param e        the serialization exception
         * @param consumer the consumer to reposition
         */
        private void seekSerializeException(Exception e, Consumer<?, ?> consumer) {
            String p = ".*partition (.*) at offset ([0-9]*).*";
            Pattern r = Pattern.compile(p);
    
            Matcher m = r.matcher(e.getMessage());
    
            if (m.find()) {
                int idx = m.group(1).lastIndexOf("-");
                String topics = m.group(1).substring(0, idx);
                // idx + 1 excludes the "-" separator, avoiding a negative partition number
                int partition = Integer.parseInt(m.group(1).substring(idx + 1));
                int offset = Integer.parseInt(m.group(2));
    
                TopicPartition topicPartition = new TopicPartition(topics, partition);
    
                consumer.seek(topicPartition, (offset + 1));
    
                log.info("Skipped message with offset {} from partition {}", offset, partition);
            }
        }
    
        @Override
        public void handle(Exception e, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer) {
            log.error("Error in process with Exception {} and the record is {}", e, record);
    
            if (e instanceof SerializationException)
                seekSerializeException(e, consumer);
        }
    
        @Override
        public void handle(Exception e, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
                MessageListenerContainer container) {
            log.error("Error in process with Exception {} and the records are {}", e, records);
    
            if (e instanceof SerializationException)
                seekSerializeException(e, consumer);
    
        }
    
        @Override
        public void handle(Exception e, ConsumerRecord<?, ?> record) {
            log.error("Error in process with Exception {} and the record is {}", e, record);
        }
    
    } 

Finally, use the error handler in the config:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, GenericType> macdStatusListenerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, GenericType> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(macdStatusConsumerFactory());
    factory.setRetryTemplate(retryTemplate());
    factory.setErrorHandler(new KafkaErrHandler());

    return factory;
}

However, parsing the error string to get the partition, topic and offset is not recommended. If anyone has a better solution, please post it here.
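
For reference, the parsing can be avoided entirely by combining the ErrorHandlingDeserializer from Solution 2 with a SeekToCurrentErrorHandler (spring-kafka 2.3+; replaced by DefaultErrorHandler in 2.8): the DeserializationException then travels with the failed ConsumerRecord, so the container already knows the topic, partition and offset. A minimal sketch, reusing the factory above and assuming a spring-kafka version with those classes:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, GenericType> macdStatusListenerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, GenericType> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(macdStatusConsumerFactory());
    // org.springframework.kafka.listener.SeekToCurrentErrorHandler and
    // org.springframework.util.backoff.FixedBackOff: retry twice with a 1s
    // back off, then log and skip the record; DeserializationExceptions are
    // treated as fatal and skipped without retries
    factory.setErrorHandler(new SeekToCurrentErrorHandler(new FixedBackOff(1000L, 2L)));

    return factory;
}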

Comments

  • Pietro Fragnito almost 2 years

    I'm setting up an application that consumes Kafka messages.

    I followed the Spring docs about deserialization error handling in order to catch deserialization exceptions. I've tried the failedDeserializationFunction approach.

    This is my Consumer Configuration Class

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);

        /* Error Handling */
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
        consumerProps.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
        consumerProps.put(ErrorHandlingDeserializer2.VALUE_FUNCTION, FailedNTCMessageBodyProvider.class);

        return consumerProps;
    }

    @Bean
    public ConsumerFactory<String, NTCMessageBody> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
                new JsonDeserializer<>(NTCMessageBody.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, NTCMessageBody> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, NTCMessageBody> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }


    This is the BiFunction Provider

    public class FailedNTCMessageBodyProvider implements BiFunction<byte[], Headers, NTCMessageBody> {
    
        @Override
        public NTCMessageBody apply(byte[] t, Headers u) {
            return new NTCBadMessageBody(t);
        }
    
    }
    
    public class NTCBadMessageBody extends NTCMessageBody{
    
        private final byte[] failedDecode;
    
        public NTCBadMessageBody(byte[] failedDecode) {
            this.failedDecode = failedDecode;
        }
    
        public byte[] getFailedDecode() {
            return this.failedDecode;
        }
    
    }
    
    

    When I send just one corrupted message to the topic, I get this error (in a loop):

    org.apache.kafka.common.errors.SerializationException: Error deserializing key/value

    I understood that the ErrorHandlingDeserializer2 should delegate to the NTCBadMessageBody type and continue the consumption. I also saw (in debug mode) that it never goes into the constructor of the NTCBadMessageBody class.

  • Pietro Fragnito almost 5 years
    I'm trying it, but it seems there is a problem in your second example, because I got: The method setErrorHandler(new ErrorHandler(){}) is undefined for the type ContainerProperties
  • Pietro Fragnito almost 5 years
    Making this correction, factory.setErrorHandler(new ErrorHandler()), it works great! Thanks a lot!
  • Guillaume almost 3 years
    Your solution is better than the approved one. I changed int partition = Integer.parseInt(m.group(1).substring(idx)); to int partition = Integer.parseInt(m.group(1).substring(idx+1)); to avoid a negative number.
  • Marinos An over 2 years
    Is there a way to access partition information (actually the TopicPartition) in the custom implementation above for any given exception? We want to catch exceptions, log them to the database, and then increase the offset on the partition. But by parsing the exception message as above, we can only do it for SerializationException.