kafka-python consumer start reading from offset (automatically)
Solution 1
You are getting this behavior because your consumer is not using a Consumer Group. With a Consumer Group, the consumer will regularly commit (save) its position to Kafka. That way if it's restarted it will pick up from its last committed position.
To make your consumer use a Consumer Group, you need to set group_id
when constructing it.
See group_id
description from the docs:
The name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and committing offsets. If None, auto-partition assignment (via group coordinator) and offset commits are disabled. Default: None
For example:
consumer = KafkaConsumer('numtest', bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest', enable_auto_commit=True,
auto_commit_interval_ms=1000, group_id='my-group')
Solution 2
Is that possible to using consumer from different server. I already tried the same below is the code and its not fetching any data from kafka.
consumer = KafkaConsumer('tet', bootstrap_servers=['192.168.1.20:9092'],
auto_offset_reset='earliest', enable_auto_commit=True,
auto_commit_interval_ms=1000, group_id=None)
Note:- When I am giving wrong ip or port number its throws exceptions.
Steven Van Dorpe
Updated on June 17, 2022Comments
-
Steven Van Dorpe almost 2 years
I'm trying to build an application with kafka-python where a consumer reads data from a range of topics. It is extremely important that the consumer never reads the same message twice, but also never misses a message.
Everything seems to be working fine, except when I turn off the consumer (e.g. failure) and try to start reading from offset. I can only read all the messages from the topic (which creates double reads) or listen for new messages only (and miss messages that where emitted during the breakdown). I don't encounter this problem when pausing the consumer.
I created an isolated simulation in order to try to solve the problem.
Here the generic producer:
from time import sleep from json import dumps from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers=['localhost:9092']) x=0 # set manually to avoid duplicates for e in range(1000): if e <= x: pass else: data = dumps( { 'number' : e } ).encode('utf-8') producer.send('numtest', value=data) print(e, ' send.') sleep(5)
And the consumer. If
auto_offset_reset
is set to'earliest'
, all the messages will be read again. Ifauto_offset_reset
is set to'latest'
, no messages during down-time will be read.from kafka import KafkaConsumer from pymongo import MongoClient from json import loads ## Retrieve data from kafka (WHAT ABOUT MISSED MESSAGES?) consumer = KafkaConsumer('numtest', bootstrap_servers=['localhost:9092'], auto_offset_reset='earliest', enable_auto_commit=True, auto_commit_interval_ms=1000) ## Connect to database client = MongoClient('localhost:27017') collection = client.counttest.counttest # Send data for message in consumer: message = loads(message.value.decode('utf-8')) collection.insert_one(message) print('{} added to {}'.format(message, collection))
I feel like the auto-commit isn't working properly.
I know that this questions is similar to this one, but I would like a specific solution.
Thanks for helping me out.
-
Steven Van Dorpe over 5 yearsThanks Mickael! Changed my code and it works like a charm now.