How to get message from a kafka topic with a specific offset
10,549
Partition-offset based consumption is supported in the new consumer only.
kafka-console-consumer
should use --bootstrap-server
, as the warning mentioned.
And you are missing a space before --partition
But otherwise, --partition x --offset y
is correct.
Full command
kafka-console-consumer \
--bootstrap-server kafka0:9092 \
--topic lopet.lo.pm \
--partition 0 \
--offset 34537263 \
--max-messages 1
Using kcat
is another option, if you want to install it
Related videos on Youtube
Author by
Judy
Updated on September 16, 2022Comments
-
Judy over 1 year
We have an HDP cluster with 3 kafka brokers ( from hortonworks )
We want to run kafka console consumer in order to get one message from topic with specific offset
/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper zoo01:2181 --topic lopet.lo.pm--partition 0 --offset 34537263 --max-messages 1
But we get the following:
Where we are wrong?
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper]. Partition-offset based consumption is supported in the new consumer only. Option Description ------ ----------- --blacklist <blacklist> Blacklist of topics to exclude from consumption. --bootstrap-server <server to connect REQUIRED (unless old consumer is to> used): The server to connect to. --consumer-property <consumer_prop> A mechanism to pass user-defined properties in the form key=value to the consumer. --consumer.config <config file> Consumer config properties file. Note that [consumer-property] takes precedence over this config. --csv-reporter-enabled If set, the CSV metrics reporter will be enabled --delete-consumer-offsets If specified, the consumer path in zookeeper is deleted when starting up --enable-systest-events Log lifecycle events of the consumer in addition to logging consumed messages. (This is specific for system tests.) --formatter <class> The name of a class to use for formatting kafka messages for display. (default: kafka.tools. DefaultMessageFormatter) --from-beginning If the consumer does not already have an established offset to consume from, start with the earliest message present in the log rather than the latest message. --key-deserializer <deserializer for key> --max-messages <Integer: num_messages> The maximum number of messages to consume before exiting. If not set, consumption is continual. --metrics-dir <metrics directory> If csv-reporter-enable is set, and this parameter isset, the csv metrics will be outputed here --new-consumer Use the new consumer implementation. This is the default. --offset <consume offset> The offset id to consume from (a non- negative number), or 'earliest' which means from beginning, or 'latest' which means from end (default: latest)
-
Sivaram Rasathurai over 2 yearsNice Answer, Is it possible with Spring Kafka?
-
OneCricketeer over 2 years@RCv I'm not sure I understand what you're asking. You can seek any consumer to any offset, and consume a limited number of messages from that partition, regardless of frameworks
-
Sivaram Rasathurai about 2 yearsThanks, @One, Yeah My question is not really focused. My question was we can consume the messages using the spring Kafka active listeners. Is it possible to get the specific message of Kafka using this spring Kafka library?.
-
OneCricketeer about 2 yearsThis isn't using Spring, though, you only need
kafka-clients.jar
. Plus, you should still be able to seek after using subscribe method; you just don't have guaranteed access to a specific partition -
Sivaram Rasathurai about 2 yearsYeah, the consumer from client jar but, I used spring Kafka consumer factory to create a consumer. and I extend my class with AbstractSeekConsumerAware class because it takes care of much of the underlying complexity. In that case, this consumer is a product of spring configurations ryt. That's why I said, This is for spring java users. For more details, you can check my repo
-
Sivaram Rasathurai about 2 yearsI didn't get your last point. @OneCricketeer. If you provide some more details or some references then that would be great for me. Thanks
-
OneCricketeer about 2 years1) AFAICT, the factory is pointless since you're only using it to construct a producer with a hard coded property map, and you never set/override the AbstractSeekConsumerAware methods in the code 2) You say "we have to
.assign
the consumer. That's true for getting a specific record in a partition, sure, but it's not true to just get any single record, which can be done with.subscribe
-
Sivaram Rasathurai about 2 yearsThanks OneCricketeer, I got your points but not sure about that. Once I checked, i will update my answer. Thanks a lot