Kafka consumer receiving same message multiple times
I increased auto.commit.interval.ms in my consumer properties from 3000 to 8000 (auto.commit.interval.ms=8000). This fixed the duplicate record issues.
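For reference, here is a minimal sketch of where that setting goes when the plain Java client is configured through java.util.Properties. It assumes enable.auto.commit stays true. The broker list, group id and value deserializer class are copied from the question's config dump below, and the class name AutoCommitConfig is made up for illustration.

```java
import java.util.Properties;

// Hypothetical helper: consumer properties with auto-commit on and a longer commit interval.
// Broker list, group id and deserializers are copied from the question's config dump.
final class AutoCommitConfig {

    static Properties consumerProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafkahost1:9092,kafkahost2:9092");
        props.put("group.id", "full_group");
        props.put("enable.auto.commit", "true");
        // Raised from 3000 ms to 8000 ms, as described in the answer.
        props.put("auto.commit.interval.ms", "8000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // The question's own Kryo-based deserializer class, taken from its config dump.
        props.put("value.deserializer", "com.test.preprocessor.consumer.serializer.KryoObjectSerializer");
        return props;
    }

    public static void main(String[] args) {
        // Print the settings; in a real consumer these would be passed to new KafkaConsumer<>(props).
        consumerProperties().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

The resulting Properties object is what gets passed to the KafkaConsumer constructor.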
Shamik
Updated on June 18, 2022
I've recently started using Kafka to read documents coming through a web crawler. What I'm noticing is that when I'm dealing with a few million documents, the consumer processes the same message over and over again. It looks like the offsets are not getting committed for some reason. This is not the case when I'm testing the consumer with a few hundred messages.
I'm using the Kafka high-level consumer client in Java, with a consumer group running on a number of threads equal to the number of partitions, so each thread is dedicated to one partition. Here's a code snippet for polling data:
```java
while (true) {
    try {
        if (consumerDao.canPollTopic()) {
            // Note: KafkaConsumer.poll(long) takes a timeout in milliseconds, not a record count.
            ConsumerRecords<String, TextAnalysisRequest> records =
                    consumer.poll(this.config.getPropertyAsIneger(IPreProcessorConstant.KAFKA_POLL_COUNT));
            for (ConsumerRecord<String, TextAnalysisRequest> record : records) {
                TextAnalysisRequest textAnalysisObj = record.value();
                if (textAnalysisObj != null) {
                    // Hand each document off for post-processing.
                    PostProcessRequest req = new PostProcessRequest();
                    req.setRequest(this.getRequest(textAnalysisObj));
                    PreProcessorUtil.submitPostProcessRequest(req, config);
                }
            }
        } else {
            Thread.sleep(this.config.getPropertyAsIneger(IPreProcessorConstant.KAFKA_POLL_SLEEP));
        }
    } catch (Exception ex) {
        LOGGER.error("Error in Full Consumer group worker", ex);
    }
}
```

Here's the Kafka consumer configuration I'm setting; the rest are default values:

```properties
consumer.auto.commit=true
consumer.auto.commit.interval=1000
consumer.session.timeout=180000
consumer.poll.records=2147483647
consumer.request.timeout=181000
```
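One thing these values make worth checking is whether a single poll() batch can be processed within session.timeout.ms. With consumer.poll.records at 2147483647 there is effectively no per-poll record cap, so poll() returns whatever has already been fetched (bounded per partition by max.partition.fetch.bytes, 1048576 bytes in the full config further down). That full config lists max.poll.records but no max.poll.interval.ms, which suggests a pre-0.10.1 client; on those clients the consumer only heartbeats and auto-commits from inside poll(), so if processing one batch outlasts the session timeout, the group rebalances before that batch's offsets are committed and the records are delivered again. The following is a back-of-the-envelope sketch under that assumption: the per-record processing time is a hypothetical figure you would measure yourself, and PollBudget and its methods are illustrative names, not Kafka API.

```java
import java.util.Properties;

// Back-of-the-envelope check: can one poll() batch be processed before session.timeout.ms expires?
// Assumes a pre-0.10.1 client, where heartbeats and auto-commits only happen inside poll().
final class PollBudget {

    /** Largest batch that fits in the session timeout: floor(sessionTimeoutMs / perRecordMillis). */
    static long maxRecordsWithinSession(long sessionTimeoutMs, long perRecordMillis) {
        if (perRecordMillis <= 0) {
            throw new IllegalArgumentException("perRecordMillis must be > 0");
        }
        return sessionTimeoutMs / perRecordMillis;
    }

    /** True if maxPollRecords * perRecordMillis <= sessionTimeoutMs, computed without overflow. */
    static boolean batchFitsInSession(long maxPollRecords, long perRecordMillis, long sessionTimeoutMs) {
        return maxPollRecords <= maxRecordsWithinSession(sessionTimeoutMs, perRecordMillis);
    }

    public static void main(String[] args) {
        long sessionTimeoutMs = 180_000L;    // session.timeout.ms from the question
        long perRecordMillis = 50L;          // hypothetical processing time per document
        long configuredCap = Integer.MAX_VALUE; // max.poll.records from the question

        System.out.println("configured cap fits: "
                + batchFitsInSession(configuredCap, perRecordMillis, sessionTimeoutMs));

        // Halve the budget as a safety margin before using it as max.poll.records.
        long capped = maxRecordsWithinSession(sessionTimeoutMs, perRecordMillis) / 2;
        Properties props = new Properties();
        props.put("max.poll.records", String.valueOf(capped));
        System.out.println("max.poll.records = " + props.getProperty("max.poll.records"));
    }
}
```

With the hypothetical 50 ms per record, a 180000 ms session timeout covers at most 3600 records per poll, and halving that gives a max.poll.records of 1800. A faster handler or a longer session timeout shifts these numbers accordingly.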
Here's the complete consumer config:

```properties
metric.reporters =
metadata.max.age.ms = 300000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [kafkahost1:9092, kafkahost2:9092]
ssl.keystore.type = JKS
enable.auto.commit = true
sasl.mechanism = GSSAPI
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id =
ssl.endpoint.identification.algorithm = null
max.poll.records = 2147483647
check.crcs = true
request.timeout.ms = 181000
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 1000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class com.test.preprocessor.consumer.serializer.KryoObjectSerializer
group.id = full_group
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
session.timeout.ms = 180000
metrics.num.samples = 2
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
auto.offset.reset = latest
```

My sample Kafka topic has 8 partitions with a replication factor of 2.
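Since the config uses RangeAssignor and the group runs one consumer thread per partition, each of the 8 threads should end up owning exactly one partition. The following is a small sketch of that range split, assuming a single topic and that consumers are ordered by member id before ranges are handed out; the worker ids are made up, and the code is a stand-alone re-implementation of the per-consumer arithmetic, not Kafka's RangeAssignor class itself.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Stand-alone sketch of range assignment for ONE topic (not Kafka's RangeAssignor class).
final class RangeAssignmentSketch {

    /** Splits partitions 0..numPartitions-1 into contiguous ranges, one range per consumer. */
    static Map<String, List<Integer>> assign(int numPartitions, List<String> consumerIds) {
        List<String> sorted = new ArrayList<>(consumerIds);
        Collections.sort(sorted); // consumers are ordered by id before ranges are handed out
        int perConsumer = numPartitions / sorted.size();
        int consumersWithExtra = numPartitions % sorted.size();

        Map<String, List<Integer>> assignment = new TreeMap<>();
        for (int i = 0; i < sorted.size(); i++) {
            // The first consumersWithExtra consumers each take one extra partition.
            int start = perConsumer * i + Math.min(i, consumersWithExtra);
            int length = perConsumer + (i < consumersWithExtra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int p = start; p < start + length; p++) {
                partitions.add(p);
            }
            assignment.put(sorted.get(i), partitions);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Hypothetical ids for the 8 worker threads in the question's setup.
        List<String> workers = new ArrayList<>();
        for (int i = 0; i < 8; i++) {
            workers.add("worker-" + i);
        }
        // Prints each worker with a single partition, e.g. worker-0=[0]
        System.out.println(assign(8, workers));
    }
}
```

With 8 partitions and 3 consumers the same arithmetic gives [0, 1, 2], [3, 4, 5] and [6, 7]; with a ninth consumer, one of them would get no partition at all.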
The log retention period in server.properties is set to 168 hours:

```properties
log.retention.hours=168
log.roll.hours=168
```

Not sure what I'm missing here.