Read Kafka messages starting from a specific offset using the high-level API


Solution 1

You can do this with the new consumer API in Kafka 0.9:

http://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html

public void seek(TopicPartition partition, long offset)

Overrides the fetch offsets that the consumer will use on the next poll(timeout). If this API is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets.
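A minimal sketch of how seek might be used with the 0.9 consumer, assuming the kafka-clients 0.9 library is on the classpath and a broker is reachable. The broker address localhost:9092, the group name example-group, the topic topicname, partition 0, and offset 10 are placeholder assumptions, not values taken from the answer. The partition is assigned manually with assign() so that seek() can be called right away.

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class SeekExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder connection settings (assumptions for illustration).
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "example-group");
        // Do not commit automatically; this sketch only reads.
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        TopicPartition partition = new TopicPartition("topicname", 0);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        try {
            // Manual assignment: no group rebalancing is involved.
            consumer.assign(Collections.singletonList(partition));
            // The next poll() fetches starting at offset 10 of this partition.
            consumer.seek(partition, 10L);

            // poll(long) is the 0.9 signature; newer clients take a Duration.
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d value=%s%n",
                        record.offset(), record.value());
            }
        } finally {
            consumer.close();
        }
    }
}
```

If the consumer uses subscribe() instead of assign(), partitions are only assigned during poll(), so the seek() call would typically go into a ConsumerRebalanceListener's onPartitionsAssigned callback.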

Solution 2

Kafka 0.8.1.1 can use ZooKeeper to store offsets for each consumer group. If you configure your consumer to commit offsets to ZooKeeper, then you just need to manually set the starting offset for the topic and partition under ZooKeeper for your consumer group. Connect to ZooKeeper and use the set command:

set /consumers/[groupId]/offsets/[topic]/[partitionId] [offset]

where [offset] is the offset as a long.

For example, to set offset 10 for partition 0 of topicname for the spark-app consumer group:

set /consumers/spark-app/offsets/topicname/0 10
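The path layout above can be sketched as a small helper that builds the znode path and the matching set command. The class and method names here are hypothetical, chosen only for illustration; the helper just formats strings and does not talk to ZooKeeper.

```java
public class ZkOffsetPath {
    // Builds the znode path under which the 0.8 high-level consumer
    // keeps the committed offset for one group, topic, and partition.
    public static String offsetPath(String groupId, String topic, int partition) {
        return "/consumers/" + groupId + "/offsets/" + topic + "/" + partition;
    }

    // Builds the ZooKeeper shell "set" command that writes the offset.
    public static String setCommand(String groupId, String topic,
                                    int partition, long offset) {
        return "set " + offsetPath(groupId, topic, partition) + " " + offset;
    }

    public static void main(String[] args) {
        // Prints: set /consumers/spark-app/offsets/topicname/0 10
        System.out.println(setCommand("spark-app", "topicname", 0, 10L));
    }
}
```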

When a consumer starts consuming messages from Kafka, it always starts from the last committed offset. If that last committed offset is not valid for any reason, the consumer falls back to the behavior specified by the auto.offset.reset configuration property.
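As a rough sketch, the relevant settings for the 0.8 high-level consumer could be collected like this. The property names follow the 0.8 consumer configuration, where auto.offset.reset takes smallest or largest (largest being the default); the ZooKeeper address and the class name are placeholder assumptions. The sketch only builds the Properties object; handing it to the Kafka 0.8 consumer requires the Kafka jar and is not shown.

```java
import java.util.Properties;

public class OldConsumerConfigSketch {
    // Collects settings for the 0.8 high-level consumer.
    public static Properties build(String zkConnect, String groupId) {
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", zkConnect);
        props.setProperty("group.id", groupId);
        // Periodically commit consumed offsets to ZooKeeper.
        props.setProperty("auto.commit.enable", "true");
        // Used only when there is no valid committed offset:
        // "smallest" starts at the earliest available message,
        // "largest" starts at the newest.
        props.setProperty("auto.offset.reset", "smallest");
        return props;
    }

    public static void main(String[] args) {
        // localhost:2181 is a placeholder ZooKeeper address.
        Properties props = build("localhost:2181", "spark-app");
        System.out.println(props.getProperty("auto.offset.reset"));
    }
}
```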

Hope this helps.

Author: user1002065

Updated on April 27, 2020

Comments

  • user1002065, about 4 years ago

    I hope I am not mistaken, but I remember the Kafka documentation saying that with the high-level API you can't start reading messages from a specific offset, and that this would change.

    Is it now possible to use the high-level API to read messages from a specific partition and a specific offset? Could you please give me an example of how to do it?

    I am using Kafka 0.8.1.1.

    Thanks in advance.