Dead letter queue (DLQ) for Kafka with spring-kafka


See the SeekToCurrentErrorHandler.

When an exception occurs, it seeks the consumer so that all unprocessed records are redelivered on the next poll.
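
For reference, wiring the stock handler is a one-liner (a sketch; `factory` is assumed to be your ConcurrentKafkaListenerContainerFactory):

    // On an exception, SeekToCurrentErrorHandler seeks the failed record's partition
    // back to the failed offset (and any other partitions from the poll to their first
    // unprocessed offsets), so everything unprocessed is redelivered on the next poll.
    factory.getContainerProperties().setErrorHandler(new SeekToCurrentErrorHandler());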

You can use the same technique (e.g. in a subclass) to write to the DLQ: seek the current offset (and the other unprocessed records) if the DLQ write fails, and seek just the remaining records if the DLQ write succeeds.

EDIT

The DeadLetterPublishingRecoverer was added a few months after this answer was posted.

https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters
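
With the recoverer, a typical setup looks like this (a sketch for spring-kafka 2.8+, where DefaultErrorHandler replaced SeekToCurrentErrorHandler; on 2.3–2.7 the same recoverer plugs into a SeekToCurrentErrorHandler constructor instead):

    @Bean
    public DefaultErrorHandler errorHandler(KafkaOperations<Object, Object> template) {
        // Retry twice, one second apart, then publish the failed record to the
        // <original-topic>.DLT topic (the recoverer's default naming convention)
        return new DefaultErrorHandler(
                new DeadLetterPublishingRecoverer(template),
                new FixedBackOff(1000L, 2L));
    }

Recent Spring Boot versions apply a CommonErrorHandler bean to the auto-configured container factory automatically; otherwise register it with factory.setCommonErrorHandler(errorHandler).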


Question (Eugene Khyst)

    What is the best way to implement the dead letter queue (DLQ) concept in a Spring Boot 2.0 application using spring-kafka 2.1.x, so that all messages that failed to be processed by a @KafkaListener method of some bean are sent to some predefined Kafka DLQ topic and not a single message is lost?

    So a consumed Kafka record is either:

    1. successfully processed,
    2. failed to be processed and is sent to the DLQ topic,
    3. failed to be processed and not sent to the DLQ topic (due to an unexpected problem), so it will be consumed by the listener again.

    I tried to create a listener container with a custom implementation of ErrorHandler that sends records that failed to be processed to a DLQ topic using KafkaTemplate, with auto-commit disabled and RECORD AckMode:

    spring.kafka.consumer.enable-auto-commit=false
    spring.kafka.listener.ack-mode=RECORD
    
    @Configuration
    public class KafkaConfig {
        @Bean
        ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory(DlqErrorHandler dlqErrorHandler) {
            ConcurrentKafkaListenerContainerFactory<Integer, String> factory = ...
            ...
            factory.getContainerProperties().setErrorHandler(dlqErrorHandler);
            return factory;
        }
    }
    
    @Component
    public class DlqErrorHandler implements ErrorHandler {
    
        @Autowired
        private KafkaTemplate<Object, Object> kafkaTemplate;
    
        @Value("${dlqTopic}")
        private String dlqTopic;
    
        @Override
        public void handle(Exception thrownException, ConsumerRecord<?, ?> record) {
            log.error("Error, sending to DLQ...");
            kafkaTemplate.send(dlqTopic, record.key(), record.value());
        }
    }
    

    It seems that this implementation doesn't guarantee item #3: if an exception is thrown in DlqErrorHandler, the record will not be consumed by the listener again. Moreover, KafkaTemplate.send() is asynchronous, so a failed DLQ write may not even raise an exception here.

    Would using a transactional listener container help?

    factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);
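
    For context, a sketch of where such a transaction manager might come from (the bean below is illustrative; the producer factory must have a transactionIdPrefix configured to be transactional):

    @Bean
    public KafkaTransactionManager<Object, Object> kafkaTransactionManager(ProducerFactory<Object, Object> producerFactory) {
        // Wraps the transactional producer factory so the container can run
        // each listener invocation inside a Kafka transaction
        return new KafkaTransactionManager<>(producerFactory);
    }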
    

    Is there any convenient way to implement the DLQ concept using Spring Kafka?

    UPDATE 2018/03/28

    Thanks to Gary Russell's answer, I was able to achieve the desired behavior by implementing DlqErrorHandler as follows:

    @Configuration
    public class KafkaConfig {
        @Bean
        ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory(DlqErrorHandler dlqErrorHandler) {
            ConcurrentKafkaListenerContainerFactory<Integer, String> factory = ...
            ...
            factory.getContainerProperties().setAckOnError(false);
            factory.getContainerProperties().setErrorHandler(dlqErrorHandler);
            return factory;
        }
    }
    
    @Component
    public class DlqErrorHandler implements ContainerAwareErrorHandler {
        ...
        @Override
        public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
            ConsumerRecord<?, ?> record = records.get(0);
            try {
                // Block on the future so a failed DLQ write is caught by the catch block below
                kafkaTemplate.send(dlqTopic, record.key(), record.value()).get();
                consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset() + 1);
                // Other records may be from other partitions, so seek to current offset for other partitions too
                // ...
            } catch (Exception e) {
                consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
                // Other records may be from other partitions, so seek to current offset for other partitions too
                // ...
                throw new KafkaException("Seek to current after exception", thrownException);
            }
        }
    }
    

    This way, if a consumer poll returns 3 records (1, 2, 3) and the 2nd one can't be processed:

    • 1 will be processed
    • 2 will fail to be processed and sent to the DLQ
    • 3 will be delivered to the listener on the next poll, thanks to the consumer seek to record.offset() + 1

    If sending to the DLQ fails, the consumer seeks to record.offset() and the record will be re-delivered to the listener (and sending to the DLQ will probably be retried).
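
    The elided "seek to current offset for other partitions" parts can look roughly like this (a sketch, not spring-kafka API; seekRemaining is a made-up helper, and java.util.Map, java.util.LinkedHashMap, and org.apache.kafka.common.TopicPartition are assumed imported):

    // Seek each partition to the offset of its first unprocessed record so the next
    // poll redelivers them. skipFailed controls whether the failed record (the first
    // in the list) is skipped (DLQ write succeeded) or redelivered (DLQ write failed).
    private void seekRemaining(List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, boolean skipFailed) {
        Map<TopicPartition, Long> offsets = new LinkedHashMap<>();
        boolean first = true;
        for (ConsumerRecord<?, ?> record : records) {
            long offset = (first && skipFailed) ? record.offset() + 1 : record.offset();
            // Records within a partition arrive in order, so the first record seen
            // per partition has the smallest offset; keep only that one
            offsets.putIfAbsent(new TopicPartition(record.topic(), record.partition()), offset);
            first = false;
        }
        offsets.forEach(consumer::seek);
    }

    The try block would then call seekRemaining(records, consumer, true) and the catch block seekRemaining(records, consumer, false).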

    UPDATE 2021/04/30

    Since Spring Kafka 2.7.0, non-blocking retries and dead letter topics are natively supported.

    See the example: https://github.com/evgeniy-khist/spring-kafka-non-blocking-retries-and-dlt

    Retries should usually be non-blocking (done in separate topics) and delayed:

    • to not disrupt real-time traffic;
    • to not amplify the number of calls, essentially spamming bad requests;
    • for observability (to obtain the number of retries and other metadata).

    Achieving non-blocking retry and DLT functionality with Kafka usually requires setting up extra topics and creating and configuring the corresponding listeners; Spring Kafka 2.7+ takes care of this boilerplate, as sketched below.
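
    A minimal sketch using the @RetryableTopic annotation (the topic name orders, process(), and log are illustrative, not taken from the linked example):

    @RetryableTopic(
            attempts = "4", // 1 initial attempt + 3 retries, then the record goes to the DLT
            backoff = @Backoff(delay = 1000, multiplier = 2.0)) // 1 s, 2 s, 4 s between attempts
    @KafkaListener(topics = "orders")
    public void listen(String message) {
        process(message); // any exception here sends the record to the next retry topic
    }

    @DltHandler
    public void handleDlt(String message) {
        log.error("Message eventually failed and landed in the DLT: {}", message);
    }

    Spring Kafka auto-creates the retry topics and the DLT and wires listeners for them, so no extra @KafkaListener beans are needed.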