Kafka Consumer poll messages with python
I believe you are misunderstanding max_poll_records: it does not mean you will get 200 records per poll, only that 200 is the most a single poll may return. A poll can return fewer records than that, so you will need to call poll multiple times. I'd refer you to the docs for simple examples: http://kafka-python.readthedocs.io/en/master/usage.html
I believe a more standard implementation is:
for message in self.consumer:
    # do stuff like:
    print(message)
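If you do want to stay with poll, the "call poll multiple times" advice above could be sketched roughly as follows. This is only a minimal sketch, assuming the consumer is a kafka-python KafkaConsumer that has already been assigned to the partition; the function name drain_partition and the max_empty_polls parameter are made up for illustration and are not part of the library.

```python
def drain_partition(consumer, tp, timeout_ms=1000, max_empty_polls=3):
    """Poll repeatedly until the end offset seen at call time is reached.

    `consumer` is assumed to behave like a kafka-python KafkaConsumer that is
    already assigned to the TopicPartition `tp` (hypothetical helper).
    """
    # end_offsets() maps each partition to the offset of the next message
    # that will be written, i.e. last available offset + 1.
    end_offset = consumer.end_offsets([tp])[tp]
    records = []
    empty_polls = 0
    # Stop when we have caught up, or after several polls in a row returned nothing.
    while consumer.position(tp) < end_offset and empty_polls < max_empty_polls:
        # poll() returns a dict of TopicPartition -> list of records;
        # each call returns at most max_poll_records records, possibly fewer.
        batch = consumer.poll(timeout_ms=timeout_ms)
        msgs = batch.get(tp, [])
        if msgs:
            records.extend(msgs)
            empty_polls = 0
        else:
            empty_polls += 1
    return records
```

In the question's class this might be called as drain_partition(self.consumer, self.ps) after seeking to the offset you want to start from. The empty-poll limit is just a guard so the loop ends if the broker stops returning data.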
soa
Updated on June 09, 2022
I have a problem polling messages from Kafka in a consumer group. My consumer object creates a TopicPartition for a given partition with
self.ps = TopicPartition(topic, partition )
and after that the consumer is assigned to that partition:
self.consumer.assign([self.ps])
After that I am able to count the messages inside the partition with
self.consumer.seek_to_beginning(self.ps)
pos = self.consumer.position(self.ps)
and
self.consumer.seek_to_end(self.ps)
.....
In my topic there are over 30000 messages. The problem is that I only ever get exactly one message.
Consumer configuration:
max_poll_records = 200
AUTO_OFFSET_RESET is earliest
And here is the function with which I am trying to get the messages:
def poll_messages(self):
    data = []
    messages = self.consumer.poll(timeout_ms=6000)
    for partition, msgs in six.iteritems(messages):
        for msg in msgs:
            data.append(msg)
    return data
Even if I seek to the first available offset before I start polling, I get only one message.
self.consumer.seek(self.ps, self.get_first_offset())
I hope someone can explain to me what I am doing wrong. Thanks in advance.
Best wishes Jörn