Kafka partition lag increasing
If the consumer offset is not moving after some time, the consumer has likely stopped. If the consumer offset is moving but the consumer lag (the difference between the log-end offset and the consumer offset) is increasing, the consumer is slower than the producer. If the consumer is slow, the typical solution is to increase the degree of parallelism in the consumer, which may require increasing the number of partitions of the topic.
Read more at the Kafka docs.
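The lag described above is simple arithmetic over the offsets the broker reports. A minimal sketch in plain Python (the partition numbers and offsets are illustrative, taken from output like the one in the question):

```python
def consumer_lag(log_end_offset: int, current_offset: int) -> int:
    """Lag for one partition: records appended but not yet consumed."""
    return log_end_offset - current_offset

# Offsets per partition: (current-offset, log-end-offset).
# These particular numbers are illustrative only.
offsets = {37: (1924, 2782), 38: (2741, 2742), 39: (2713, 2713)}

lags = {p: consumer_lag(end, cur) for p, (cur, end) in offsets.items()}

# A partition whose lag keeps growing while the others stay near zero
# points at a stuck or slow consumer on that one partition.
print(lags)  # {37: 858, 38: 1, 39: 0}
```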
To put it simply: you're producing more than you're consuming. To reduce the lag you need to increase the rate of consumption, which usually means adding more consumers. If you're just testing, then your consumer is simply too slow.
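Since consumer-group parallelism is capped by the partition count, adding consumers beyond the number of partitions does nothing; raising the partition count is the usual lever. A hedged CLI sketch (the topic name and target count are placeholders; on older brokers such as Kafka 1.0 the tool takes --zookeeper instead of --bootstrap-server):

```shell
# Check how many partitions the topic currently has
./bin/kafka-topics.sh --bootstrap-server localhost:9092 \
    --describe --topic mytopic

# Raise the partition count (it can only be increased, never decreased).
# Note: this changes the key-to-partition mapping for keyed messages.
./bin/kafka-topics.sh --bootstrap-server localhost:9092 \
    --alter --topic mytopic --partitions 160
```

These commands require a running broker, so they are shown as a configuration fragment rather than a runnable example.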
Comments
-
ashdnik almost 2 years
I have an application which uses Kafka 1.0 as a queue. The Kafka topic has 80 partitions and 80 consumers running. (Kafka-python consumers).
By running the command :
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group mygroup --describe
I see that one of the partitions is stuck at an offset, and the lag continuously increases as new records are added to it.
The output of the above command looks something like this:
TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                             HOST
mytopic  37         1924            2782            858  kafka-python-1.3.4-3da99d4d-63e8-4e72-967e-xxxxxxxxxxx  /localhost
mytopic  38         2741            2742            1    kafka-python-1.3.4-40b44482-39fc-42d0-8f55-xxxxxxxxxxx  /localhost
mytopic  39         2713            2713            0    kafka-python-1.3.4-4121d080-1d7c-4d6b-ac58-xxxxxxxxxxx  /localhost
mytopic  40         2687            2688            1    kafka-python-1.3.4-43441f6e-fd35-448e-b791-xxxxxxxxxxx  /localhost
What causes this? Also, resetting the offset using the reset-offsets command is not desirable, as this server might not be manually monitored on a regular basis.
The clients run in the background as parallel processes on a Linux machine:

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'mytopic',
    group_id='mygroup',
    bootstrap_servers='localhost:9092',
    session_timeout_ms=120000,
    heartbeat_interval_ms=100000,
    max_poll_records=1,
    auto_commit_interval_ms=100000,
    request_timeout_ms=350000,
    max_partition_fetch_bytes=3*1024*1024,
    value_deserializer=lambda m: json.loads(m.decode('ascii')))

for message in consumer:
    # value_deserializer already returns a parsed object,
    # so a second json.loads here is redundant
    msg = message.value
    process_message(msg)
-
ashdnik over 6 years
The consumer is a kafka-python client running in the background. Any reason why it might stop abruptly? I checked the number of client instances and it seems fine. Restarting the consumers also doesn't seem to solve the issue.
-
Flair over 2 yearsIf you have a new question, please ask it by clicking the Ask Question button. Include a link to this question if it helps provide context.