kafka-python read from last produced message after a consumer restart

11,188

Solution 1

You will not to seekToEnd() to the end of the log.

Keep in mind, that you first need to subscribe to a topic before you can seek. Also, subscribing is lazy. Thus, you will need to add a "dummy poll" before you can seek, too.

consumer.subscribe(...)
consumer.poll() // dummy poll
consumer.seekToEnd()

// now enter your regular poll-loop

Solution 2

Thanks,

it works!

This is a simplified versione of my code:

consumer = KafkaConsumer('mytopic', bootstrap_servers=[server], group_id=group_id, enable_auto_commit=True)
#dummy poll
consumer.poll()
#go to end of the stream
consumer.seek_to_end()
#start iterate
for message in consumer:
    print(message)

consumer.close()

The documentation states that the poll() method is incompatible with the iterator interface, which i guess is the the one I use in the loop at the end of my script. However from initial testing, this code looks like to work correctly.

Is it safe to use it? Or did I misunderstood the docuementation?

Thanks

Solution 3

In response to your question in your answer:

It is my understanding that when you execute consumer.poll() a dictionary is returned. So, when I wanted to poll for information I used a loop to walk through the dictionary.

consumer = KafkaConsumer('mytopic', bootstrap_servers=[server], group_id=group_id, enable_auto_commit=True)
messages = consumer.poll()
data = []
for msg in messages:
    for value in messages[msg]:
       #Add just the values to the list
       data.append(value[6])

I believe what you are doing is getting the iterator with consumer = KafkaConsumer('mytopic', bootstrap_servers=[server], group_id=group_id, enable_auto_commit=True) and then walking the iterator with

#start iterate
for message in consumer:
    print(message)

It doesn't look like you are actually getting just the 500 results from the poll. You can confirm this by adding max_poll_records=5 to your KafkaConsumer configuration. Then when you run the code, if more than 5 messages print out you can tell that you aren't using the poll functionality.

Hope that helps!

Solution 4

Here is a convenient way to have all messages returned by a poll in a list:

while True:
  messages = [] # Store all messages
  crs = [] # Store all consumer records
  tpd = consumer.poll(timeout_ms=60000, max_records=1)
  [ crs.extend(tp) for tp in tpd.values() ] # List of cr's
  [ messages.extend([json.loads(cr.value)]) for cr in crs ]
  print messages
Share:
11,188
ugomaria
Author by

ugomaria

Updated on June 09, 2022

Comments

  • ugomaria
    ugomaria almost 2 years

    i am using kafka-python to consume messages from a kafka queue (kafka version 0.10.2.0). In particular i am using KafkaConsumer type. If the consumer stops and after a while it is restarted i would like to restart from the latest produced message, that is drop all the messages produced during the time the consumer was down. How can i achieve this?

    Thanks