Java: how to get the number of messages in a topic in Apache Kafka


Solution 1

The only way that comes to mind for this from a consumer point of view is to actually consume the messages and count them then.

The Kafka broker exposes JMX counters for number of messages received since start-up but you cannot know how many of them have been purged already.

In most common scenarios, messages in Kafka are best seen as an infinite stream, and a discrete count of how many are currently kept on disk is not relevant. Furthermore, things get more complicated when dealing with a cluster of brokers, each of which holds only a subset of the messages in a topic.

Solution 2

This is not Java, but it may be useful:

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list <broker>:<port> \
  --topic <topic-name> \
  | awk -F  ":" '{sum += $3} END {print sum}'

Solution 3

Since ConsumerOffsetChecker is no longer supported, you can use this command to check all messages in topic:

bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand \
    --group my-group \
    --bootstrap-server localhost:9092 \
    --describe

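In the output, LAG is the number of messages in each topic partition that the group has not yet consumed (LOG-END-OFFSET minus CURRENT-OFFSET), so it matches the partition's message count only for a group that has not consumed anything yet.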

You can also try kafkacat, an open source tool that reads messages from a topic and partition and prints them to stdout. Here is a sample that reads the last 10 messages from partition 0 of the sample-kafka-topic topic, then exits:

kafkacat -b localhost:9092 -t sample-kafka-topic -p 0 -o -10 -e

Solution 4

I actually use this for benchmarking my POC. The tool you want is ConsumerOffsetChecker. You can run it from a bash script like the one below.

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker  --topic test --zookeeper localhost:2181 --group testgroup

In the resulting output, the highlighted value, 999, is the number of messages currently in the topic.

Update: ConsumerOffsetChecker has been deprecated since 0.10.0; you may want to start using ConsumerGroupCommand.

Solution 5

Sometimes the interest is in knowing the number of messages in each partition, for example, when testing a custom partitioner. The following steps have been tested with Kafka 0.10.2.1-2 from Confluent 3.2. Given a Kafka topic, kt, run the following command:

$ kafka-run-class kafka.tools.GetOffsetShell \
  --broker-list host01:9092,host02:9092,host02:9092 --topic kt

This prints output like the following, showing the latest offset of each of the three partitions, which equals the partition's message count as long as nothing has been purged:

kt:2:6138
kt:1:6123
kt:0:6137

The number of lines could be more or less depending on the number of partitions for the topic.
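
If you want the same per-partition numbers inside Java code, one option is to parse that output yourself. The following is only a minimal sketch under stated assumptions: the class OffsetShellOutput and its methods are hypothetical names, not part of Kafka, and the input is assumed to be the topic:partition:offset lines shown above. It splits each line from the right, so the topic name itself is never interpreted.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for illustration; not part of the Kafka distribution.
class OffsetShellOutput {

    // Parses lines of the form "topic:partition:offset" into a
    // partition -> offset map. Lines without two colons are skipped.
    static Map<Integer, Long> parse(String output) {
        Map<Integer, Long> offsets = new TreeMap<>();
        for (String rawLine : output.split("\\R")) {
            String line = rawLine.trim();
            if (line.isEmpty()) {
                continue;
            }
            // Locate the last two colons; everything before them is the topic.
            int lastColon = line.lastIndexOf(':');
            int prevColon = line.lastIndexOf(':', lastColon - 1);
            if (prevColon < 0) {
                continue;
            }
            int partition = Integer.parseInt(line.substring(prevColon + 1, lastColon));
            long offset = Long.parseLong(line.substring(lastColon + 1));
            offsets.put(partition, offset);
        }
        return offsets;
    }

    // Adds up the offsets of all partitions.
    static long total(Map<Integer, Long> offsets) {
        long sum = 0;
        for (long offset : offsets.values()) {
            sum += offset;
        }
        return sum;
    }

    public static void main(String[] args) {
        // Sample output copied from the answer above.
        String sample = "kt:2:6138\nkt:1:6123\nkt:0:6137\n";
        Map<Integer, Long> offsets = parse(sample);
        System.out.println("Per partition: " + offsets);
        System.out.println("Total: " + total(offsets));
    }
}
```

The total is a true message count only if the earliest offset of every partition is zero and the topic is not compacted.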


Author: Chetan, Software Engineer

Updated on November 26, 2021

Comments

  • Chetan
    Chetan over 2 years

    I am using apache kafka for messaging. I have implemented the producer and consumer in Java. How can we get the number of messages in a topic?

  • kisna
    kisna about 8 years
    Shouldn't this be the sum, over all partitions, of the difference between the latest and earliest offsets?

    bash-4.3# $KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.35.25.95:32774 --topic test-topic --time -1 | awk -F ":" '{sum += $3} END {print sum}'
    13818663
    bash-4.3# $KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.35.25.95:32774 --topic test-topic --time -2 | awk -F ":" '{sum += $3} END {print sum}'
    12434609

    And then the difference gives the actual number of pending messages in the topic? Am I correct?
  • ssemichev
    ssemichev about 8 years
    Yes, that's true. You have to calculate a difference if the earliest offsets do not equal zero.
  • kisna
    kisna about 8 years
    That's what I thought :).
  • Szymon Sadło
    Szymon Sadło over 7 years
    Please note that ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
  • Rudy
    Rudy over 7 years
    Yeah, that's what I said.
  • Szymon Sadło
    Szymon Sadło over 7 years
    Your last sentence is not accurate. The above command still works in 0.10.0.1 and the warning is the same as my previous comment.
  • armandfp
    armandfp about 7 years
    This tool is nice, but it will not work if your topic has more than 2 dots.
  • salvob
    salvob about 7 years
    Is there ANY way to use that as an API and so inside a code (JAVA, Scala or Python)?
  • ssemichev
    ssemichev about 7 years
    Here is a mix of my code and code from Kafka. It may be useful. I used it for Spark streaming - Kafka integration KafkaClient gist.github.com/ssemichev/c2d94dce7ad65339c9637e1b461f86cf KafkaCluster gist.github.com/ssemichev/fa3605c7b10cb6c7b9c8ab54ffbc5865
  • AutomatedMike
    AutomatedMike almost 7 years
    btw, if you have compaction turned on then there may be gaps in the stream so the actual number of messages may be lower than the total calculated here. To get an accurate total you're going to have to replay the messages and count them.
  • Christophe Quintard
    Christophe Quintard almost 6 years
    See my answer stackoverflow.com/a/47313863/2017567. The Java Kafka client allows to get that information.
  • Admin
    Admin about 5 years
    If log compaction is enabled, then summing the offsets of the partitions may not give the exact count of messages in the topic.
  • WestCoastProjects
    WestCoastProjects about 5 years
    Note: I removed the --new-consumer since that option is no longer available (or apparently necessary)
  • Vip
    Vip almost 5 years
    Can somebody help me find out how can i pass SASL config in this command?
  • spats
    spats over 3 years
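    Simplifying @kisna's answer to get the exact record count:

    brokers="<broker1:port>"
    topic="<topic-name>"
    sum_1=$(/usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $brokers --topic $topic --time -1 | grep -e ':[[:digit:]]*:' | awk -F ":" '{sum += $3} END {print sum}')
    sum_2=$(/usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $brokers --topic $topic --time -2 | grep -e ':[[:digit:]]*:' | awk -F ":" '{sum += $3} END {print sum}')
    echo "Number of records in topic ${topic}: "$((sum_1 - sum_2))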
  • adaslaw
    adaslaw over 3 years
    I prefer your answer to @AutomatedMike's answer, since yours doesn't mess with the seekToEnd(..) and seekToBeginning(..) methods, which change the state of the consumer.
  • so-random-dude
    so-random-dude about 3 years
    Careful if it's a compacted topic.
  • Felipe Correa
    Felipe Correa over 2 years
    This answer is lacking a bit in precision. LAG is the number of messages that are still pending consumption by a consumer. It is not the total number of messages in the partition. A somewhat more accurate value for the TOTAL number of messages in the partitions (but still somewhat misleading) would be LOG-END-OFFSET.
  • Florin Andrei
    Florin Andrei over 2 years
    Let me see if I get this right: Enable JMX. Get all metrics. Pick a topic and a partition. For that topic/partition combo, get LogEndOffset and LogStartOffset. Do the difference. That's the number of messages in the queue. Correct?
  • Florin Andrei
    Florin Andrei over 2 years
    If a topic has multiple partitions, then I need to do this math separately for each partition? Then add the results? (I'm new to Kafka, I've only used RabbitMQ before.)
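
The arithmetic discussed in the comments above (latest offset minus earliest offset for each partition, then summed) can be sketched in plain Java. This is only an illustration: RetainedMessageCount is a hypothetical class, the offsets are made up, and the maps are keyed by partition number. With the Java client, the corresponding values should be obtainable from KafkaConsumer's beginningOffsets and endOffsets methods, which return maps keyed by TopicPartition instead; that wiring is not shown here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of the Kafka distribution.
class RetainedMessageCount {

    // Sums (latest - earliest) over all partitions listed in `latest`.
    // A partition missing from `earliest` is assumed to start at offset 0.
    static long count(Map<Integer, Long> earliest, Map<Integer, Long> latest) {
        long total = 0;
        for (Map.Entry<Integer, Long> entry : latest.entrySet()) {
            long start = earliest.getOrDefault(entry.getKey(), 0L);
            total += entry.getValue() - start;
        }
        return total;
    }

    public static void main(String[] args) {
        // Made-up offsets for a two-partition topic.
        Map<Integer, Long> earliest = new HashMap<>();
        earliest.put(0, 100L);
        earliest.put(1, 40L);

        Map<Integer, Long> latest = new HashMap<>();
        latest.put(0, 250L);
        latest.put(1, 90L);

        // (250 - 100) + (90 - 40) = 200
        System.out.println(count(earliest, latest));
    }
}
```

As noted in the comments, on a compacted topic this difference is only an upper bound, because compaction leaves gaps in the offset sequence.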