Simple Kafka Consumer not receiving messages


Solution 1

First, list the consumer groups that are available:

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

Then check which group your topic belongs to by describing the group:

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group <your group name> --describe

Once you have found your topic and its associated group name, set group.id to that group (if the topic does not belong to the default group), then try the properties below and let me know if it works:

  Properties props = new Properties();
  props.put("bootstrap.servers", "localhost:9092"); // use your broker's host and port
  props.put("group.id", "test-consumer-group"); // default group name; replace with your group
  props.put("enable.auto.commit", "true");
  props.put("auto.commit.interval.ms", "1000");
  props.put("session.timeout.ms", "30000");
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
  KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

  // The consumer subscribes to a list of topics here; replace topicName with your topic name.
  consumer.subscribe(Arrays.asList(topicName));

  // Print the names of all topics visible to this consumer.
  java.util.Map<String, java.util.List<PartitionInfo>> listTopics = consumer.listTopics();
  System.out.println("number of topics: " + listTopics.size());

  for (String topic : listTopics.keySet()) {
      System.out.println("topic name: " + topic);
  }
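
As a possible follow-up to the listing above, the topic names returned by consumer.listTopics() can be compared with the topics you intend to subscribe to. This is only a sketch in plain Java: the class TopicCheck and the method missingTopics are hypothetical names, not part of the Kafka API, and the sample set stands in for listTopics.keySet().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper (not part of Kafka): reports which wanted topics are not in the listed set.
public class TopicCheck {

    // 'available' would typically be consumer.listTopics().keySet().
    public static List<String> missingTopics(Set<String> available, List<String> wanted) {
        List<String> missing = new ArrayList<>();
        for (String topic : wanted) {
            if (!available.contains(topic)) {
                missing.add(topic);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Sample data standing in for what the broker reports.
        Set<String> available = new HashSet<>(Arrays.asList("topic3", "__consumer_offsets"));
        List<String> wanted = Arrays.asList("topic3", "topic2");
        System.out.println("missing topics: " + missingTopics(available, wanted));
    }
}
```

If the result is not empty, the consumer may be pointed at a different broker than the producer, or a topic name may be misspelled.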

Solution 2

Run the consumer before running the producer, so that the consumer registers with the group coordinator first. The first time you run the consumer, it is registering with the group coordinator; when you run the producer afterwards, the consumer consumes the messages.

To find out up to which offset the consumer has consumed the messages, use this:

kafka-consumer-offset-checker.bat --group group-1 --topic testing-1 --zookeeper localhost:2181

This shows the last offset of the topic that the consumer has consumed.
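
If starting the consumer first is not practical, a commonly used alternative is to set auto.offset.reset to earliest, so that a group with no committed offsets reads from the beginning of the topic instead of from the end (the default is latest). A minimal sketch of such a configuration, using only java.util.Properties; the class name ConsumerProps and the method build are made up for illustration, and the broker address and group name are the ones from the question:

```java
import java.util.Properties;

// Hypothetical helper: builds consumer settings as plain Properties (no Kafka classes needed here).
public class ConsumerProps {

    public static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // Applies only when the group has no committed offset for a partition.
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Address and group name as used in the question; adjust to your setup.
        Properties props = build("localhost:9094", "test");
        System.out.println("auto.offset.reset = " + props.getProperty("auto.offset.reset"));
    }
}
```

The resulting Properties can be passed to new KafkaConsumer<String, String>(props). Once the group has committed offsets, this setting no longer applies, so a fresh group.id may be needed to read the topic from the start.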

Author: mohiitg

Updated on July 09, 2022

Comments

  • mohiitg, almost 2 years

    I am a newbie to Kafka and am running the simple Kafka consumer/producer example given for KafkaConsumer and KafkaProducer. When I run a consumer from the terminal, it receives messages, but I am not able to receive them using Java code. I have also searched for similar issues on Stack Overflow (Links: Link1, Link2) and tried those solutions, but nothing seems to work for me. Kafka version: kafka_2.10-0.10.2.1, and the corresponding Maven dependency is used in the pom.

    Java Code for producer and consumer:

    public class SimpleProducer {

        public static void main(String[] args) throws InterruptedException {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9094");
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            Producer<String, String> producer = new KafkaProducer<>(props);
            for (int i = 0; i < 10; i++)
                producer.send(new ProducerRecord<String, String>("topic3", Integer.toString(i), Integer.toString(i)));

            producer.close();
        }
    }
    
    public class SimpleConsumer {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9094");
            props.put("group.id", "test");
            props.put("zookeeper.connect", "localhost:2181");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("topic3", "topic2"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }
    

    Starting Kafka: bin/kafka-server-start.sh config/server.properties (I have already set the port and broker ID in the properties file)