Kafka consumer exception and offset commits


The container (via ContainerProperties) has a property, ackOnError, which is true by default...

/**
 * Set whether or not the container should commit offsets (ack messages) where the
 * listener throws exceptions. This works in conjunction with {@link #ackMode} and is
 * effective only when the kafka property {@code enable.auto.commit} is {@code false};
 * it is not applicable to manual ack modes. When this property is set to {@code true}
 * (the default), all messages handled will have their offset committed. When set to
 * {@code false}, offsets will be committed only for successfully handled messages.
 * Manual acks will always be applied. Bear in mind that, if the next message is
 * successfully handled, its offset will be committed, effectively committing the
 * offset of the failed message anyway, so this option has limited applicability.
 * Perhaps useful for a component that starts throwing exceptions consistently;
 * allowing it to resume when restarted from the last successfully processed message.
 * @param ackOnError whether the container should acknowledge messages that throw
 * exceptions.
 */
public void setAckOnError(boolean ackOnError) {
    this.ackOnError = ackOnError;
}

Bear in mind, though, that if the next message is successful, its offset will be committed anyway, which effectively commits the failed offset too.
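
So, to get the behaviour you expected (no commit for a failed record), set the flag to false on the container properties. A minimal sketch, reusing the listener container factory from the question (availability of the property depends on your spring-kafka version):

@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
    // Commit after each successfully processed record.
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
    // Do not commit the offset of a record whose listener threw an exception;
    // only effective when enable.auto.commit=false on the consumer.
    factory.getContainerProperties().setAckOnError(false);
    return factory;
}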

EDIT

Starting with version 2.3, ackOnError is now false by default.


Comments

  • yfl almost 2 years

    I've been doing some POC work with Spring Kafka. Specifically, I wanted to explore best practices for dealing with errors while consuming messages from Kafka.

    I am wondering if anyone is able to help with:

    1. Best practices for what a Kafka consumer should do when a failure occurs
    2. Understanding how AckMode RECORD works, and how to prevent offsets from being committed when an exception is thrown in the listener method.

    The code example for 2 is given below:

    Given that AckMode is set to RECORD, which according to the documentation:

    commit the offset when the listener returns after processing the record.

    I would have thought that the offset would not be incremented if the listener method threw an exception. However, this was not the case when I tested it using the code/config/command combination below. The offset still gets updated, and the next message continues to be processed.

    My config:

    private Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
        return factory;
    }
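
    The consumerConfigs() used by the factory above isn't shown; a minimal sketch of what it might contain is below (the group id and deserializers are assumptions), with enable.auto.commit set to false so that the container, not the kafka client, controls commits:

    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092");
        // Assumed group id; the command below checks offsets for "test-group".
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        // Let the listener container manage commits instead of the kafka client.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }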

    My code:

    @Component
    public class KafkaMessageListener {

        @KafkaListener(topicPartitions = {@TopicPartition(topic = "my-replicated-topic",
                partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0",
                        relativeToCurrent = "true"))})
        public void onReplicatedTopicMessage(ConsumerRecord<Integer, String> data) throws InterruptedException {
            throw new RuntimeException("Oops!");
        }
    }

    Command to verify offset:

    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group
    

    I'm using kafka_2.12-0.10.2.0 and org.springframework.kafka:spring-kafka:1.1.3.RELEASE.