I'm getting "Topic not present in metadata after 60000 ms" message on some computers

13,647
192.168.10.10:9092

This seems to be an internal IP. Check if the clients where you cannot access are within its network range i.e. whether they can access this IP.

Try doing a telnet from your client machine..

telnet 192.168.10.10 9092

If you are not able to telnet then give the IP which can be accessed by your clients and ensure the same in the advertised.listeners also.

Also check your advertised.listeners config. When we connect to a url given in the bootstrap.servers that typically should par with those in the advertised.listeners configuration.

Topic metadata not present means that your client is unable to fetch any information about the given topic i.e. it cannot get metadata through the given bootstrap.servers property.

Share:
13,647

Related videos on Youtube

Shimon
Author by

Shimon

Updated on June 04, 2022

Comments

  • Shimon
    Shimon almost 2 years

    Here's my program

    package kafkaConsumer;
    
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.LongSerializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.common.serialization.LongDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.lang.management.ManagementFactory;
    import java.lang.management.RuntimeMXBean;
    import java.time.Duration;
    import java.util.Collections;
    import java.util.List;
    import java.util.Properties;
    import java.util.Scanner;
    import java.util.concurrent.Future;
    
    public class KafkaConsumerExample {
        private final static String INTOPIC = "my-intopic";
        private final static String OUTTOPIC = "my-outtopic";
        private final static String BOOTSTRAP_SERVERS = "192.168.10.10:9092";
    
        private static Producer<Long, String> createProducer(String bootstrapServers) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaProducerExample");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            return new KafkaProducer<>(props);
        }
    
        private static Consumer<Long, String> createConsumer(String intopic, String bootstrapServers) {
            final Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaConsumerExample");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
            // Create the consumer using props.
            final Consumer<Long, String> consumer = new KafkaConsumer<>(props);
            // Subscribe to the topic.
            consumer.subscribe(Collections.singletonList(intopic));
            return consumer;
        }
    
        static boolean run(
                Consumer<Long, String> consumer, Producer<Long, String> producer,
                String inTopic, String outTopic) throws InterruptedException {
    
            String valueToSend;
            long keyToUse;
    
            if (consumer == null) {
                Scanner sc = new Scanner(System.in);
                System.out.print("Enter key> ");
                keyToUse = sc.nextLong();
                valueToSend = sc.nextLine();
                System.out.print("Enter value> ");
                valueToSend = sc.nextLine();
            } else {
                Duration delta = Duration.ofSeconds(1);
                ConsumerRecords<Long, String> consumerRecords = consumer.poll(delta);
                while (consumerRecords.count() == 0) {
                    consumerRecords = consumer.poll(delta);
                }
                ConsumerRecord<Long, String> record = consumerRecords.iterator().next();
                keyToUse = record.key();
                valueToSend = record.value();
                if (producer != null)
                    System.out.println("Got key = " + keyToUse + " and value = " + valueToSend);
            }
    
            if (producer == null) {
                System.out.println("key = " + keyToUse + " and value = " + valueToSend);
            } else {
                try {
                    System.out.println("Creating ProducerRecord");
                    final ProducerRecord<Long, String> record =
                            new ProducerRecord<>(outTopic, keyToUse, valueToSend);
                    System.out.println("Calling producer.send");
                    Future<RecordMetadata> sent = producer.send(record);
                    System.out.println("Calling sent.get");
                    RecordMetadata metadata = sent.get();
                    System.out.println("Calling flush");
                    producer.flush();
                    System.out.println("After flush");
                } catch (Exception e) {
                    System.out.println("Exception sending message: " + e.getMessage());
                }
            }
            return !valueToSend.equals("STOP");
        }
    
        public static void usage() {
            System.out.println(System.getProperty("sun.java.command"));
            System.out.println();
            System.out.println("Usage parameters: [--intopic name] [--outtopic name] [--bootstrap-servers servers]");
            System.exit(1);
        }
    
        public static void main(String... args) throws Exception {
            String inTopic = INTOPIC;
            String outTopic = OUTTOPIC;
            String bootstrapServers = BOOTSTRAP_SERVERS;
    
            for (int i = 0; i < args.length; ++i) {
                if (args[i].equals("--intopic")) {
                    if (i == args.length - 1) {
                        usage();
                    }
                    inTopic = args[++i];
                } else if (args[i].equals("--outtopic")) {
                    if (i == args.length - 1) {
                        usage();
                    }
                    outTopic = args[++i];
                } else if (args[i].equals("--bootstrap-servers")) {
                    if (i == args.length - 1) {
                        usage();
                    }
                    bootstrapServers = args[++i];
                } else {
                    usage();
                }
            }
    
            final Consumer<Long, String> consumer;
            if (inTopic.equals("stdin")) {
                consumer = null;
            } else {
                consumer = createConsumer(inTopic, bootstrapServers);
            }
            final Producer<Long, String> producer;
            if (outTopic.equals("stdout")) {
                producer = null;
            } else {
                producer = createProducer(bootstrapServers);
            }
    
            while (true) {
                if (!run(consumer, producer, inTopic, outTopic)) {
                    break;
                }
            }
            if (consumer != null)
                consumer.close();
            if (producer != null)
                producer.close();
        }
    }
    

    I run it on Windows and Linux. On some computers it runs fine, but on other computers, specifically a Linux machine which is not the kafka machine, it consistently gives me this error:

    Exception sending message: org.apache.kafka.common.errors.TimeoutException: Topic outtopic not present in metadata after 60000 ms.
    

    This happens, of course, when trying to send a message in the run() function, specifically in the sentence RecordMetadata metadata = sent.get().

    This kafka installation allows creation of new topics automatically. In fact, if I enter a new name in the --outtopic parameter, even when sending the message fails, the topic is created.

    Any clues why? What am I missing in the configuration?

    Thank you

    Shimon

    • Shimon
      Shimon almost 4 years
      No, it's not in a docker container. It's a set of computers.
  • Shimon
    Shimon almost 4 years
    Sorry, I changed the IP address before posting this. But it is in fact an internal IP address that I'm using. Telnet works from the computer to the kafka host. Also, it's not that the communication fails. As I said, if I use a new topic, the topic gets created; it's just the sending of the message that fails with this message.
  • JavaTechnical
    JavaTechnical almost 4 years
    @Shimon Did you check your advertised.listeners config? The IP you specify in that config should be accessible from your client machine and should also point to the IP of the machine where Kafka is running
  • Shimon
    Shimon almost 4 years
    Sorry, I don't know how to set advertised.listeners. Where should I define it on the client machine?
  • JavaTechnical
    JavaTechnical almost 4 years
    @Shimon It is a broker config. You need to edit the server.properties file which you give while starting the kafka broker
  • Shimon
    Shimon almost 4 years
    Added advertised.listeners to server.properties and restarted kafka service. Still not working. Note that when I run the same program from my laptop (Windows) it does run successfully (and it also ran successfully without the advertised.listeners configuration).
  • Shimon
    Shimon almost 4 years
    And I made one mistake above. When I run the program, the topic does not get created as I thought before.
  • JavaTechnical
    JavaTechnical almost 4 years
    @Shimon how did you add it? it is typically PLAINTEXT://<IP>:<PORT>. This IP:Port is what you need to give in the bootstrap.servers
  • Shimon
    Shimon almost 4 years
    Thank you very much @JavaTechnical. I found that I needed both advertised.listeners config and also, shame on me, to stop the firewalld daemon:). I will configure the firewall later, but for now the advertised.listeners did the trick!. Thanks from the bottom of my heart.