How does spring.kafka.consumer.auto-offset-reset work in spring-kafka
The KafkaProperties in Spring Boot does this:
public Map<String, Object> buildProperties() {
    Map<String, Object> properties = new HashMap<String, Object>();
    if (this.autoCommitInterval != null) {
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
                this.autoCommitInterval);
    }
    if (this.autoOffsetReset != null) {
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
                this.autoOffsetReset);
    }
    // ... (rest of the method omitted)
}
This buildProperties() is called from buildConsumerProperties(), which in turn is used in the auto-configured ConsumerFactory bean:
@Bean
@ConditionalOnMissingBean(ConsumerFactory.class)
public ConsumerFactory<?, ?> kafkaConsumerFactory() {
    return new DefaultKafkaConsumerFactory<Object, Object>(
            this.properties.buildConsumerProperties());
}
So, if you use your own ConsumerFactory bean definition, be sure to reuse those KafkaProperties: https://docs.spring.io/spring-boot/docs/1.5.7.RELEASE/reference/htmlsingle/#boot-features-kafka-extra-props
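As a rough sketch of what reusing KafkaProperties in a custom ConsumerFactory could look like, assuming Spring Boot 1.5.x (the version the link above points to) where buildConsumerProperties() takes no arguments. The class name CustomConsumerConfig and the extra max.poll.records setting are only illustrative and not part of the original application:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

// Hypothetical configuration class; the name is an assumption for illustration.
@Configuration
public class CustomConsumerConfig {

    @Bean
    public ConsumerFactory<Object, Object> kafkaConsumerFactory(KafkaProperties properties) {
        // Start from everything Boot bound from spring.kafka.*,
        // including spring.kafka.consumer.auto-offset-reset.
        Map<String, Object> props = new HashMap<>(properties.buildConsumerProperties());
        // Illustrative extra setting layered on top of the Boot-provided values.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        return new DefaultKafkaConsumerFactory<>(props);
    }
}
```

Building the map from KafkaProperties first means the values from application.properties are not silently dropped when the auto-configured factory is replaced.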
UPDATE
OK. I see what's going on.
Try to add this property:
spring.kafka.consumer.enable-auto-commit=false
This way we won't have async auto-commits based on some commit interval.
The application exits right after latch.await(60, TimeUnit.SECONDS) returns: as soon as the 3 expected records are received, it shuts down. The async auto-commit from the consumer might not have happened by then. So, the next time you run the application, the consumer resumes from the last committed offset and the uncommitted records are delivered again.
When we turn off auto-commit, the listener container uses AckMode.BATCH, which commits synchronously, so we really see only the latest records in the topic for this foo consumer group.
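To make the role of auto-offset-reset more concrete, here is a minimal stand-alone sketch in plain Java (no Kafka client involved) of the rule described in the property's documentation: the setting only matters when the group has no committed offset, or when that offset no longer exists on the server. The class OffsetResetSketch and the method startPosition are hypothetical names for illustration, not Kafka or Spring APIs, and the exception is a stand-in for whatever the real client does with other policy values.

```java
import java.util.OptionalLong;

public class OffsetResetSketch {

    /**
     * Hypothetical helper (not a Kafka API): picks the offset at which a
     * consumer group starts reading a partition whose valid offsets run from
     * logStart (inclusive) to logEnd (the offset the next record will get).
     */
    public static long startPosition(OptionalLong committed, String autoOffsetReset,
                                     long logStart, long logEnd) {
        // A committed offset that still exists on the broker always wins;
        // auto-offset-reset is not consulted at all in this case.
        if (committed.isPresent()
                && committed.getAsLong() >= logStart
                && committed.getAsLong() <= logEnd) {
            return committed.getAsLong();
        }
        // No usable committed offset: only now does the reset policy matter.
        switch (autoOffsetReset) {
            case "earliest":
                return logStart;
            case "latest":
                return logEnd;
            default:
                throw new IllegalStateException(
                        "No valid offset and auto-offset-reset=" + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        long logStart = 0L;
        long logEnd = 6L; // six records in the partition, offsets 0..5

        // Group with a committed offset of 3: both policies resume at 3.
        System.out.println(startPosition(OptionalLong.of(3), "earliest", logStart, logEnd));
        System.out.println(startPosition(OptionalLong.of(3), "latest", logStart, logEnd));

        // Group with no committed offset: the policy decides.
        System.out.println(startPosition(OptionalLong.empty(), "earliest", logStart, logEnd)); // 0
        System.out.println(startPosition(OptionalLong.empty(), "latest", logStart, logEnd));   // 6
    }
}
```

Under this model, switching between earliest and latest changes nothing once the foo group has a committed offset. Running the application with a group-id that has never committed anything is one way to observe the difference.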
gstackoverflow
Updated on June 04, 2022
Comments
-
gstackoverflow almost 2 years
KafkaProperties java doc:
/**
 * What to do when there is no initial offset in Kafka or if the current offset
 * does not exist any more on the server.
 */
private String autoOffsetReset;
I have a hello world application which contains this application.properties:
spring.kafka.consumer.group-id=foo
spring.kafka.consumer.auto-offset-reset=latest
In this case the @KafkaListener method is invoked for all entries. But the expected result was that the @KafkaListener method is invoked only for the latest 3 records I send. I tried another option:
spring.kafka.consumer.auto-offset-reset=earliest
But the behaviour is the same.
Can you explain this?
P.S.
code sample:
@SpringBootApplication
public class Application implements CommandLineRunner {

    public static Logger logger = LoggerFactory.getLogger(Application.class);

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args).close();
    }

    @Autowired
    private KafkaTemplate<String, String> template;

    private final CountDownLatch latch = new CountDownLatch(3);

    @Override
    public void run(String... args) throws Exception {
        this.template.send("spring_kafka_topic", "foo1");
        this.template.send("spring_kafka_topic", "foo2");
        this.template.send("spring_kafka_topic", "foo3");
        latch.await(60, TimeUnit.SECONDS);
        logger.info("All received");
    }

    @KafkaListener(topics = "spring_kafka_topic")
    public void listen(ConsumerRecord<?, ?> cr) throws Exception {
        logger.info(cr.toString());
        latch.countDown();
    }
}
Update:
Behaviour doesn't depend on spring.kafka.consumer.auto-offset-reset (e.g. spring.kafka.consumer.auto-offset-reset=earliest); it depends only on spring.kafka.consumer.enable-auto-commit.
If I set spring.kafka.consumer.enable-auto-commit=false, I see all records.
If I set spring.kafka.consumer.enable-auto-commit=true, I see only the 3 last records.
Please clarify the meaning of the spring.kafka.consumer.auto-offset-reset property.
-
gstackoverflow over 6 years
Artem, I just use a method annotated with @KafkaListener
-
Artem Bilan over 6 years
Good. Well, maybe you can share with us your simple Spring Boot application to play from our side?
-
gstackoverflow over 6 years
Sure, added. It is the example from the docs
-
gstackoverflow over 6 years
Is it because I start consumer and producer together?
-
Artem Bilan over 6 years
Please, see an UPDATE in my answer
-
Artem Bilan over 6 years
Spring Kafka does nothing with the auto-offset-reset. Interpreting that property is entirely up to Apache Kafka: kafka.apache.org/documentation/#newconsumerconfigs. I don't understand your behavior either. I told you: there is a race condition in the application when we exit after the latch but the auto-commit hasn't been performed yet. Everything else about earliest doesn't make sense to me.
-
gstackoverflow over 6 years
Ok. In that case my question is the following: how can I observe the difference between the different values of auto-offset-reset? Can you provide exact steps? The description is not clear to me: "What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted)". Only one example is described there. Moreover, I don't know how to delete data from Kafka.