Kafka consumer receiving same message multiple times


I increased auto.commit.interval.ms in my consumer properties from 3000 to 8000. This fixed the duplicate record issue.
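
For reference, here's roughly where that property is set, assuming the consumer is built from a plain Properties object (this is a sketch using the values from the question, not the exact project code):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafkahost1:9092,kafkahost2:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "full_group");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    // Increased from 3000 to 8000; with this interval the duplicate consumption stopped.
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "8000");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "com.test.preprocessor.consumer.serializer.KryoObjectSerializer");
    KafkaConsumer<String, TextAnalysisRequest> consumer = new KafkaConsumer<>(props);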

Author: Shamik

Updated on June 18, 2022

Comments

  • Shamik, almost 2 years

    I've recently started using Kafka to read documents coming through a web crawler. What I'm noticing is that when I'm dealing with a few million documents, the consumer keeps processing the same messages over and over again. It looks like the data is not getting committed for some reason. This is not the case when I test the consumer with a few hundred messages.

    I'm using the Kafka high-level consumer client code in Java, with a consumer group running as many threads as there are partitions, so each thread is dedicated to one partition (a rough sketch of the thread setup follows the poll loop below). Here's a code snippet for polling data.

    while (true) {
        try {
            if (consumerDao.canPollTopic()) {
                // Poll a batch and hand each record off for post-processing.
                ConsumerRecords<String, TextAnalysisRequest> records =
                    consumer.poll(this.config.getPropertyAsIneger(IPreProcessorConstant.KAFKA_POLL_COUNT));
                for (ConsumerRecord<String, TextAnalysisRequest> record : records) {
                    TextAnalysisRequest textAnalysisObj = record.value();
                    if (textAnalysisObj != null) {
                        PostProcessRequest req = new PostProcessRequest();
                        req.setRequest(this.getRequest(textAnalysisObj));
                        PreProcessorUtil.submitPostProcessRequest(req, config);
                    }
                }
            } else {
                // Nothing to poll right now; back off before checking again.
                Thread.sleep(this.config.getPropertyAsIneger(IPreProcessorConstant.KAFKA_POLL_SLEEP));
            }
        } catch (Exception ex) {
            LOGGER.error("Error in Full Consumer group worker", ex);
        }
    }
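
    As additional context, the poll loop above runs in per-partition worker threads. Here's a rough sketch of that wiring (the thread-pool setup and the ConsumerWorker name are illustrative placeholders, not my actual classes):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // One worker per partition; each worker creates its own KafkaConsumer,
    // because KafkaConsumer instances are not safe to share across threads.
    int numPartitions = 8; // matches the topic's partition count
    ExecutorService pool = Executors.newFixedThreadPool(numPartitions);
    for (int i = 0; i < numPartitions; i++) {
        // Each ConsumerWorker (placeholder name) subscribes to the topic
        // and runs the poll loop shown above.
        pool.submit(new ConsumerWorker(config));
    }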
    Here are the Kafka consumer configuration parameters I'm setting; the rest are default values.

    
    consumer.auto.commit=true
    consumer.auto.commit.interval=1000
    consumer.session.timeout=180000
    consumer.poll.records=2147483647
    consumer.request.timeout=181000
    

    Here's the complete consumer config:

    
    metric.reporters = 
    metadata.max.age.ms = 300000
    partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    max.partition.fetch.bytes = 1048576
    bootstrap.servers = [kafkahost1:9092, kafkahost2:9092]
    ssl.keystore.type = JKS
    enable.auto.commit = true
    sasl.mechanism = GSSAPI
    interceptor.classes = null
    exclude.internal.topics = true
    ssl.truststore.password = null
    client.id =
    ssl.endpoint.identification.algorithm = null
    max.poll.records = 2147483647
    check.crcs = true
    request.timeout.ms = 181000
    heartbeat.interval.ms = 3000
    auto.commit.interval.ms = 1000
    receive.buffer.bytes = 65536
    ssl.truststore.type = JKS
    ssl.truststore.location = null
    ssl.keystore.password = null
    fetch.min.bytes = 1
    send.buffer.bytes = 131072
    value.deserializer = class com.test.preprocessor.consumer.serializer.KryoObjectSerializer
    group.id = full_group
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.trustmanager.algorithm = PKIX
    ssl.key.password = null
    fetch.max.wait.ms = 500
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    session.timeout.ms = 180000
    metrics.num.samples = 2
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    ssl.protocol = TLS
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.keystore.location = null
    ssl.cipher.suites = null
    security.protocol = PLAINTEXT
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    auto.offset.reset = latest
    
    My sample Kafka topic has 8 partitions with a replication factor of 2.

    The log retention period in server.properties is set to 168 hours.

    log.retention.hours=168
    log.roll.hours=168
    
    Not sure what I'm missing here.