Delay in Consumer consuming messages in Apache Kafka
Try adding
props.put("request.required.acks", "1")
to the producer configuration. By default the producer does not wait for acks, so message delivery is not guaranteed. If you start the broker just before your test, the producer may start sending messages before the broker is fully initialized, and the first several messages may be lost.
Also try adding
props.put("auto.offset.reset", "smallest")
to the consumer configuration. It is equivalent to the --from-beginning option of kafka-console-consumer.sh. If your consumer starts later than the producer and there is no offset data saved in ZooKeeper, then by default it starts consuming only new messages (see Consumer configs in the docs).
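As a minimal sketch of where the two settings above would go, the following builds the property sets with plain java.util.Properties. The class and method names (KafkaSettingsSketch, producerProps, consumerProps) are hypothetical; the broker address, ZooKeeper address, group id, and serializer class are copied from the question's code. In the question's setup these objects would then be passed to new ProducerConfig(props) and new ConsumerConfig(props).

```java
import java.util.Properties;

// Hypothetical helper class; only the property keys and values come from the thread.
final class KafkaSettingsSketch {

    // Producer settings for the Kafka 0.8 producer used in the question.
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");
        props.put("serializer.class", "org.bigdata.kafka.Serializer");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        // Wait for the partition leader to acknowledge each message
        // instead of sending without waiting for any ack.
        props.put("request.required.acks", "1");
        return props;
    }

    // Consumer settings for the Kafka 0.8 high-level consumer used in the question.
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "127.0.0.1:2181");
        props.put("group.id", "group1");
        // With no saved offset for this group, start from the oldest
        // available message rather than only new ones.
        props.put("auto.offset.reset", "smallest");
        return props;
    }
}
```

Note that auto.offset.reset only applies when the consumer group has no stored offset, so a group id that has already consumed from the topic will resume from its saved position.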
Ankita
Updated on September 15, 2022
Comments
-
Ankita over 1 year
I am using Kafka 0.8.0 and trying to achieve the below mentioned scenario.
JCA API (Acts as a producer and sends data to)-----> Consumer------> HBase
I send each message to the consumer as soon as I fetch the data using the JCA client. For instance, as soon as the producer sends message no. 1, I want to fetch it in the consumer and 'put' it in HBase. But my consumer only starts fetching messages after some random number n of messages. I want to keep the producer and consumer in sync so that both start working together.
I have used:
1 broker
1 single topic
1 single producer and 1 high-level consumer
Can anyone suggest what I need to do to achieve this?
EDITED:
Adding some relevant code snippet.
Consumer.java
public class Consumer extends Thread {
    private final ConsumerConnector consumer;
    private final String topic;
    PrintWriter pw = null;
    int t = 0;
    StringDecoder kd = new StringDecoder(null);
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    Map<String, List<KafkaStream<String, Signal>>> consumerMap;
    KafkaStream<String, Signal> stream;
    ConsumerIterator<String, Signal> it;

    public Consumer(String topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
        this.topic = topic;
        topicCountMap.put(topic, new Integer(1));
        consumerMap = consumer.createMessageStreams(topicCountMap, kd,
                new Serializer(new VerifiableProperties()));
        stream = consumerMap.get(topic).get(0);
        it = stream.iterator();
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.zkConnect);
        props.put("group.id", KafkaProperties.groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("fetch.size", "1024");
        return new ConsumerConfig(props);
    }

    synchronized public void run() {
        while (it.hasNext()) {
            t = (it.next().message()).getChannelid();
            System.out.println("In Consumer received msg" + t);
        }
    }
}
Producer.java
public class Producer {
    public final kafka.javaapi.producer.Producer<String, Signal> producer;
    private final String topic;
    private final Properties props = new Properties();

    public Producer(String topic) {
        props.put("serializer.class", "org.bigdata.kafka.Serializer");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "localhost:9092");
        // Use random partitioner. Don't need the key type. Just set it to Integer.
        // The message is of type userdefined Object.
        producer = new kafka.javaapi.producer.Producer<String, Signal>(new ProducerConfig(props));
        this.topic = topic;
    }
}
KafkaProperties.java
public interface KafkaProperties {
    final static String zkConnect = "127.0.0.1:2181";
    final static String groupId = "group1";
    final static String topic = "test00";
    final static String kafkaServerURL = "localhost";
    final static int kafkaServerPort = 9092;
    final static int kafkaProducerBufferSize = 64 * 1024;
    final static int connectionTimeOut = 100000;
    final static int reconnectInterval = 10000;
    final static String clientId = "SimpleConsumerDemoClient";
}
This is how the consumer behaves: for the first 10 messages it does not print that the message was received by the consumer, but from the 11th message onwards it starts functioning correctly.
producer sending msg1
producer sending msg2
producer sending msg3
producer sending msg4
producer sending msg5
producer sending msg6
producer sending msg7
producer sending msg8
producer sending msg9
producer sending msg10
producer sending msg11
producer sending msg12
In Consumer received msg12
producer sending msg13
In Consumer received msg13
producer sending msg14
In Consumer received msg14
producer sending msg15
In Consumer received msg15
producer sending msg16
In Consumer received msg16
producer sending msg17
In Consumer received msg17
producer sending msg18
In Consumer received msg18
producer sending msg19
In Consumer received msg19
producer sending msg20
In Consumer received msg20
producer sending msg21
In Consumer received msg21
EDITED: Adding the listener function where the producer sends messages to the consumer. I am using the default producer config and did not override it.
public synchronized void onValueChanged(final MonitorEvent event_) {
    // Get the value from the DBR
    try {
        final DBR dbr = event_.getDBR();
        final String[] val = (String[]) dbr.getValue();
        producer1.producer.send(new KeyedMessage<String, Signal>(KafkaProperties.topic, new Signal(messageNo)));
        System.out.println("producer sending msg" + messageNo);
        messageNo++;
    } catch (Exception ex) {
        ex.printStackTrace();
    }
}
-
Ankita about 10 years
Thanks for the suggestion. I added props.put("request.required.acks", "1") to the producer, but the program behaves randomly. I ran it 5 times, each time with a new topic, and it gave different results all 5 times. Twice the producer and consumer were in sync; the rest of the time the consumer was delayed.
-
Dmitry about 10 years
By 'delayed' do you mean all messages were received, but not immediately after sending? In your original output the first several messages were completely lost.
-
Ankita about 10 years
Yes, actually there are two scenarios: 1) Sometimes all messages were received, but not immediately after sending. 2) Other times several messages were lost, as illustrated in the output provided. But when I run the command "bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topicname --from-beginning" from the console, I get the same number of messages in the consumer as were produced by the producer. Why is that happening?
-
Dmitry about 10 years
Added another suggestion to the answer.
-
Dmitry about 10 years
Try calling consumer.shutdown() before exit. It will sync consumed offsets.
-
Qi Wang about 9 years
Hi @Dmitry, I got my problem resolved by following these replies. I also have one question: is it possible to have a fixed-size thread pool (say 5 threads) handle messages from any number of topics (say 100 or more)?
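Dmitry's suggestion in the comments above, calling consumer.shutdown() before exit, could be wired up with a JVM shutdown hook. This is only a sketch: the Connector interface is a hypothetical stand-in for the single shutdown() method of the question's ConsumerConnector, so the example compiles without Kafka on the classpath, and ShutdownSketch and shutdownHookFor are made-up names.

```java
// Hypothetical sketch; Connector stands in for the real ConsumerConnector.
final class ShutdownSketch {

    // Stand-in for the one method of ConsumerConnector used here (assumption).
    interface Connector {
        void shutdown();
    }

    // Builds a thread that shuts the connector down when it runs.
    // Registering it as a shutdown hook makes the JVM run it on normal exit.
    static Thread shutdownHookFor(final Connector connector) {
        return new Thread(new Runnable() {
            @Override
            public void run() {
                connector.shutdown();
            }
        });
    }
}
```

With the question's Consumer class, usage might look like Runtime.getRuntime().addShutdownHook(ShutdownSketch.shutdownHookFor(...)) with an adapter that delegates to consumer.shutdown(). The hook will not run if the process is killed forcibly.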