How does spring.kafka.consumer.auto-offset-reset work in spring-kafka

The KafkaProperties in Spring Boot does this:

public Map<String, Object> buildProperties() {
    Map<String, Object> properties = new HashMap<String, Object>();
    if (this.autoCommitInterval != null) {
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
                this.autoCommitInterval);
    }
    // auto-offset-reset is copied unchanged into the Kafka consumer config
    if (this.autoOffsetReset != null) {
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
                this.autoOffsetReset);
    }
    // ... (remaining properties omitted from this excerpt)
    return properties;
}

This buildProperties() is called from buildConsumerProperties(), which, in turn, is used in the auto-configured ConsumerFactory bean:

@Bean
@ConditionalOnMissingBean(ConsumerFactory.class)
public ConsumerFactory<?, ?> kafkaConsumerFactory() {
    return new DefaultKafkaConsumerFactory<Object, Object>(
            this.properties.buildConsumerProperties());
}

So, if you define your own ConsumerFactory bean, be sure to reuse those KafkaProperties: https://docs.spring.io/spring-boot/docs/1.5.7.RELEASE/reference/htmlsingle/#boot-features-kafka-extra-props
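
A minimal sketch of such a bean, assuming the Spring Boot 1.5.x API quoted above (KafkaProperties.buildConsumerProperties()). The class name CustomKafkaConsumerConfig and the override shown are hypothetical, chosen only for illustration:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

// Hypothetical configuration class; the name is an assumption for illustration.
@Configuration
public class CustomKafkaConsumerConfig {

    @Bean
    public ConsumerFactory<Object, Object> kafkaConsumerFactory(KafkaProperties properties) {
        // Start from the map Boot builds out of spring.kafka.*, so that
        // spring.kafka.consumer.auto-offset-reset and the other settings are kept.
        Map<String, Object> config =
                new HashMap<String, Object>(properties.buildConsumerProperties());
        // Add or override entries only after that (illustrative override).
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return new DefaultKafkaConsumerFactory<Object, Object>(config);
    }
}
```

Because the bean is of type ConsumerFactory, the @ConditionalOnMissingBean factory shown above backs off, which is why the Boot-built properties have to be passed in explicitly.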

UPDATE

OK. I see what's going on.

Try adding this property:

spring.kafka.consumer.enable-auto-commit=false

This way we won't have async auto-commits based on some commit interval.

The application exits right after latch.await(60, TimeUnit.SECONDS) returns, that is, as soon as the 3 expected records have arrived. At that point the consumer's asynchronous auto-commit might not have happened yet. So, the next time you run the application, the consumer polls again from the last committed offset and re-reads the records whose offsets were never committed.
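
The timing issue can be sketched in plain Java. This is a toy model, not Kafka code: the class AutoCommitRaceSketch, its method, and the numbers are all hypothetical, and it assumes the only commit is a periodic one that fires once the auto-commit interval has elapsed (Kafka's default auto.commit.interval.ms is 5000).

```java
/** Toy model of a consumer whose offsets are committed only by a periodic timer. */
class AutoCommitRaceSketch {

    /**
     * Returns the offset a restarted consumer of the same group would resume from,
     * given how long the previous run lived before it exited.
     */
    static long committedAfterRun(long committedBefore, int recordsConsumed,
                                  long runtimeMs, long autoCommitIntervalMs) {
        long position = committedBefore + recordsConsumed;
        // Assumption of this model: the periodic commit fires only once the
        // interval has elapsed; exiting earlier leaves the old committed offset.
        boolean commitFired = runtimeMs >= autoCommitIntervalMs;
        return commitFired ? position : committedBefore;
    }

    public static void main(String[] args) {
        // Hypothetical numbers: 3 records consumed, the app exits after 200 ms.
        long early = committedAfterRun(10, 3, 200, 5000);
        // Same run, but the app stays alive past the commit interval.
        long late = committedAfterRun(10, 3, 6000, 5000);
        System.out.println("exit before commit -> next run resumes at " + early);
        System.out.println("exit after commit  -> next run resumes at " + late);
    }
}
```

In the first case the next run starts from the old committed offset and therefore sees the same 3 records again, in addition to the 3 new ones it sends.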

With auto-commit turned off, the listener container uses AckMode.BATCH, which commits synchronously, so on the next run we see only the really latest records in the topic for this foo consumer group.
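
As for auto-offset-reset itself: Kafka consults that setting only when the group has no committed offset for a partition, or when the committed offset is out of range. Otherwise the consumer resumes from the committed offset. A rough sketch of that rule in plain Java follows; OffsetResetSketch and all names in it are hypothetical, and this is not Kafka's actual code:

```java
import java.util.OptionalLong;

/** Toy model of how a consumer picks its starting offset for one partition. */
class OffsetResetSketch {

    /** Mirrors the three values accepted by auto.offset.reset. */
    enum Policy { EARLIEST, LATEST, NONE }

    static long startPosition(OptionalLong committed, long logStart, long logEnd,
                              Policy policy) {
        // A valid committed offset always wins; the reset policy is ignored.
        if (committed.isPresent()) {
            long c = committed.getAsLong();
            if (c >= logStart && c <= logEnd) {
                return c;
            }
        }
        // No committed offset, or it points outside the log: apply the policy.
        switch (policy) {
            case EARLIEST:
                return logStart;
            case LATEST:
                return logEnd;
            default:
                throw new IllegalStateException(
                        "no valid offset and reset policy is 'none'");
        }
    }

    public static void main(String[] args) {
        // Group with a committed offset of 7: both policies give the same result.
        System.out.println(startPosition(OptionalLong.of(7), 0, 10, Policy.EARLIEST));
        System.out.println(startPosition(OptionalLong.of(7), 0, 10, Policy.LATEST));
        // Brand-new group without a committed offset: the policy decides.
        System.out.println(startPosition(OptionalLong.empty(), 0, 10, Policy.EARLIEST));
        System.out.println(startPosition(OptionalLong.empty(), 0, 10, Policy.LATEST));
    }
}
```

To observe the difference between earliest and latest in practice, run the consumer with a group-id that has never committed offsets on the topic. With an existing group such as foo, both values behave the same.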

Author: gstackoverflow

Updated on June 04, 2022

Comments

  • gstackoverflow
    gstackoverflow almost 2 years

    KafkaProperties Javadoc:

    /**
      * What to do when there is no initial offset in Kafka or if the current offset
      * does not exist any more on the server.
      */
    private String autoOffsetReset;
    

    I have a hello-world application with the following application.properties:

    spring.kafka.consumer.group-id=foo
    spring.kafka.consumer.auto-offset-reset=latest
    

    In this case the @KafkaListener method is invoked for all entries. But I expected the @KafkaListener method to be invoked only for the latest 3 messages I send. I tried another option:

    spring.kafka.consumer.auto-offset-reset=earliest
    

    But the behaviour is the same.

    Can you explain this stuff?

    P.S.

    code sample:

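    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.core.KafkaTemplate;

    @SpringBootApplication
    public class Application implements CommandLineRunner {

        public static Logger logger = LoggerFactory.getLogger(Application.class);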
    
        public static void main(String[] args) {
            SpringApplication.run(Application.class, args).close();
        }
    
        @Autowired
        private KafkaTemplate<String, String> template;
    
        private final CountDownLatch latch = new CountDownLatch(3);
    
        @Override
        public void run(String... args) throws Exception {
            this.template.send("spring_kafka_topic", "foo1");
            this.template.send("spring_kafka_topic", "foo2");
            this.template.send("spring_kafka_topic", "foo3");
            latch.await(60, TimeUnit.SECONDS);
            logger.info("All received");
        }
    
        @KafkaListener(topics = "spring_kafka_topic")
        public void listen(ConsumerRecord<?, ?> cr) throws Exception {
            logger.info(cr.toString());
            latch.countDown();
        }
    }
    

    Update:

    The behaviour doesn't depend on spring.kafka.consumer.auto-offset-reset. With spring.kafka.consumer.auto-offset-reset=earliest, it depends only on spring.kafka.consumer.enable-auto-commit:

    If I set spring.kafka.consumer.enable-auto-commit=false, I see all records.

    If I set spring.kafka.consumer.enable-auto-commit=true, I see only the last 3 records.

    Please clarify the meaning of the spring.kafka.consumer.auto-offset-reset property.

  • gstackoverflow
    gstackoverflow over 6 years
    Artem, I just use a method annotated with @KafkaListener
  • Artem Bilan
    Artem Bilan over 6 years
    Good. Well, maybe you can share your simple Spring Boot application with us, so we can play with it on our side?
  • gstackoverflow
    gstackoverflow over 6 years
    Sure, added. It is the example from the docs.
  • gstackoverflow
    gstackoverflow over 6 years
    Is it because I start the consumer and the producer together?
  • Artem Bilan
    Artem Bilan over 6 years
    Please see the UPDATE in my answer
  • Artem Bilan
    Artem Bilan over 6 years
    Spring Kafka does nothing with auto-offset-reset. It is entirely up to Apache Kafka to interpret that property: kafka.apache.org/documentation/#newconsumerconfigs. I don't understand the behavior you describe either. As I said, there is a race condition in our application: we exit after the latch, but the auto-commit hasn't been performed yet. Everything else you say about earliest doesn't make sense to me.
  • gstackoverflow
    gstackoverflow over 6 years
    OK. In that case my question is the following: how can I see the difference between the different values of auto-offset-reset? Can you provide exact steps? The description is not clear to me: "What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted)". Only one example is described there. Moreover, I don't know how to delete data from Kafka.