How to get data from old offset point in Kafka?


Solution 1

Consumers always belong to a group and, for each partition, ZooKeeper keeps track of that consumer group's progress through the partition.
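
For the 0.8 high-level consumer, those committed offsets live in ZooKeeper under paths of roughly this form (layout recalled from the 0.8 design docs; verify against your version):

```
/consumers/[group.id]/offsets/[topic]/[partition]  ->  committed offset
```

This is why deleting the group's node (below) resets its progress.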

To fetch from the beginning, you can delete all the data associated with the group's progress, as Hussain referred to:

ZkUtils.maybeDeletePath("${zkhost}:${zkport}", "/consumers/${group.id}");

You can also set the offset of the partition you want explicitly, as done in core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala:

ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + partition, offset.toString)

However, offsets are not indexed by time; all you know is that within each partition they form an increasing sequence.

If your message contains a timestamp (and beware that this timestamp has nothing to do with the moment Kafka received your message), you can build a rough indexer: step through the partition, incrementing the offset by N each time, and store the tuple (topic X, part 2, offset 100, timestamp) somewhere.

When you want to retrieve entries from a specified moment in time, apply a binary search to this rough index until you find the entry you want, and fetch from there.
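
The rough-index idea above can be sketched as pure logic (the `RoughIndex` type and sample data are hypothetical, not a Kafka API):

```java
import java.util.ArrayList;
import java.util.List;

// Rough index built by sampling every N-th offset of a partition.
// Entries are in offset order, and since the payload timestamps are
// assumed to be (roughly) monotonic, they are in timestamp order too.
class RoughIndex {
    static class Entry {
        final long offset;
        final long timestamp; // timestamp carried inside the message payload
        Entry(long offset, long timestamp) { this.offset = offset; this.timestamp = timestamp; }
    }

    final List<Entry> entries = new ArrayList<>();

    void add(long offset, long timestamp) { entries.add(new Entry(offset, timestamp)); }

    // Binary search: return the offset of the last sampled entry whose
    // timestamp is <= target, i.e. a safe point to start fetching from.
    long startOffsetFor(long targetTimestamp) {
        int lo = 0, hi = entries.size() - 1, best = 0;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (entries.get(mid).timestamp <= targetTimestamp) {
                best = mid;
                lo = mid + 1;
            } else {
                hi = mid - 1;
            }
        }
        return entries.get(best).offset;
    }
}
```

From the returned offset, fetch forward and discard messages whose timestamp is older than the target.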

Solution 2

From the Kafka documentation they say "kafka.api.OffsetRequest.EarliestTime() finds the beginning of the data in the logs and starts streaming from there, kafka.api.OffsetRequest.LatestTime() will only stream new messages. Don’t assume that offset 0 is the beginning offset, since messages age out of the log over time. "

Use the SimpleConsumerExample here: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

A similar question that might help: Kafka High Level Consumer Fetch All Messages From Topic Using Java API (Equivalent to --from-beginning)

Solution 3

Refer to the Kafka configuration doc, http://kafka.apache.org/08/configuration.html, for your query about the smallest and largest values of the offset-reset parameter.
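
In 0.8 the consumer property is spelled auto.offset.reset: smallest starts from the earliest offset still retained in the log, while largest (the default) starts from the newest offset, i.e. only new messages. A sketch of a consumer.properties entry:

```
# Only consulted when the group has no valid committed offset in ZooKeeper
# (e.g. a brand-new group, or a committed offset that has aged out of the log):
auto.offset.reset=smallest
```

Note that a group with a valid committed offset resumes from that offset regardless of this setting.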

BTW, while exploring Kafka, I wondered how to replay all messages for a consumer: if a consumer group has polled all the messages, how can it get them again?

The way to achieve this is to delete the group's data from ZooKeeper. Use the kafka.utils.ZkUtils class to delete a node on ZooKeeper. Below is its usage:

ZkUtils.maybeDeletePath("${zkhost}:${zkport}", "/consumers/${group.id}");

Solution 4

For Now

The Kafka FAQ gives an answer to this problem.

How do I accurately get offsets of messages for a certain timestamp using OffsetRequest?

Kafka allows querying offsets of messages by time and it does so at segment granularity. The timestamp parameter is the unix timestamp and querying the offset by timestamp returns the latest possible offset of the message that is appended no later than the given timestamp. There are 2 special values of the timestamp - latest and earliest. For any other value of the unix timestamp, Kafka will get the starting offset of the log segment that is created no later than the given timestamp. Due to this, and since the offset request is served only at segment granularity, the offset fetch request returns less accurate results for larger segment sizes.

For more accurate results, you may configure the log segment size based on time (log.roll.ms) instead of size (log.segment.bytes). However care should be taken since doing so might increase the number of file handlers due to frequent log segment rolling.
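
The segment-granularity rule above can be illustrated with a small sketch (a hypothetical segment list, not Kafka's actual code): the answer for a timestamp is the base offset of the newest segment created no later than that timestamp, so everything inside one segment maps to the same offset.

```java
import java.util.List;

class SegmentLookup {
    static class Segment {
        final long baseOffset; // offset of the first message in the segment
        final long createdMs;  // unix timestamp when the segment was created
        Segment(long baseOffset, long createdMs) { this.baseOffset = baseOffset; this.createdMs = createdMs; }
    }

    // Segments sorted by creation time; return the base offset of the newest
    // segment created no later than the target timestamp. Larger segments
    // therefore give coarser answers.
    static long offsetForTime(List<Segment> segments, long targetMs) {
        long answer = segments.get(0).baseOffset;
        for (Segment s : segments) {
            if (s.createdMs <= targetMs) answer = s.baseOffset;
            else break;
        }
        return answer;
    }
}
```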


Future Plan

Kafka will add timestamp to message format. Refer to

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata

Solution 5

The Kafka Protocol doc is a great source for playing with requests, responses, offsets, and messages: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol. The SimpleConsumer example uses code like the following:

FetchRequest req = new FetchRequestBuilder()
        .clientId(clientName)
        .addFetch(a_topic, a_partition, readOffset, 100000)
        .build();
FetchResponse fetchResponse = simpleConsumer.fetch(req);

Set readOffset to the initial offset you want to start from, but check the maximum offset as well: the call above returns only a limited number of messages, bounded by the fetch size given in the last parameter of addFetch.
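
The paging this implies can be sketched as a loop (the broker fetch is stubbed here with an in-memory list; the names and the stub are illustrative, not the SimpleConsumer API):

```java
import java.util.ArrayList;
import java.util.List;

class FetchLoop {
    // Stand-in for one fetch request: returns at most batchSize messages
    // starting at readOffset, like a size-bounded FetchResponse.
    static List<String> fetch(List<String> log, long readOffset, int batchSize) {
        int from = (int) readOffset;
        int to = Math.min(log.size(), from + batchSize);
        return new ArrayList<>(log.subList(from, to));
    }

    // Keep fetching and advancing readOffset until reaching the end of the
    // log (the "max offset" the answer warns about).
    static List<String> readFrom(List<String> log, long startOffset, int batchSize) {
        List<String> out = new ArrayList<>();
        long readOffset = startOffset;
        while (readOffset < log.size()) {
            List<String> batch = fetch(log, readOffset, batchSize);
            if (batch.isEmpty()) break; // nothing more to read
            out.addAll(batch);
            readOffset += batch.size(); // next fetch starts after the last message
        }
        return out;
    }
}
```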

Author: Sourabh (Developer)

Updated on January 20, 2020

Comments

  • Sourabh
    Sourabh over 4 years

    I am using ZooKeeper to get data from Kafka, and I always get data from the last offset point. Is there any way to specify the time of the offset to get old data?

    There is one option, autooffset.reset. It accepts smallest or largest. Can someone please explain what smallest and largest are? Can autooffset.reset help in getting data from an old offset point instead of the latest offset point?

  • Hild
    Hild almost 11 years
    They also have a code sample for reference; worth taking a look.
  • pherris
    pherris over 10 years
    The example Hild is referring to is: cwiki.apache.org/confluence/display/KAFKA/… You cannot use the 'Consumer' example, you have to use the 'SimpleConsumerDemo' example to play with offsets.
  • usman
    usman over 8 years
    Do check the new API provided in the 0.9.0.0 version of Kafka; they have gone one step further by combining the simple and high-level consumers.