Kafka consumer group is rebalancing


The CommitFailedException is thrown when the commit cannot be completed because the group has been rebalanced. This is the main thing to be careful of when using the Java client. Since all network I/O (including heartbeating) and message processing are done in the foreground, it is possible for the session timeout to expire while a batch of messages is being processed. To handle this, you have two choices.

First, you can adjust session.timeout.ms to make sure the handler has enough time to finish processing the batch. You can then tune max.partition.fetch.bytes to limit the amount of data returned in a single batch, though you will have to consider how many partitions are in the subscribed topics.
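
As a rough illustration of this first option, the two settings might be combined like this. The values here are placeholders to adapt, not recommendations:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", false);
// Give slow batches more headroom before the coordinator evicts the consumer.
props.put("session.timeout.ms", 30000);
// Bound how much data each partition contributes to a single poll; the effective
// batch size is roughly this value times the number of assigned partitions.
props.put("max.partition.fetch.bytes", 1048576);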

The second option is to do the message processing in a separate thread, but you will have to manage flow control to ensure that the threads can keep up (see the sketch below).
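
A minimal sketch of that second option, assuming the Kafka 0.9 client (where pause and resume take varargs) and a hypothetical processRecords method standing in for your own logic: hand the batch to a worker thread, pause the assigned partitions so that poll keeps the consumer's group membership alive without fetching more data, then resume and commit once the worker finishes.

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PausingConsumerSketch {

    public static void run(KafkaConsumer<String, String> consumer) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            if (!records.isEmpty()) {
                // Pause all assigned partitions: poll() keeps heartbeating but returns no data.
                TopicPartition[] assigned = consumer.assignment().toArray(new TopicPartition[0]);
                consumer.pause(assigned);
                Future<?> done = executor.submit(() -> processRecords(records));
                while (!done.isDone()) {
                    consumer.poll(100); // stays in the group while the worker runs
                }
                consumer.resume(assigned);
                consumer.commitSync();
            }
        }
    }

    // Hypothetical placeholder for application-specific processing.
    private static void processRecords(ConsumerRecords<String, String> records) {
        records.forEach(r -> System.out.println(r.offset() + ": " + r.value()));
    }
}

Note that a rebalance while paused resets the pause state, so production code would also need a ConsumerRebalanceListener; this sketch only shows the flow-control idea.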

You can set session.timeout.ms large enough that commit failures from rebalances are rare. The only drawback is a longer delay before partitions can be reassigned in the event of a hard failure.

For more information, please see the Kafka documentation.

Here is a working example.

----Worker code-----

import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;

public class Worker implements Callable<Boolean> {

    private final ConsumerRecord<String, String> record;

    public Worker(ConsumerRecord<String, String> record) {
        this.record = record;
    }

    public Boolean call() {
        Map<String, Object> data = new HashMap<>();
        try {
            data.put("partition", record.partition());
            data.put("offset", record.offset());
            data.put("value", record.value());
            // Simulate slow processing.
            Thread.sleep(10000);
            System.out.println("Processing Thread---" + Thread.currentThread().getName() + " data: " + data);
            return Boolean.TRUE;
        } catch (Exception e) {
            e.printStackTrace();
            return Boolean.FALSE;
        }
    }
}

---------Execution code------------------

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.*;
import java.util.concurrent.*;

public class AsyncConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("enable.auto.commit", false);
        props.put("session.timeout.ms", 30000);
        props.put("heartbeat.interval.ms", 10000);
        props.put("request.timeout.ms", 31000);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("Test1", "Test2"));

        int poolSize = 10;
        ExecutorService es = Executors.newFixedThreadPool(poolSize);
        CompletionService<Boolean> completionService = new ExecutorCompletionService<>(es);

        try {
            while (true) {
                System.out.println("Polling................");
                ConsumerRecords<String, String> records = consumer.poll(1000);
                List<ConsumerRecord<String, String>> recordList = new ArrayList<>();
                for (ConsumerRecord<String, String> record : records) {
                    recordList.add(record);
                    // Process a batch as soon as poolSize records have accumulated.
                    if (recordList.size() == poolSize) {
                        processBatchAndCommit(consumer, completionService, recordList);
                        recordList.clear();
                    }
                }
                // Process any partial batch left over from this poll so no records are dropped.
                if (!recordList.isEmpty()) {
                    processBatchAndCommit(consumer, completionService, recordList);
                }
            }
        } finally {
            consumer.close();
        }
    }

    private static void processBatchAndCommit(KafkaConsumer<String, String> consumer,
                                              CompletionService<Boolean> completionService,
                                              List<ConsumerRecord<String, String>> batch) {
        // Submit every record in the batch to the worker pool.
        batch.forEach(record -> completionService.submit(new Worker(record)));

        // Wait until every submitted task has completed.
        int taskCount = batch.size();
        while (taskCount > 0) {
            try {
                Future<Boolean> futureResult = completionService.poll(1, TimeUnit.SECONDS);
                if (futureResult != null) {
                    futureResult.get(); // surfaces any worker exception
                    taskCount--;
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        // Commit the highest processed offset for each partition in the batch.
        // Records within a partition arrive in order, so the last one seen wins.
        Map<TopicPartition, OffsetAndMetadata> commitOffsets = new HashMap<>();
        for (ConsumerRecord<String, String> record : batch) {
            commitOffsets.put(new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1));
        }
        consumer.commitSync(commitOffsets);
    }
}

You need to follow some rules:

1) Pass a fixed number of records (for example, 10) to ConsumeEventInThread.

2) Create more than one thread for processing, and submit all tasks to the CompletionService.

3) Poll all submitted tasks and verify that they completed.

4) Then commit, using the parametric commitSync method instead of the no-argument one, so that only the offsets of records that were actually processed are committed.

Comments

  • Sunny Gupta almost 2 years

    I am using Kafka 0.9 and the new Java consumer. I am polling inside a loop. I am getting a CommitFailedException because of a group rebalance when the code tries to execute consumer.commitSync. Note that I am setting session.timeout.ms to 30000 and heartbeat.interval.ms to 10000 on the consumer, and polling definitely happens within 30000 ms. Can anyone help me out? Please let me know if any more information is needed.

    Here is the code:

        Properties props = new Properties();
        props.put("bootstrap.servers", {allthreeservers});
        props.put("group.id", groupId);
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", ObjectSerializer.class.getName());
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", false);
        props.put("session.timeout.ms", 30000);
        props.put("heartbeat.interval.ms", 10000);
        props.put("request.timeout.ms", 31000);
        props.put("kafka.consumer.topic.name", topic);
        props.put("max.partition.fetch.bytes", 1000);

        while (true) {
            Boolean isPassed = true;
            try {
                ConsumerRecords<Object, Object> records = consumer.poll(1000);
                if (records.count() > 0) {
                    ConsumeEventInThread consumerEventInThread = new ConsumeEventInThread(records, consumerService);
                    FutureTask<Boolean> futureTask = new FutureTask<>(consumerEventInThread);
                    executorServiceForAsyncKafkaEventProcessing.execute(futureTask);
                    try {
                        isPassed = futureTask.get(Long.parseLong(props.getProperty("session.timeout.ms")) - 5000L, TimeUnit.MILLISECONDS);
                    } catch (Exception e) {
                        logger.warn("Timed out after waiting until just under the session timeout");
                    }
                    if (isPassed) {
                        consumer.commitSync();
                        logger.info("Successfully committed offset for topic " + Arrays.asList(props.getProperty("kafka.consumer.topic.name")));
                    } else {
                        logger.info("Failed to process consumed messages, will not commit and will consume again");
                    }
                }
            } catch (Exception e) {
                logger.error("Unhandled exception while consuming messages " + Arrays.asList(props.getProperty("kafka.consumer.topic.name")), e);
            }
        }
  • Sunny Gupta almost 8 years
    Hector, I completely agree with you. The problem is that I am doing exactly what you said: a separate thread, using max.partition.fetch.bytes. Still, the poll method is not able to send a heartbeat, so at the next commitSync I get the rebalancing issue. Please see the code I have added to the question.
  • Sunny Gupta almost 8 years
    I am trying to figure out what is going on, but no luck. Can you suggest some checkpoints?
  • Sky almost 8 years
    The problem is here => futureTask.get(Long.parseLong(props.getProperty("session.timeout.ms")) - Long.parseLong("5000"), TimeUnit.MILLISECONDS);
  • Sky almost 8 years
    See the updated answer with a working example.
  • Sunny Gupta almost 8 years
    Thank you for the reply. In your code we are waiting for all the tasks to complete before committing, right? But in our case the messages we consume can take 30 minutes or more to process; we cannot wait for them to complete beyond session.timeout.ms. That is why we were using the get method with a timeout less than the session timeout. If processing fails, we push the messages back into the same topic to be consumed again.
  • Sky almost 8 years
    Yes. In this code all messages are processed in parallel. Why is session.timeout.ms 30000 (30 seconds) if processing takes 30 minutes? You need to set a higher session timeout in this case.
  • Sunny Gupta almost 8 years
    Basically our event processing takes between 10 seconds and 30 minutes, and we are not even sure that 30 minutes is enough. So we start processing in a different thread and commit after 25 seconds (session.timeout.ms - 5 sec). The polling works fine for a few minutes, but after that the consumer rebalances automatically. Surprisingly, when I upgraded Kafka to 0.10, the issue was resolved.