How to get a message from a Kafka topic at a specific offset


Partition-offset based consumption is supported in the new consumer only.

kafka-console-consumer should use --bootstrap-server, as the warning mentioned.

You are also missing a space before --partition.

But otherwise, --partition x --offset y is correct.


Full command

kafka-console-consumer \
  --bootstrap-server kafka0:9092 \
  --topic lopet.lo.pm \
  --partition 0 \
  --offset 34537263 \
  --max-messages 1

Using kcat is another option, if you want to install it.
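
For example, a minimal kcat sketch equivalent to the command above (assuming kcat is installed and can reach the same broker; -C selects consumer mode, -c 1 exits after one message, -e exits at the end of the partition):

kcat -C \
  -b kafka0:9092 \
  -t lopet.lo.pm \
  -p 0 \
  -o 34537263 \
  -c 1 \
  -e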


Comments

  • Judy
    Judy over 1 year

We have an HDP cluster (from Hortonworks) with 3 Kafka brokers.

We want to run kafka-console-consumer in order to get one message from a topic at a specific offset:

    /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper zoo01:2181  --topic lopet.lo.pm--partition 0 --offset 34537263 --max-messages 1
    

    But we get the following:

Where are we going wrong?

    Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
    Partition-offset based consumption is supported in the new consumer only.
    Option                                   Description
    ------                                   -----------
    --blacklist <blacklist>                  Blacklist of topics to exclude from
                                               consumption.
    --bootstrap-server <server to connect    REQUIRED (unless old consumer is
      to>                                      used): The server to connect to.
    --consumer-property <consumer_prop>      A mechanism to pass user-defined
                                               properties in the form key=value to
                                               the consumer.
    --consumer.config <config file>          Consumer config properties file. Note
                                               that [consumer-property] takes
                                               precedence over this config.
    --csv-reporter-enabled                   If set, the CSV metrics reporter will
                                               be enabled
    --delete-consumer-offsets                If specified, the consumer path in
                                               zookeeper is deleted when starting up
    --enable-systest-events                  Log lifecycle events of the consumer
                                               in addition to logging consumed
                                               messages. (This is specific for
                                               system tests.)
    --formatter <class>                      The name of a class to use for
                                               formatting kafka messages for
                                               display. (default: kafka.tools.
                                               DefaultMessageFormatter)
    --from-beginning                         If the consumer does not already have
                                               an established offset to consume
                                               from, start with the earliest
                                               message present in the log rather
                                               than the latest message.
    --key-deserializer <deserializer for
      key>
    --max-messages <Integer: num_messages>   The maximum number of messages to
                                               consume before exiting. If not set,
                                               consumption is continual.
    --metrics-dir <metrics directory>        If csv-reporter-enable is set, and
                                               this parameter isset, the csv
                                               metrics will be outputed here
    --new-consumer                           Use the new consumer implementation.
                                               This is the default.
    --offset <consume offset>                The offset id to consume from (a non-
                                               negative number), or 'earliest'
                                               which means from beginning, or
                                               'latest' which means from end
                                               (default: latest)
    
  • Sivaram Rasathurai
    Sivaram Rasathurai over 2 years
Nice answer. Is it possible with Spring Kafka?
  • OneCricketeer
    OneCricketeer over 2 years
    @RCv I'm not sure I understand what you're asking. You can seek any consumer to any offset, and consume a limited number of messages from that partition, regardless of frameworks
  • Sivaram Rasathurai
    Sivaram Rasathurai about 2 years
Thanks, @One. Yeah, my question was not really focused: we can consume messages using Spring Kafka listeners, but is it possible to get a specific message from Kafka using the Spring Kafka library?
  • OneCricketeer
    OneCricketeer about 2 years
This isn't using Spring, though; you only need kafka-clients.jar (see the plain-consumer sketch after these comments). Plus, you should still be able to seek after using the subscribe method; you just don't have guaranteed access to a specific partition.
  • Sivaram Rasathurai
    Sivaram Rasathurai about 2 years
Yeah, the consumer comes from the client jar, but I used the Spring Kafka consumer factory to create the consumer, and I extend my class with the AbstractSeekConsumerAware class because it takes care of much of the underlying complexity. In that case, the consumer is a product of Spring configuration, right? That's why I said this is for Spring Java users. For more details, you can check my repo.
  • Sivaram Rasathurai
    Sivaram Rasathurai about 2 years
I didn't get your last point, @OneCricketeer. If you could provide some more details or references, that would be great. Thanks.
  • OneCricketeer
    OneCricketeer about 2 years
1) AFAICT, the factory is pointless since you're only using it to construct a producer with a hard-coded property map, and you never set/override the AbstractSeekConsumerAware methods in the code. 2) You say "we have to .assign the consumer". That's true for getting a specific record in a partition, sure, but it's not true for just getting any single record, which can be done with .subscribe.
  • Sivaram Rasathurai
    Sivaram Rasathurai about 2 years
Thanks OneCricketeer, I got your points but I'm not sure about that. Once I've checked, I will update my answer. Thanks a lot.
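
For readers of the thread above who want the plain kafka-clients approach without Spring, here is a minimal sketch: it assigns the partition, seeks to the offset, and prints the first record it receives. The class name is hypothetical; broker, topic, partition, and offset are taken from the question, and poll(Duration) assumes kafka-clients 2.0 or newer.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SingleMessageReader {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Broker from the question; no group.id is needed because we use assign() and never commit offsets
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka0:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Manually assign the exact partition (no consumer-group rebalancing), then seek to the wanted offset
            TopicPartition tp = new TopicPartition("lopet.lo.pm", 0);
            consumer.assign(Collections.singletonList(tp));
            consumer.seek(tp, 34537263L);

            // Fetch a batch starting at that offset and print only the first record
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d key=%s value=%s%n",
                        record.offset(), record.key(), record.value());
                break; // only one message is needed
            }
        }
    }
}

With assign() the partition is fixed, so the seek is guaranteed to target partition 0; as noted in the comments, the same seek call also works after subscribe(), but then the group coordinator decides which partitions the consumer actually owns.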