How to get latest offset for a partition for a kafka topic?
Solution 1
Finally, after spending a day on this and several false starts, I was able to find a solution and get it working. Posting it here so that others may refer to it.
from kafka import SimpleClient
from kafka.common import OffsetRequestPayload

# Note: SimpleClient is deprecated in newer kafka-python releases.
client = SimpleClient(brokers)
partitions = client.topic_partitions[topic]
# time=-1 asks for the latest offset; max_offsets=1 returns one offset per partition.
offset_requests = [OffsetRequestPayload(topic, p, -1, 1) for p in partitions.keys()]
offsets_responses = client.send_offset_request(offset_requests)
for r in offsets_responses:
    print("partition = %s, offset = %s" % (r.partition, r.offsets[0]))
Solution 2
If you prefer the Kafka shell scripts in kafka/bin, you can get the latest and earliest offsets with kafka-run-class.sh.
To get the latest offset, the command looks like this:
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --time -1 --topic topicname
To get the earliest (smallest) offset, the command looks like this:
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --time -2 --topic topicname
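If the shell output needs to be consumed from a script, it can be parsed with a few lines of Python. This is only a sketch: it assumes GetOffsetShell prints one `topic:partition:offset` entry per line, and both the helper name `parse_get_offset_shell` and the sample text are made up for illustration.

```python
def parse_get_offset_shell(output):
    """Parse GetOffsetShell text into {(topic, partition): offset}.

    Assumption: each non-empty line has the form 'topic:partition:offset'.
    """
    offsets = {}
    for line in output.splitlines():
        line = line.strip()
        if not line:
            continue
        # Split from the right so only the last two fields are treated as numbers.
        topic, partition, offset = line.rsplit(":", 2)
        offsets[(topic, int(partition))] = int(offset)
    return offsets


# Hypothetical sample output, not captured from a real cluster.
sample = "my-topic:0:42\nmy-topic:1:17\n"
print(parse_get_offset_shell(sample))
```

In practice the `output` string would come from running the command above, for example via `subprocess.check_output`.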
You can find more information on Get Offsets Shell from following link
Hope this helps!
Solution 3
from kafka import KafkaConsumer, TopicPartition

TOPIC = 'MYTOPIC'
GROUP = 'MYGROUP'
BOOTSTRAP_SERVERS = ['kafka01:9092', 'kafka02:9092']

consumer = KafkaConsumer(
    bootstrap_servers=BOOTSTRAP_SERVERS,
    group_id=GROUP,
    enable_auto_commit=False
)

for p in consumer.partitions_for_topic(TOPIC):
    tp = TopicPartition(TOPIC, p)
    consumer.assign([tp])
    # committed() returns None when the group has no committed offset for this partition
    committed = consumer.committed(tp)
    consumer.seek_to_end(tp)
    last_offset = consumer.position(tp)
    # Lag is only defined once the group has committed something
    lag = last_offset - committed if committed is not None else None
    print("topic: %s partition: %s committed: %s last: %s lag: %s" % (TOPIC, p, committed, last_offset, lag))

consumer.close(autocommit=False)
Solution 4
With kafka-python>=1.3.4 you can use:
kafka.KafkaConsumer.end_offsets(partitions)
Get the last offset for the given partitions. The last offset of a partition is the offset of the upcoming message, i.e. the offset of the last available message + 1.
from kafka import TopicPartition
from kafka.consumer import KafkaConsumer
con = KafkaConsumer(bootstrap_servers = brokers)
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)]
con.end_offsets(ps)
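Because the end offset is one past the last message, the offset of the last message itself has to be derived from it. The sketch below shows that arithmetic on a dictionary shaped like the result of `end_offsets`. The `TopicPartition` here is a local stand-in namedtuple so the snippet runs without a broker, and the helper name and sample offsets are invented for illustration.

```python
from collections import namedtuple

# Local stand-in for kafka.TopicPartition so this runs without a Kafka cluster.
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])


def last_message_offsets(end_offsets):
    """Map each partition to the offset of its last written message.

    An end offset of 0 means nothing has been written yet, so the value is None.
    """
    return {tp: (end - 1 if end > 0 else None) for tp, end in end_offsets.items()}


# Made-up values shaped like the dict returned by con.end_offsets(ps).
sample = {
    TopicPartition("my-topic", 0): 120,
    TopicPartition("my-topic", 1): 0,
}
print(last_message_offsets(sample))
```

With a real consumer, the dictionary returned by `con.end_offsets(ps)` can be passed straight in.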
Solution 5
Another way to achieve this is to poll the consumer, so that it picks up its last consumed offset, and then call the seek_to_end method to move to the most recent available offset of each assigned partition.
from kafka import KafkaConsumer
consumer = KafkaConsumer('my-topic',
                         group_id='my-group',
                         bootstrap_servers=['localhost:9092'])
consumer.poll()
consumer.seek_to_end()
This method particularly comes in handy when using consumer groups.
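The snippet above moves the consumer to the end but never reads the offset back. A minimal sketch of that last step follows, assuming the consumer has partitions assigned after the first poll (which may not be true right away while the group is still joining). The helper name `latest_positions` and the timeout value are illustrative; `poll`, `assignment`, `seek_to_end` and `position` are the kafka-python `KafkaConsumer` methods it relies on.

```python
def latest_positions(consumer, timeout_ms=1000):
    """Return {TopicPartition: end position} for the consumer's assigned partitions.

    `consumer` is expected to behave like kafka.KafkaConsumer.
    """
    # Polling lets the consumer join its group and receive an assignment.
    consumer.poll(timeout_ms=timeout_ms)
    assigned = consumer.assignment()
    if not assigned:
        # No partitions assigned yet; seek_to_end would have nothing to act on.
        return {}
    consumer.seek_to_end(*assigned)
    # position() resolves the pending seek and reports the next offset to be read.
    return {tp: consumer.position(tp) for tp in assigned}


# Usage, assuming a reachable broker:
# from kafka import KafkaConsumer
# consumer = KafkaConsumer('my-topic', group_id='my-group',
#                          bootstrap_servers=['localhost:9092'])
# print(latest_positions(consumer))
```

If the result is empty, the consumer probably has no assignment yet, and polling again before retrying may help.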
SOURCES:
Saket
Updated on August 14, 2020
Comments
-
Saket over 3 years
I am using the Python high level consumer for Kafka and want to know the latest offsets for each partition of a topic. However I cannot get it to work.
from kafka import TopicPartition
from kafka.consumer import KafkaConsumer
con = KafkaConsumer(bootstrap_servers = brokers)
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)]
con.assign(ps)
for p in ps:
    print "For partition %s highwater is %s"%(p.partition,con.highwater(p))
print "Subscription = %s"%con.subscription()
print "con.seek_to_beginning() = %s"%con.seek_to_beginning()
But the output I get is
For partition 0 highwater is None
For partition 1 highwater is None
For partition 2 highwater is None
For partition 3 highwater is None
For partition 4 highwater is None
For partition 5 highwater is None
....
For partition 96 highwater is None
For partition 97 highwater is None
For partition 98 highwater is None
For partition 99 highwater is None
Subscription = None
con.seek_to_beginning() = None
con.seek_to_end() = None
I have an alternate approach using assign, but the result is the same:
con = KafkaConsumer(bootstrap_servers = brokers)
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)]
con.assign(ps)
for p in ps:
    print "For partition %s highwater is %s"%(p.partition,con.highwater(p))
print "Subscription = %s"%con.subscription()
print "con.seek_to_beginning() = %s"%con.seek_to_beginning()
print "con.seek_to_end() = %s"%con.seek_to_end()
It seems from some of the documentation that I might get this behaviour if a fetch has not been issued. But I cannot find a way to force that. What am I doing wrong? Or is there a different/simpler way to get the latest offsets for a topic?
-
GreenThumb almost 7 years
Is there a way to get the current/next offset per consumer/group per partition?
-
Nick about 6 years
My server has hundreds of messages, yet consumer.poll() returned {}
-
dreynold about 6 years
Sadly, the SimpleClient has been deprecated, and the offsets_responses above yields a FailedPayloadsError: FailedPayloadsError
-
olujedai almost 6 years
This could happen if you are running more consumer instances than there are partitions for that topic.
-
Nick almost 6 years
Good point. I was able to after the fact determine we weren't calling .close, so that very circumstance occurred, but we thought there was only 1.
-
exic over 5 years
@dreynold it worked for me, but Itamar Lavender's answer using the non-deprecated parts below works too. If you don't have a group yet, skip the "lag" part and that works as well.
-
Itamar Lavender over 4 years
As this question still draws attention, I wanted to explain why my answer above doesn't really answer the question: in my opinion, the last offset of a topic/partition is only relevant in the context of a consumer group. Kafka is built for many consumer groups consuming the same data from the same topics, so what I find important is a group's rate of consumption or, more importantly, its lag.
-
DachuanZhao over 3 years
Exception has occurred: TypeError expected cimpl.TopicPartition
-
Giorgos Myrianthous over 3 years
Which of the two libraries?
-
DachuanZhao over 3 years
confluent_kafka
-
Nikolay Dimitrov over 3 years
the simplest solution 👍🏼
-
Giorgos Myrianthous about 3 years
@DachuanZhao Which line is causing the issue?
-
Dmitry Pukhov almost 2 years
It does not seem to work with SSL