Kafka Consumer poll messages with python

13,326

I believe you are misunderstanding max_poll_records: it does not mean you will get 200 records per poll, only that a single poll will return at most 200. You will need to call poll multiple times, in a loop, until you have read everything you need. I'd refer you to the docs for simple examples: http://kafka-python.readthedocs.io/en/master/usage.html
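A minimal sketch of that loop, assuming `consumer` is a kafka-python `KafkaConsumer` that has already been assigned its partition. The helper name `poll_all` and the stop rule (give up after `max_empty_polls` consecutive empty polls) are illustrative choices, not part of the library:

```python
def poll_all(consumer, timeout_ms=1000, max_empty_polls=3):
    """Call poll() repeatedly and collect records until several polls in a row return nothing.

    `consumer` is assumed to behave like kafka-python's KafkaConsumer:
    poll() returns a dict mapping TopicPartition -> list of records.
    """
    records = []
    empty_polls = 0
    while empty_polls < max_empty_polls:
        batch = consumer.poll(timeout_ms=timeout_ms)
        if not batch:
            empty_polls += 1  # nothing arrived within timeout_ms
            continue
        empty_polls = 0  # data is flowing again, reset the counter
        for partition_records in batch.values():
            records.extend(partition_records)
    return records
```

Each `poll()` may return anywhere from zero records up to `max_poll_records`, so the loop, not the setting, is what makes the consumer read the whole backlog.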

I believe a more standard implementation is to iterate over the consumer directly:

for message in self.consumer:
    # do stuff like:
    print(message)
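With kafka-python's defaults the loop above never ends on its own: iterating a `KafkaConsumer` blocks waiting for new messages unless `consumer_timeout_ms` is set. A hedged sketch that also caps the number of messages; `read_messages` is a hypothetical helper, and it only assumes the consumer is iterable:

```python
from itertools import islice


def read_messages(consumer, limit=None):
    """Iterate over a consumer and return up to `limit` messages (all of them if None).

    Assumes `consumer` was created with consumer_timeout_ms set, so the
    iteration stops once no message arrives within that timeout instead of
    blocking forever.
    """
    messages = []
    for message in islice(consumer, limit):
        # do stuff like:
        print(message)
        messages.append(message)
    return messages
```

For example, with `consumer = KafkaConsumer('my-topic', bootstrap_servers='localhost:9092', consumer_timeout_ms=5000)` (topic and broker address are placeholders), `read_messages(consumer, limit=30000)` returns once 30,000 messages have been read or 5 seconds pass without a new one.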
Author: soa

Updated on June 09, 2022

Comments

  • soa, almost 2 years ago

    I have a problem polling messages from Kafka in a consumer group. My consumer object is bound to a given partition with

    self.ps = TopicPartition(topic, partition)
    

    and then the consumer is assigned to that partition:

    self.consumer.assign([self.ps])
    

    After that I am able to count the messages inside the partition with

    self.consumer.seek_to_beginning(self.ps)
    pos = self.consumer.position(self.ps)
    

    and self.consumer.seek_to_end(self.ps) .....
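A possible alternative for the count, sketched under the assumption that the consumer is a kafka-python `KafkaConsumer`: its `beginning_offsets()` and `end_offsets()` methods return a dict keyed by `TopicPartition`, and per the kafka-python docs they do not move the consumer's position the way `seek_to_beginning()`/`seek_to_end()` do. `count_messages` is a hypothetical helper name:

```python
def count_messages(consumer, tp):
    """Return the number of offsets between the start and the end of one partition.

    The result is an upper bound on the number of readable records:
    log compaction and transaction markers can leave gaps in the offset range.
    """
    start = consumer.beginning_offsets([tp])[tp]
    end = consumer.end_offsets([tp])[tp]
    return end - start
```

Because the position is untouched, there is no need to seek back to the first offset after counting.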

    My topic contains over 30,000 messages. The problem is that I only get exactly one message.

    Consumer configuration: max_poll_records=200, and AUTO_OFFSET_RESET is earliest.
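The configuration described above could be written as keyword arguments for kafka-python's `KafkaConsumer`. In this sketch only `max_poll_records` and `auto_offset_reset` come from the question; the broker address and `consumer_timeout_ms` value are assumptions:

```python
consumer_config = {
    "bootstrap_servers": "localhost:9092",  # assumption: a local broker
    "auto_offset_reset": "earliest",  # only used when there is no valid committed offset
    "max_poll_records": 200,  # upper bound per poll() call, not a guaranteed batch size
    "consumer_timeout_ms": 5000,  # assumption: stop iteration after 5 s without messages
}
```

It would be used as `KafkaConsumer(**consumer_config)` followed by `assign([...])`. Note that `auto_offset_reset` only applies when the partition has no valid committed offset (or the stored one is out of range), so it does not control how many records a single `poll()` returns.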

    And here is the function I use to get the messages:

    def poll_messages(self):
        data = []
        messages = self.consumer.poll(timeout_ms=6000)
        for partition, msgs in six.iteritems(messages):
            for msg in msgs:
                data.append(msg)
        return data
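Because `poll_messages()` calls `poll()` only once, it returns whatever the first fetch happened to deliver, which can be a single record. One way to read the whole partition, sketched under the assumption that `tp` is the `TopicPartition` passed to `assign()` and the consumer has already been positioned (for example with `seek_to_beginning(tp)`), is to keep polling until the position reaches the end offset captured at the start. `poll_until_end` and `max_empty_polls` are hypothetical names:

```python
def poll_until_end(consumer, tp, timeout_ms=1000, max_records=200, max_empty_polls=5):
    """Poll one assigned partition until its position reaches the end offset
    that was current when the call started.

    Assumes a kafka-python-style consumer with end_offsets(), position() and
    poll(timeout_ms=..., max_records=...).
    """
    end = consumer.end_offsets([tp])[tp]  # snapshot of the log end offset
    data = []
    empty_polls = 0
    # Stop at the snapshot, or after several empty polls so a stalled
    # broker cannot keep the loop spinning forever.
    while consumer.position(tp) < end and empty_polls < max_empty_polls:
        batch = consumer.poll(timeout_ms=timeout_ms, max_records=max_records)
        records = batch.get(tp, [])
        empty_polls = 0 if records else empty_polls + 1
        data.extend(records)
    return data
```

Capturing the end offset once keeps the function from chasing messages that are produced while it is reading.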
    

    Even if I seek to the first available offset before I start polling, I get only one message.

    self.consumer.seek(self.ps, self.get_first_offset())
    

    I hope someone can explain to me what I am doing wrong. Thanks in advance.

    Best wishes Jörn