kafka-python consumer start reading from offset (automatically)

19,436

Solution 1

You are getting this behavior because your consumer is not using a Consumer Group. With a Consumer Group, the consumer will regularly commit (save) its position to Kafka. That way if it's restarted it will pick up from its last committed position.

To make your consumer use a Consumer Group, set group_id when constructing it. See the description of group_id in the docs:

The name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and committing offsets. If None, auto-partition assignment (via group coordinator) and offset commits are disabled. Default: None

For example:

from kafka import KafkaConsumer

consumer = KafkaConsumer('numtest', bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='earliest', enable_auto_commit=True,
                         auto_commit_interval_ms=1000, group_id='my-group')
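
Auto-commit saves the position on a timer, so a crash at the wrong moment may still repeat or skip a message. A minimal sketch of one alternative, assuming you would rather commit only after each message has been handled: disable auto-commit and call commit() yourself. The names consume, handle and build_consumer are hypothetical helpers for illustration, not part of kafka-python.

```python
def consume(consumer, handle):
    """Handle each message first, then commit, so a crash replays rather than skips."""
    for message in consumer:
        handle(message.value)
        # Synchronously commit the consumer's current position.
        consumer.commit()


def build_consumer():
    """Create a group consumer with auto-commit disabled (needs a running broker)."""
    from kafka import KafkaConsumer  # imported lazily; requires kafka-python

    return KafkaConsumer(
        'numtest',
        bootstrap_servers=['localhost:9092'],
        group_id='my-group',
        auto_offset_reset='earliest',
        enable_auto_commit=False,
    )
```

Calling consume(build_consumer(), print) would print each value and commit after it. This gives at-least-once behavior: a message handled just before a crash can be delivered again on restart, so the handler should tolerate repeats.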

Solution 2

Is it possible to run the consumer from a different server? I tried this with the code below, but it does not fetch any data from Kafka.

consumer = KafkaConsumer('tet', bootstrap_servers=['192.168.1.20:9092'],
                     auto_offset_reset='earliest', enable_auto_commit=True,
                     auto_commit_interval_ms=1000, group_id=None)

Note: when I give a wrong IP address or port number, it throws an exception.
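
The absence of an exception only shows that the bootstrap address is reachable. A small check, sketched here under the assumption that the consumer is a kafka-python KafkaConsumer, can confirm whether the client actually sees the topic. It uses the consumer's topics() and partitions_for_topic() methods; describe_topic is a hypothetical helper name.

```python
def describe_topic(consumer, topic):
    """Report what this client can see of `topic`."""
    visible = topic in consumer.topics()
    partitions = consumer.partitions_for_topic(topic)  # None if the topic is unknown
    return {
        'topic_visible': visible,
        'partitions': sorted(partitions) if partitions else [],
    }
```

If the topic is visible but no messages arrive, one thing worth checking is the address the broker advertises to clients, since the consumer fetches from the advertised address rather than from the bootstrap address.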

Author by

Steven Van Dorpe

Updated on June 17, 2022

Comments

  • Steven Van Dorpe almost 2 years

    I'm trying to build an application with kafka-python where a consumer reads data from a range of topics. It is extremely important that the consumer never reads the same message twice, but also never misses a message.

    Everything seems to work fine, except when the consumer is turned off (e.g. by a failure) and I then try to resume reading from its last offset. I can either read all the messages in the topic again (which creates double reads) or listen for new messages only (and miss the messages that were emitted during the downtime). I don't encounter this problem when pausing the consumer.
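
The two requirements together (no double reads, no misses) are often approached by accepting that a message may occasionally be delivered twice and making the write idempotent. A minimal sketch, assuming the messages end up in a MongoDB collection through pymongo's update_one with upsert=True: each document is keyed by the message's topic, partition and offset, so a repeated delivery overwrites the same document instead of adding a second one. The helper name store_once and the key format are hypothetical.

```python
def store_once(collection, message, payload):
    """Upsert `payload` under a key built from the message's coordinates."""
    key = '{}-{}-{}'.format(message.topic, message.partition, message.offset)
    # A redelivered message hits the same _id, so no duplicate document is created.
    collection.update_one({'_id': key}, {'$set': payload}, upsert=True)
    return key
```

Here message is expected to expose topic, partition and offset attributes, as the records yielded by a KafkaConsumer do.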

    I created an isolated simulation in order to try to solve the problem.

    Here the generic producer:

    from time import sleep
    from json import dumps
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

    x = 0  # set manually to avoid duplicates

    for e in range(1000):
        if e <= x:
            continue  # skip numbers that were already sent

        # Serialize the payload as UTF-8 encoded JSON
        data = dumps({'number': e}).encode('utf-8')

        producer.send('numtest', value=data)
        print(e, 'sent.')

        sleep(5)
    

    And the consumer. If auto_offset_reset is set to 'earliest', all the messages are read again. If auto_offset_reset is set to 'latest', messages sent during the downtime are not read.

    from kafka import KafkaConsumer
    from pymongo import MongoClient
    from json import loads
    
    ## Retrieve data from kafka (WHAT ABOUT MISSED MESSAGES?)
    consumer = KafkaConsumer('numtest', bootstrap_servers=['localhost:9092'],
                             auto_offset_reset='earliest', enable_auto_commit=True,
                             auto_commit_interval_ms=1000)
    
    
    ## Connect to database
    client = MongoClient('localhost:27017')
    collection = client.counttest.counttest
    
    # Send data
    for message in consumer:
        message = loads(message.value.decode('utf-8'))
        collection.insert_one(message)
        print('{} added to {}'.format(message, collection))
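
The behavior described above can be illustrated with a simplified model of how a starting position is chosen for one partition. This is a sketch of the general rule, not kafka-python's actual code, and the function and parameter names are hypothetical: a committed offset, when one exists, takes priority, and auto_offset_reset only applies when there is none (which is always the case without a group_id).

```python
def starting_offset(committed, auto_offset_reset, earliest, latest):
    """Simplified model: where a consumer begins reading on one partition."""
    if committed is not None:
        return committed  # resume from the group's saved position
    if auto_offset_reset == 'earliest':
        return earliest   # no saved position: re-read everything
    if auto_offset_reset == 'latest':
        return latest     # no saved position: only new messages
    raise ValueError('unsupported auto_offset_reset: {!r}'.format(auto_offset_reset))


# Without a group there is never a committed offset:
print(starting_offset(None, 'earliest', earliest=0, latest=120))  # 0
print(starting_offset(None, 'latest', earliest=0, latest=120))    # 120
# With a group that committed offset 42 before stopping:
print(starting_offset(42, 'earliest', earliest=0, latest=120))    # 42
```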
    

    I feel like the auto-commit isn't working properly.

    I know that this question is similar to this one, but I would like a specific solution.

    Thanks for helping me out.

  • Steven Van Dorpe over 5 years
    Thanks Mickael! Changed my code and it works like a charm now.