Kafka suddenly reset the consumer offset

What's happening is that you have been draining (consuming) your topic too slowly for a while.

Kafka's retention model is not based on whether the consumer got the data, but on disk usage and/or age. At some point you fall too far behind, and the next message you need has already been wiped out and isn't available anymore, because Kafka has cleaned up that data. Hence the Current offset 45268422051 for partition [MyTopic,0] out of range; reset offset to 1948447612 message.

Your consumer then applies your reset policy to bootstrap itself again, in your case smallest, so it jumps back to the earliest offset still available on the broker.
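
For reference, here is roughly where that policy lives in the 0.8 high-level consumer. This is a minimal sketch, not your exact setup: the ZooKeeper address and group id are placeholders to adjust.

    import java.util.Properties;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class ResetPolicyExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Placeholder connection settings, adjust to your environment.
            props.put("zookeeper.connect", "172.16.23.1:2181");
            props.put("group.id", "MyTopic_group");
            // What the consumer does when its stored offset is out of range on the broker:
            //   "smallest" -> restart from the earliest offset still retained
            //   "largest"  -> skip ahead to the latest offset (the default)
            props.put("auto.offset.reset", "smallest");

            ConsumerConnector connector =
                    Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
            // ... createMessageStreams(...) and consume as usual ...
            connector.shutdown();
        }
    }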

It's a common issue when you have bursty workloads and sometimes fall out of the data retention range. It probably disappeared because you improved your consumption speed, or because you increased your retention settings enough to survive the bursts.
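
If you want to see this coming, you can compare the offset your group is at with the earliest offset the broker still retains for the partition. Below is a rough sketch against the 0.8 SimpleConsumer offset API; the broker host, topic and client id are placeholders, and this is only an illustration of the idea, not a drop-in tool.

    import java.util.Collections;

    import kafka.api.PartitionOffsetRequestInfo;
    import kafka.common.TopicAndPartition;
    import kafka.javaapi.OffsetRequest;
    import kafka.javaapi.OffsetResponse;
    import kafka.javaapi.consumer.SimpleConsumer;

    public class EarliestOffsetCheck {
        public static void main(String[] args) {
            // Placeholder broker/topic values.
            SimpleConsumer consumer =
                    new SimpleConsumer("172.16.23.5", 9092, 100000, 64 * 1024, "retention-check");
            TopicAndPartition tp = new TopicAndPartition("MyTopic", 0);

            // Ask the broker for the earliest offset it still retains for this partition.
            OffsetRequest request = new OffsetRequest(
                    Collections.singletonMap(tp,
                            new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime(), 1)),
                    kafka.api.OffsetRequest.CurrentVersion(),
                    "retention-check");
            OffsetResponse response = consumer.getOffsetsBefore(request);
            long earliest = response.offsets("MyTopic", 0)[0];

            // If your group's committed offset ever drops below 'earliest', the next fetch
            // will be out of range and auto.offset.reset will kick in.
            System.out.println("Earliest retained offset for MyTopic-0: " + earliest);

            consumer.close();
        }
    }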


Comments

  • Gnarik
    Gnarik almost 2 years

    I'm working with Kafka 0.8 & ZooKeeper 3.3.5. We have a dozen topics we are consuming without any issue.

    Recently, we started to feed and consume a new topic that shows a weird behavior: the consumed offset was suddenly reset. It respects the auto.offset.reset policy we set (smallest), but I cannot understand why the topic's offset was suddenly reset.

    I'm using the high-level consumer.

    Here are some ERROR logs I found. We have a bunch of these:

    [2015-03-26 05:21:17,789] INFO Fetching metadata from broker id:1,host:172.16.23.1,port:9092 with correlation id 47 for 1 topic(s) Set(MyTopic) (kafka.client.ClientUtils$)
    [2015-03-26 05:21:17,789] ERROR Producer connection to 172.16.23.1:9092 unsuccessful (kafka.producer.SyncProducer)
    java.nio.channels.ClosedByInterruptException
            at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
            at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:681)
            at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
            at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
            at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
            at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
            at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
            at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
            at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:88)
            at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
            at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
    

    Each time this issue happened I see that WARN log:

    [2015-03-26 05:21:30,596] WARN Reconnect due to socket error: null (kafka.consumer.SimpleConsumer)
    

    And then the real problem happens:

    [2015-03-26 05:21:47,551] INFO Connected to 172.16.23.5:9092 for producing (kafka.producer.SyncProducer)
    [2015-03-26 05:21:47,552] INFO Disconnecting from 172.16.23.5:9092 (kafka.producer.SyncProducer)
    [2015-03-26 05:21:47,553] INFO [ConsumerFetcherManager-1427047649942] Added fetcher for partitions ArrayBuffer([[MyTopic,0], initOffset 45268422051 to broker id:5,host:172.16.23.5,port:9092] ) (kafka.consumer.ConsumerFetcherManager)
    [2015-03-26 05:21:47,553] INFO [ConsumerFetcherThread-MyTopic_group-1427047649884-699191d4-0-5], Starting  (kafka.consumer.ConsumerFetcherThread)
    [2015-03-26 05:21:50,388] ERROR [ConsumerFetcherThread-MyTopic_group-1427047649884-699191d4-0-5], Current offset 45268422051 for partition [MyTopic,0] out of range; reset offset to 1948447612 (kafka.consumer.ConsumerFetcherThread)
    [2015-03-26 05:21:50,490] ERROR [ConsumerFetcherThread-MyTopic_group-1427047649884-699191d4-0-5], Current offset 1948447612 for partition [MyTopic,0] out of range; reset offset to 1948447612 (kafka.consumer.ConsumerFetcherThread)
    [2015-03-26 05:21:50,591] ERROR [ConsumerFetcherThread-MyTopic_group-1427047649884-699191d4-0-5], Current offset 1948447612 for partition [MyTopic,0] out of range; reset offset to 1948447612 (kafka.consumer.ConsumerFetcherThread)
    [2015-03-26 05:21:50,692] ERROR [ConsumerFetcherThread-MyTopic_group-1427047649884-699191d4-0-5], Current offset 1948447612 for partition [MyTopic,0] out of range; reset offset to 1948447612 (kafka.consumer.ConsumerFetcherThread)
    

    Now the questions: Has anyone already experienced this behavior? Can anyone tell me when Kafka decides to reset its offset, whether auto.offset.reset is largest or smallest?

    Thank you.

  • Admin
    Admin almost 8 years
    Can you recommend the appropriate fix for this? Secor has stopped backing up anything from Kafka due to a recent offset reset and is throwing many of these errors.
  • C4stor
    C4stor almost 8 years
    Depends. If on average you have enough consumption speed, increase retention to survive bursts. Otherwise, scale Secor horizontally to handle the traffic. Note that if Secor is not backing up anything at all now, it's certainly not the same problem.
  • jsh
    jsh about 7 years
    We ran into this issue as well. Is it possible that this happens when the broker rotates the corresponding topic-partition log file, and the client reconnects with an offset that no longer matches the starting offset of the newly rotated topic-partition log file?
  • C4stor
    C4stor about 7 years
    It isn't really file dependent. But since file rotation is actually what bumps the minimum offset available on the broker, yeah, it's possible.
  • Raman
    Raman about 4 years
    This behavior shouldn't happen any more with Kafka 2.1.0+ as long as the consumer group remains active (issues.apache.org/jira/browse/KAFKA-4682), however there might still be bugs with it (issues.apache.org/jira/browse/KAFKA-10007).