Delay in Consumer consuming messages in Apache Kafka

  1. Try adding props.put("request.required.acks", "1") to the producer configuration. By default the producer does not wait for acks, so message delivery is not guaranteed. If you start the broker just before your test, the producer may begin sending messages before the broker is fully initialized, and the first several messages may be lost (see the sketch after this list).

  2. Try adding props.put("auto.offset.reset", "smallest") to the consumer configuration. It is equivalent to the --from-beginning option of kafka-console-consumer.sh. If your consumer starts later than the producer and there is no offset data saved in ZooKeeper, then by default it will consume only new messages (see Consumer Configs in the docs).
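
A minimal sketch of both changes (the connection values mirror the question's KafkaProperties and are only illustrative):

    import java.util.Properties;

    // Producer side: require a leader acknowledgement so messages sent
    // while the broker is still initializing are not silently dropped.
    Properties producerProps = new Properties();
    producerProps.put("metadata.broker.list", "localhost:9092");
    producerProps.put("serializer.class", "org.bigdata.kafka.Serializer");
    producerProps.put("request.required.acks", "1");

    // Consumer side: start from the earliest available offset when this
    // consumer group has no offset stored in ZooKeeper yet.
    Properties consumerProps = new Properties();
    consumerProps.put("zookeeper.connect", "127.0.0.1:2181");
    consumerProps.put("group.id", "group1");
    consumerProps.put("auto.offset.reset", "smallest");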



Author by

Ankita

Aspiring Data Scientist and an enthusiast for niche Big Data Technologies!

Updated on September 15, 2022

Comments

  • Ankita
    Ankita over 1 year

    I am using Kafka 0.8.0 and trying to achieve the scenario described below.

    JCA API (acts as a producer and sends data) -----> Consumer ------> HBase

    I am sending each message to the consumer as soon as I fetch the data using the JCA client. For instance, as soon as the producer sends message no. 1, I want to fetch it from the consumer and 'put' it in HBase. But my consumer starts fetching messages only after some random n messages. I want to put the producer and consumer in sync so that both of them start working together.

    I have used:

    1 broker

    1 topic

    1 producer and a high-level consumer

    Can anyone suggest what I need to do to achieve this?

    EDITED:

    Adding some relevant code snippets.

    Consumer.java

    import java.io.PrintWriter;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.serializer.StringDecoder;
    import kafka.utils.VerifiableProperties;

    public class Consumer extends Thread {
        private final ConsumerConnector consumer;
        private final String topic;
        PrintWriter pw = null;
        int t = 0;
        StringDecoder kd = new StringDecoder(null);
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        Map<String, List<KafkaStream<String, Signal>>> consumerMap;
        KafkaStream<String, Signal> stream;
        ConsumerIterator<String, Signal> it;

        public Consumer(String topic) {
            consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
            this.topic = topic;

            // One stream for this topic; keys are decoded as strings and
            // values with the custom Signal decoder (org.bigdata.kafka.Serializer).
            topicCountMap.put(topic, Integer.valueOf(1));
            consumerMap = consumer.createMessageStreams(topicCountMap, kd,
                    new Serializer(new VerifiableProperties()));
            stream = consumerMap.get(topic).get(0);
            it = stream.iterator();
        }

        private static ConsumerConfig createConsumerConfig() {
            Properties props = new Properties();
            props.put("zookeeper.connect", KafkaProperties.zkConnect);
            props.put("group.id", KafkaProperties.groupId);
            props.put("zookeeper.session.timeout.ms", "400");
            props.put("zookeeper.sync.time.ms", "200");
            props.put("auto.commit.interval.ms", "1000");
            props.put("fetch.size", "1024");
            return new ConsumerConfig(props);
        }

        @Override
        public synchronized void run() {
            // Blocks on hasNext() until a message arrives, then prints its channel id.
            while (it.hasNext()) {
                t = it.next().message().getChannelid();
                System.out.println("In Consumer received msg" + t);
            }
        }
    }
    

    Producer.java

    import java.util.Properties;

    import kafka.producer.ProducerConfig;

    public class Producer {
        public final kafka.javaapi.producer.Producer<String, Signal> producer;
        private final String topic;
        private final Properties props = new Properties();

        public Producer(String topic) {
            props.put("serializer.class", "org.bigdata.kafka.Serializer");
            props.put("key.serializer.class", "kafka.serializer.StringEncoder");
            props.put("metadata.broker.list", "localhost:9092");
            // Uses the default (random) partitioner; the message value is the
            // user-defined Signal object.
            producer = new kafka.javaapi.producer.Producer<String, Signal>(
                    new ProducerConfig(props));
            this.topic = topic;
        }
    }
    

    KafkaProperties.java

    public interface KafkaProperties {
        final static String zkConnect = "127.0.0.1:2181";
        final static String groupId = "group1";
        final static String topic = "test00";
        final static String kafkaServerURL = "localhost";
        final static int kafkaServerPort = 9092;
        final static int kafkaProducerBufferSize = 64 * 1024;
        final static int connectionTimeOut = 100000;
        final static int reconnectInterval = 10000;
        final static String clientId = "SimpleConsumerDemoClient";
    }
    

    This is how the consumer behaves: for the first 11 messages it does not print that the message was received, but from the 12th message onwards it starts functioning correctly.

         producer sending msg1
         producer sending msg2
         producer sending msg3
         producer sending msg4
         producer sending msg5
         producer sending msg6
         producer sending msg7
         producer sending msg8
         producer sending msg9
         producer sending msg10
         producer sending msg11
         producer sending msg12
         In Consumer received msg12
         producer sending msg13
         In Consumer received msg13
         producer sending msg14
         In Consumer received msg14
         producer sending msg15
         In Consumer received msg15
         producer sending msg16
         In Consumer received msg16
         producer sending msg17
         In Consumer received msg17
         producer sending msg18
         In Consumer received msg18
         producer sending msg19
         In Consumer received msg19
         producer sending msg20
         In Consumer received msg20
         producer sending msg21
         In Consumer received msg21

    EDITED: adding the listener function from which the producer sends messages. I am using the default producer config and did not override it.

    public synchronized void onValueChanged(final MonitorEvent event_) {
        try {
            // Read the value from the DBR delivered with the event.
            final DBR dbr = event_.getDBR();
            final String[] val = (String[]) dbr.getValue();

            // Publish one Signal per monitor event.
            producer1.producer.send(new KeyedMessage<String, Signal>(
                    KafkaProperties.topic, new Signal(messageNo)));
            System.out.println("producer sending msg" + messageNo);

            messageNo++;
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
    
  • Ankita
    Ankita about 10 years
    Thanks for the suggestion. I added props.put("request.required.acks", "1") to the producer, but the program behaves randomly. I ran it 5 times, each time with a new topic, and got different results every time: twice the producer and consumer were in sync; the rest of the time the consumer was delayed.
  • Dmitry
    Dmitry about 10 years
    By 'delayed' do you mean all messages were received, but not immediately after sending? In your original output the first several messages were completely lost.
  • Ankita
    Ankita about 10 years
    Yes, actually there are two scenarios: 1) sometimes all messages are received, but not immediately after sending; 2) other times several messages are lost, as illustrated in the output provided. But when I run "bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topicname --from-beginning" from the console, I get the same number of messages in the consumer as produced by the producer. Why is that happening?
  • Dmitry
    Dmitry about 10 years
    Added another suggestion to the answer.
  • Dmitry
    Dmitry about 10 years
    Try to call consumer.shutdown() before exit. It will sync consumed offsets.
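
    A minimal sketch of that suggestion (a hypothetical shutdown() method added to the question's Consumer class, plus a JVM shutdown hook so it also runs on Ctrl-C):

    // Hypothetical addition to the question's Consumer class: expose the
    // connector's shutdown so consumed offsets are committed on exit.
    public void shutdown() {
        consumer.shutdown(); // syncs consumed offsets to ZooKeeper
    }

    // Caller side:
    final Consumer c = new Consumer(KafkaProperties.topic);
    c.start();
    Runtime.getRuntime().addShutdownHook(new Thread() {
        @Override
        public void run() {
            c.shutdown();
        }
    });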
  • Qi Wang
    Qi Wang about 9 years
    Hi @Dmitry, I got my problem resolved following these replies. I also have one query: is it possible to have a fixed-size (say 5) thread pool handle messages from any number (say 100 or more) of topics?
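
    One possible sketch (an assumption, not from this thread): keep one lightweight reader thread per stream, but hand the per-message work to a fixed-size pool, so processing parallelism stays at 5 regardless of how many topics are subscribed. allStreams and process() are hypothetical names.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import kafka.consumer.KafkaStream;
    import kafka.message.MessageAndMetadata;

    final ExecutorService workers = Executors.newFixedThreadPool(5);

    // Readers spend most of their time blocked on the stream iterator;
    // the actual handling is bounded by the 5 worker threads.
    for (final KafkaStream<String, Signal> s : allStreams) {
        new Thread(new Runnable() {
            public void run() {
                for (final MessageAndMetadata<String, Signal> m : s) {
                    workers.submit(new Runnable() {
                        public void run() {
                            process(m.message()); // hypothetical handler
                        }
                    });
                }
            }
        }).start();
    }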