Spring Kafka Auto Commit Offset In Case of Failures


Solution 1

I prefer to set it to false; it is more reliable for the container to manage the offsets for you.

Set the container's AckMode to RECORD (it defaults to BATCH) and the container will commit the offset for you after the listener returns.
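For example (a minimal sketch; the bootstrap servers, group id, and bean layout are illustrative, and in the versions discussed here the AckMode enum is nested in AbstractMessageListenerContainer, while later releases moved it to ContainerProperties):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");            // illustrative
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Let the container, not the kafka-clients consumer, manage offset commits
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Commit the offset after each record's listener invocation returns successfully
        factory.getContainerProperties().setAckMode(AckMode.RECORD);
        return factory;
    }
}
```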

Also consider upgrading to at least 1.3.3 (the current version is 2.1.4); 1.3.x introduced a much simpler threading model, thanks to KIP-62.

EDIT

With auto-commit, the offset will be committed regardless of success/failure. The container won't commit after a failure, unless ackOnError is true (another reason to not use auto commit).

However, that still won't help because the broker won't send the same record again. You have to perform a seek operation on the Consumer for that.

In 2.0.1 (the current version is 2.1.4), we added the SeekToCurrentErrorHandler, which causes the failed and unprocessed records to be re-sent on the next poll. See the reference manual.
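Wiring it in might look roughly like this (a sketch that assumes a container factory like the one above; later versions also expose setErrorHandler on the factory itself):

```java
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;

public class SeekToCurrentConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // On a listener exception, seek the failed record and the remaining unprocessed
        // records back so the next poll re-fetches them instead of skipping past them
        factory.getContainerProperties().setErrorHandler(new SeekToCurrentErrorHandler());
        return factory;
    }
}
```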

You can also use a ConsumerAwareListener to perform the seek yourself (also added in 2.0).
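A rough sketch of the consumer-aware variant (the topic name and process() method are made up for illustration; this assumes a record listener rather than a batch listener):

```java
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.annotation.KafkaListener;

public class SeekingOnErrorListener {

    @KafkaListener(topics = "example-topic")
    public void listen(ConsumerRecord<String, String> record, Consumer<String, String> consumer) {
        try {
            process(record);
        } catch (RuntimeException e) {
            // Rewind to the failed record so it (and everything after it) is re-fetched on the next poll
            consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
            throw e;
        }
    }

    private void process(ConsumerRecord<String, String> record) {
        // hypothetical processing, e.g. indexing the document into Elasticsearch
    }
}
```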

With older versions (>= 1.1) you have to use a ConsumerSeekAware listener, which is quite a bit more complicated.
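For comparison, a ConsumerSeekAware sketch under the same illustrative assumptions; the extra complexity is that the seek callback has to be captured per consumer thread and used only on that thread (the exact callback method set varies slightly across versions):

```java
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.ConsumerSeekAware;

public class SeekAwareListener implements ConsumerSeekAware {

    // One callback per consumer thread; it must only be used on that thread
    private static final ThreadLocal<ConsumerSeekCallback> seekCallback = new ThreadLocal<>();

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
        seekCallback.set(callback);
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        // no initial repositioning in this sketch
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        // not used in this sketch
    }

    @KafkaListener(topics = "example-topic")
    public void listen(ConsumerRecord<String, String> record) {
        try {
            process(record);
        } catch (RuntimeException e) {
            // Rewind to the failed offset so it is re-fetched on the next poll
            seekCallback.get().seek(record.topic(), record.partition(), record.offset());
            throw e;
        }
    }

    private void process(ConsumerRecord<String, String> record) {
        // hypothetical processing
    }
}
```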

Another alternative is to add retry so the delivery will be re-attempted according to the retry settings.
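A sketch of that approach (it relies on the separate spring-retry dependency; the attempt count and back-off values are arbitrary examples):

```java
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

public class RetryingListenerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);

        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3));   // up to 3 delivery attempts
        FixedBackOffPolicy backOff = new FixedBackOffPolicy();
        backOff.setBackOffPeriod(1000L);                          // 1 second between attempts
        retryTemplate.setBackOffPolicy(backOff);
        factory.setRetryTemplate(retryTemplate);

        // Called once retries are exhausted (e.g. log and skip, or publish to a dead-letter topic)
        RecoveryCallback<Object> recoveryCallback = context -> {
            // context.getLastThrowable() holds the final failure
            return null;
        };
        factory.setRecoveryCallback(recoveryCallback);

        return factory;
    }
}
```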

Solution 2

Apparently, there can be message loss with a Spring Kafka <= 1.3.3 @KafkaListener even if you use ackOnError=false, if you expect Spring Kafka to automatically take care of this (or at least document it) by retrying and "simply not polling again". The default behavior is to just log the error.

We were able to reproduce message loss/skip on a consumer even with spring-kafka 1.3.3.RELEASE (no Maven sources) and a single-partition topic, concurrency(1), AckOnError(false), BatchListener(true) with AckMode(BATCH), for any runtime exception. We ended up doing retries inside the template or exploring ConsumerSeekAware.

@GaryRussell, regarding "the broker won't send the same record again": does the consumer simply keep returning the next batches of messages without any commit? Is that because a consumer's poll is based on the current offset it has seeked to for the next batch of records, and not on the last committed offsets? Basically, a consumer need not commit at all; assuming a runtime exception on every record, it keeps consuming all the messages on the topic, and only a restart will resume from the last committed offset (producing duplicates).

Upgrading to 2.0+ in order to use ConsumerAwareListenerErrorHandler seems to require upgrading to at least Spring 5.x, which is a major upgrade.




Comments

  • rishi
    rishi almost 2 years

    I am using Spring Kafka 1.2.2.RELEASE. I have a Kafka listener as a consumer that listens to a topic and indexes the documents into Elasticsearch. My auto commit offset property is set to true (the default).

    I was under the impression that, in case of an exception in the listener (Elasticsearch is down), the offsets should not be committed and the same message should be processed on the next poll.

    However, this is not happening and the consumer commits the offset on the next poll. After reading posts and the documentation, I learnt that with auto commit set to true, the next poll will commit all offsets.

    My doubt is why the consumer issues the next poll at all, and how I can prevent any offset from being committed with auto commit set to true, or whether I need to set this property to false and commit manually.

  • rishi
    rishi about 6 years
    Thanks Gary. But is this the expected behaviour? As per my understanding, the record should not be committed and should be replayed, even with auto commit set to true.
  • Gary Russell
    Gary Russell about 6 years
    See the edit to my answer; the commit is irrelevant.
  • rishi
    rishi about 6 years
    Regarding upgrading Spring Kafka: we are using Kafka client 0.10.2 in production, and the latest Spring Kafka version that supports Kafka client 0.10.2 is 1.2.x.
  • rishi
    rishi about 6 years
    I am doing manual commits now. But if, even with ackOnError set to false, the broker won't send the same records again, then what is the point of having ackOnError as false? Am I missing a specific use case where ackOnError comes in handy?
  • rishi
    rishi about 6 years
    We cannot upgrade Spring Kafka (1.2.2) as it also requires upgrading the Kafka client (currently 0.10.2) to at least 0.11. And yes, I have the same doubt: if the consumer is going to poll regardless of whether the previous records were successfully processed, is the commit only relevant in case of a restart?
  • Gary Russell
    Gary Russell about 6 years
    ackOnError=false is usually used when the container is stopped after a failure. That way, when the container is restarted, the failed message is redelivered. I believe that starting with 0.10.2.0, newer clients can talk to older brokers, as long as you don't try to use new features. We now have the SeekToCurrentErrorHandler which avoids having to stop the container. I would recommend upgrading your brokers, though.
  • kisna
    kisna about 6 years
    Spring Kafka 1.3.3 is compatible with 0.10. Newer Spring Kafka versions gave clients full access to the consumer to update offsets etc., but in general it did not make sense for the Spring library to keep retrying a runtime exception, given that the client code can keep throwing it again. We added retries for things like DB/IO exceptions; the rest were just logged to a file/DB stream using an error handler so that processing could continue. Yes, commits can then be controlled using Ack in a try/finally with this approach (a minimal sketch follows below). Or use the auto commit approach on batch/record to let Spring commit automatically.
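
For completeness, a minimal sketch of the manual-ack pattern mentioned in the last comment (the topic and processing are illustrative; it assumes enable.auto.commit=false and the container's AckMode set to MANUAL or MANUAL_IMMEDIATE):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;

public class ManuallyAckingListener {

    @KafkaListener(topics = "example-topic")
    public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) {
        try {
            process(record);
            // Commit the offset only after the record was processed successfully
            ack.acknowledge();
        } catch (RuntimeException e) {
            // Log or route to an error stream; without acknowledge() the offset is not committed,
            // but note the consumer still moves on to later records until it restarts or seeks back
        }
    }

    private void process(ConsumerRecord<String, String> record) {
        // hypothetical processing
    }
}
```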