Kafka 0.9 How to re-consume message when manually committing offset with a KafkaConsumer


Solution 1

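If you didn't commit the offset and the enable.auto.commit property is false, then when the call to Mongo fails you can simply wait as long as you think is necessary and then retry poll(). (As the comments on this answer point out, the consumer also keeps its current position in memory, so you need to seek() back to the last committed offset before poll() returns the same records again.)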

The problem you are seeing is that the new consumer uses poll() as its heartbeat mechanism, so if you wait longer than the session timeout (session.timeout.ms), the group coordinator will kick the consumer out because it thinks the consumer is dead, and it will rebalance the group. So wait for Mongo, but you may want to call poll() once in a while.
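A minimal sketch of the "wait, but poll once in a while" idea, using only the JDK. The class name KeepAliveWait and the keepAlive and sleeper callbacks are hypothetical helpers, not part of the Kafka client; in a real consumer the keepAlive callback would be a poll() issued while the assigned partitions are paused, so that no new records are fetched during the wait.

```java
import java.util.function.LongConsumer;

/** Splits one long wait into short slices with a keep-alive call after each slice. */
final class KeepAliveWait {

    /**
     * Waits roughly totalMs in slices of at most sliceMs and invokes keepAlive after each slice.
     * Assumption: sliceMs is chosen well below the consumer's session timeout.
     * Returns how many times keepAlive was invoked.
     */
    static int waitWithKeepAlive(long totalMs, long sliceMs, Runnable keepAlive, LongConsumer sleeper) {
        if (sliceMs <= 0) {
            throw new IllegalArgumentException("sliceMs must be positive");
        }
        int calls = 0;
        long remaining = totalMs;
        while (remaining > 0) {
            long step = Math.min(sliceMs, remaining);
            sleeper.accept(step);   // e.g. a wrapper around Thread.sleep(step)
            keepAlive.run();        // in a real consumer: poll() with the partitions paused
            calls++;
            remaining -= step;
        }
        return calls;
    }
}
```

Passing the sleep function in as a parameter keeps the helper easy to test; production code would hand in a small wrapper around Thread.sleep and resume the partitions once Mongo is reachable again.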

EDIT: As a workaround, you can set the request.timeout.ms property to a higher value.

Hope it helps!
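For illustration, the timeout settings discussed in this answer could be collected in one place as below. This is only a sketch: the class name ConsumerTimeoutConfig and the 10-second margin are assumptions, and it uses nothing beyond java.util.Properties. The property keys are the new consumer's configuration names; the broker may reject a session timeout outside the range it is configured to allow.

```java
import java.util.Properties;

/** Builds consumer properties with auto-commit disabled and explicit timeouts. */
final class ConsumerTimeoutConfig {

    static Properties build(String bootstrapServers, String groupId, int sessionTimeoutMs) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // Offsets are committed manually by the application.
        props.put("enable.auto.commit", "false");
        props.put("session.timeout.ms", Integer.toString(sessionTimeoutMs));
        // Assumption: keep the request timeout above the session timeout; the 10 s margin is arbitrary.
        props.put("request.timeout.ms", Integer.toString(sessionTimeoutMs + 10_000));
        return props;
    }
}
```

The resulting Properties object can be passed straight to the KafkaConsumer constructor after the deserializer settings have been added.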

Solution 2

Here is my code, using client version 0.10.0.

It seems to meet your requirements.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MessageProcesser implements Runnable {

    private static Logger logger = LoggerFactory.getLogger(MessageProcesser.class);

    private final ExecutorService pool = Executors.newFixedThreadPool(4);

    private final KafkaConsumer<String, String> consumer;

    private final String topic;

    private final AtomicBoolean closed = new AtomicBoolean(false);

    public MessageProcesser(String groupId, String topic, String kafkaServer) {
        this.topic = topic;
        Properties props = new Properties();
        props.put("bootstrap.servers", kafkaServer);
        props.put("group.id", groupId);
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("enable.auto.commit", "false");
        this.consumer = new KafkaConsumer<>(props);
    }

    @Override
    public void run() {
        try {
            consumer.subscribe(Collections.singleton(topic));

            // Stop polling once shutdown() has been called.
            while (!closed.get()) {
                ConsumerRecords<String, String> records = consumer.poll(1000 * 60);

                // Handle each partition separately so that a failure only stops that partition.
                for (TopicPartition partition : records.partitions()) {
                    for (ConsumerRecord<String, String> record : records.records(partition)) {

                        String value = record.value();
                        if (null == value) {
                            continue;
                        }

                        boolean processResult = false;
                        try {
                            // ProcessCommand is the application's own Callable (not shown here).
                            Future<Object> f = pool.submit(new ProcessCommand(value));
                            processResult = (boolean) f.get(100, TimeUnit.MILLISECONDS);
                        } catch (Exception e) {
                            logger.error(e.getMessage(), e);
                        }

                        if (!processResult) {
                            // Processing failed: seek back to this record so the next poll() returns it again,
                            // and skip the later records of this partition to preserve ordering.
                            consumer.seek(partition, record.offset());
                            break;
                        } else {
                            this.commitAsyncOffset(record);
                        }
                    }
                }
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        } finally {
            // Always release the consumer and the worker pool when the loop ends.
            consumer.close();
            pool.shutdown();
        }
    }

    public void shutdown() {
        closed.set(true);
    }

    public void commitAsyncOffset(ConsumerRecord<String, String> record) {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1));

        consumer.commitAsync(offsets, new OffsetCommitCallback() {
            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
                if (e != null) {
                    // Passing the exception as the last argument lets SLF4J log the stack trace.
                    logger.error("kafka offset commit fail. {}", offsets, e);
                }
            }
        });
    }
}

Solution 3

As I understand it, the (new) client is the one that keeps track of the consumed offsets. A commit sends the offsets to the server, but it has no effect on the next poll() from that client, because the client tells the server "give me the next messages from THAT offset". Why, then, are the offsets sent to the server at all? For the next rebalance. The server only uses the committed offsets when partitions are assigned again, for example when some client dies or disconnects: the partitions are then rebalanced, and with that rebalance the clients get the offsets from the server.

So if you don't commit the offset and then call poll(), you cannot expect the message to be read again. For that to happen, there has to be a way to roll back the offset in the client. I didn't try it, but I think calling KafkaConsumer.seek() with the offset of the failed message should do the trick.

https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#seek(org.apache.kafka.common.TopicPartition,%20long)
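The distinction between the client-side position and the committed offset can be illustrated with a toy model. The class ToyPartitionConsumer below is hypothetical and deliberately not the Kafka API; it only mimics the behaviour described above for a single partition, under the assumption that poll() always continues from the in-memory position.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy stand-in for one partition as seen by one consumer; not the Kafka API. */
final class ToyPartitionConsumer {
    private final List<String> log;
    private long position = 0;    // next offset poll() will return (kept in the client)
    private long committed = 0;   // last committed offset (kept on the server)

    ToyPartitionConsumer(List<String> log) {
        this.log = new ArrayList<>(log);
    }

    /** Returns up to max messages starting at the in-memory position and advances it. */
    List<String> poll(int max) {
        int from = (int) Math.min(position, log.size());
        int to = Math.min(from + max, log.size());
        List<String> batch = new ArrayList<>(log.subList(from, to));
        position = to;
        return batch;
    }

    /** Records the offset a restarted or rebalanced consumer would resume from. */
    void commit(long offset) { committed = offset; }

    /** Moves the in-memory position; this is what makes poll() return old messages again. */
    void seek(long offset) { position = offset; }

    long committed() { return committed; }
}
```

In this model, two consecutive poll(2) calls on a log of four messages return the first two and then the last two, even though nothing was committed; only seek(committed()) makes the next poll(2) return the first two again.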

BTW, this way you can even commit the last successfully processed message and seek to the first failed one, so that you don't need to repeat the whole record list when a failure occurred for some message in the middle of it.
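A sketch of that bookkeeping for the records of one partition, using only the JDK. The class BatchReplayPlan and its handler parameter are hypothetical names; the commit offset follows the usual convention of "last processed offset + 1", and -1 is used here as an arbitrary marker for "nothing to do".

```java
import java.util.function.LongPredicate;

/** Result of processing one partition's batch in order, stopping at the first failure. */
final class BatchReplayPlan {
    final long commitOffset;  // offset to commit (the next offset to read), or -1 if nothing succeeded
    final long seekOffset;    // offset to seek back to, or -1 if the whole batch succeeded

    private BatchReplayPlan(long commitOffset, long seekOffset) {
        this.commitOffset = commitOffset;
        this.seekOffset = seekOffset;
    }

    /** Runs handler on each offset in order; handler returns true when processing succeeded. */
    static BatchReplayPlan process(long[] offsets, LongPredicate handler) {
        long commit = -1;
        for (long offset : offsets) {
            if (!handler.test(offset)) {
                // First failure: everything before it can be committed, this one must be replayed.
                return new BatchReplayPlan(commit, offset);
            }
            commit = offset + 1;
        }
        return new BatchReplayPlan(commit, -1);
    }
}
```

With a real consumer, a non-negative commitOffset would be passed to a commit call for that partition and a non-negative seekOffset to seek(), so that the next poll() starts again at the failed record.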

Author: Michael Freeman

Updated on June 08, 2022

Comments

  • Michael Freeman
    Michael Freeman almost 2 years

    I am writing a consumer that manually commits the offset once a series of records has been committed to Mongo.
    In the case of a Mongo error or any other error, an attempt is made to persist the record to an error-processing collection for replay at a later date. If Mongo is down, I want the consumer to stop processing for a period of time before trying to read the records from the uncommitted offset from Kafka.
    The sample below works, but I would like to know what the best practice for this scenario is.

    while (true) {
        boolean commit = false;
        try {
            ConsumerRecords<K, V> records = consumer.poll(consumerTimeout);
            kafkaMessageProcessor.processRecords(records);
            commit = true;
        }
        catch (Exception e) {
            logger.error("Unable to consume closing consumer and restarting", e);
            try {
               consumer.close();
            }
            catch (Exception consumerCloseError) {
                logger.error("Unable to close consumer", consumerCloseError);
            }
            logger.error(String.format("Attempting recovery in  [%d] milliseconds.", recoveryInterval), e);
            Thread.sleep(recoveryInterval);
            consumer = createConsumer(properties);
        }
        if (commit) {
            consumer.commitSync();
        }
    
    }
    
    private KafkaConsumer<K, V> createConsumer(Properties properties) {
        KafkaConsumer<K, V> consumer = new KafkaConsumer<K, V>(properties);
        consumer.subscribe(topics);
        return consumer;
    }
    

    If I don't recreate the consumer I get the following error.

    o.a.k.c.c.internals.AbstractCoordinator  : Marking the coordinator 2147483647 dead.
    o.a.k.c.c.internals.ConsumerCoordinator  : Error ILLEGAL_GENERATION occurred while committing offsets for group test.consumer
    
  • Michael Freeman
    Michael Freeman over 8 years
    Thanks for your help. That solved the second issue of my consumer getting the boot. To reprocess the message, consumer.seekToBeginning() can be called instead of recreating the consumer.
  • Nautilus
    Nautilus over 8 years
    consumer.seekToBeginning(partitions) will reset the offset to the first position in all partitions that you pass. I don't see how this helps in your use case; if you reset to the beginning, you will have to reprocess all events.
  • Michael Freeman
    Michael Freeman over 8 years
    It will reprocess all events from the last offset commit. Is this assumption incorrect? I want to keep attempting to reprocess until Mongo is available again. Without this the poll just consumes the next message.
  • Nautilus
    Nautilus over 8 years
    If you poll the messages and you don't commit the offset, then the next time you poll you will (possibly) get the same messages (if enable.auto.commit is false). If you call consumer.seekToBeginning(partitions), it will go to the earliest message that your partition has and start consuming from there.
  • Michael Freeman
    Michael Freeman over 8 years
    Even though I don't commit and have enable.auto.commit set to false, the next and subsequent polls don't get the uncommitted messages. Maybe it's a defect. If I restart the application, they are consumed again.
  • Nautilus
    Nautilus over 8 years
    Sorry, you are right: there is a current position that is maintained, which prevents you from consuming the messages again. My bad. What you can do is keep the offset of the last message that you committed and, when you want to start processing again, do seek(offset+1).
  • Michael Freeman
    Michael Freeman over 8 years
    Thanks for your help. When I get the records I'll process by partition and maintain the offset. I'll take a look at seekToBeginning as I think this does what I'm looking for (when you give no partitions it appears to just reset the ones the consumer is subscribed to. It does not replay uncommitted messages for me)
  • Jakub
    Jakub about 8 years
    I think you are wrong when you say that without commit next poll() retrieves the message again - see my answer stackoverflow.com/a/36886424/155708
  • Nautilus
    Nautilus about 8 years
    @Jakub I already said that Michael Freeman was right a few comments above.
  • Jakub
    Jakub about 8 years
    Hm, sorry for disruption. Stackoverflow shows only few first comments, the rest is collapsed and I overlooked it.
  • Aleksandar Stojadinovic
    Aleksandar Stojadinovic over 7 years
    Sir, is the seek necessary?
  • Hlex
    Hlex over 7 years
    Yes, seek is necessary. The Java client remembers the current offset.
  • Hlex
    Hlex over 7 years
    This code has a problem if a batch contains multiple records and one of them fails: you should not process the later records from the same partition.
  • Marc Sigrist
    Marc Sigrist over 6 years
    With newer versions, (e.g., Confluent.Kafka 0.11), you can configure the heartbeat.interval.ms parameter to prevent the coordinator from abandoning the consumer.
  • nir
    nir about 5 years
    @MichaelFreeman blog.pragmatists.com/… could be a good read on how to handle reprocessing. It's particularly helpful when you have multiple consumers, each processing records asynchronously.
  • Tiago Medici
    Tiago Medici over 4 years
    Is there a way to commit a single message (ConsumerRecord)? In my case, once I commit a record, the entire partition of that record gets committed!
  • alex
    alex almost 4 years
    How does using heartbeat.interval.ms have any relevance to retrieving uncommitted messages on subsequent calls to poll?