Kafka 0.10 Java Client TimeoutException: Batch containing 1 record(s) expired


Solution 1

I encountered the same problem.

You should change your Kafka server.properties to specify an IP address in the listener, e.g.:

listeners=PLAINTEXT://YOUR_IP:9093

Otherwise, Kafka advertises its hostname; if the producer cannot resolve that host, it cannot send messages to Kafka, even if you can telnet to the broker.
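As an illustration, broker 0's server.properties could pin the listener to a reachable IP like this (YOUR_PUBLIC_IP is a placeholder; the other two brokers would use ports 9093 and 9094 respectively, and on Kafka versions that support it, `advertised.listeners` is the more targeted knob for controlling what the broker hands out to clients):

```properties
broker.id=0
# Advertise an address the remote producer can actually reach, not the hostname
listeners=PLAINTEXT://YOUR_PUBLIC_IP:9092
```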

Solution 2

The port information in your BOOTSTRAP_SERVERS_CONFIG configuration is incorrect (MYIP:9092).

You mentioned that server.properties contains "PLAINTEXT://:9093, PLAINTEXT://:9093, PLAINTEXT://:9094", which does not include port 9092.

Author: Armen

Updated on June 04, 2022

Comments

  • Armen
    Armen almost 2 years

    I have a single node, multi (3) broker Zookeeper / Kafka setup. I am using the Kafka 0.10 Java client.

I wrote the following simple remote Producer, running on a different server than Kafka (in the code I replaced my public IP address with MYIP):

    Properties config = new Properties();
    try {
        config.put(ProducerConfig.CLIENT_ID_CONFIG, InetAddress.getLocalHost().getHostName());
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "MYIP:9092, MYIP:9093, MYIP:9094");
        config.put(ProducerConfig.ACKS_CONFIG, "all");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        producer = new KafkaProducer<String, byte[]>(config);
        Schema.Parser parser = new Schema.Parser();
        schema = parser.parse(GATEWAY_SCHEMA);
        recordInjection = GenericAvroCodecs.toBinary(schema);
        GenericData.Record avroRecord = new GenericData.Record(schema);
        //Filling in avroRecord (code not here)
        byte[] bytes = recordInjection.apply(avroRecord);
    
        Future<RecordMetadata> future = producer.send(new ProducerRecord<String, byte[]>(datasetId+"", "testKey", bytes));
        RecordMetadata data = future.get();
    } catch (Exception e) {
        e.printStackTrace();
    }
    

My server.properties files for the 3 brokers look like this (across the three files, broker.id is 0, 1, 2; listeners is PLAINTEXT://:9092, PLAINTEXT://:9093, PLAINTEXT://:9094; and host.name is 10.2.0.4, 10.2.0.5, 10.2.0.6). This is the first server.properties file:

    broker.id=0
    listeners=PLAINTEXT://:9092
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/tmp/kafka1-logs
    num.partitions=1
    num.recovery.threads.per.data.dir=1
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    zookeeper.connect=localhost:2181
    zookeeper.connection.timeout.ms=6000
    

    When I execute the code, I get following exception:

    java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for 100101-0
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:65)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:52)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
        at com.nr.roles.gateway.GatewayManager.addTransaction(GatewayManager.java:212)
        at com.nr.roles.gateway.gw.service(gw.java:126)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
        at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:821)
        at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:583)
        at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1158)
        at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:511)
        at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1090)
        at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
        at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:109)
        at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:119)
        at org.eclipse.jetty.server.Server.handle(Server.java:517)
        at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:308)
        at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:242)
        at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:261)
        at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:95)
        at org.eclipse.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:75)
        at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.produceAndRun(ExecuteProduceConsume.java:213)
        at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:147)
        at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:654)
        at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:572)
        at java.lang.Thread.run(Thread.java:745)
     Caused by: org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for 100101-0
    

Does anyone know what I am missing? Any help would be appreciated. Thanks a lot!

  • Armen
    Armen almost 8 years
Sorry, I had a typo here: in server.properties it is "PLAINTEXT://:9092, PLAINTEXT://:9093, PLAINTEXT://:9094". So the BOOTSTRAP_SERVERS_CONFIG ports are correct.
  • jack AKA karthik
    jack AKA karthik about 7 years
I have the property set correctly in my server.properties as listeners=PLAINTEXT://domain_name:9092, but I still get org.apache.kafka.common.errors.TimeoutException: Batch Expired (java.util.concurrent.ExecutionException) when connecting from an external server, even though I have increased request.timeout.ms to a higher value.
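The timeout knobs mentioned in this thread can be raised on the producer like this. This is a minimal sketch: the values are illustrative, not recommendations, and literal config keys are used in place of the ProducerConfig constants so the snippet compiles without the kafka-clients dependency.

```java
import java.util.Properties;

public class ProducerTimeouts {
    // Build producer settings with longer timeouts. The string keys below
    // match the ProducerConfig constants in kafka-clients.
    static Properties timeoutConfig(String bootstrapServers) {
        Properties config = new Properties();
        config.put("bootstrap.servers", bootstrapServers);
        config.put("request.timeout.ms", "60000"); // per-request wait on the broker
        config.put("max.block.ms", "60000");       // cap on blocking for metadata/buffer space
        config.put("retries", "3");                // retry transient send failures
        return config;
    }

    public static void main(String[] args) {
        Properties p = timeoutConfig("MYIP:9092,MYIP:9093,MYIP:9094");
        System.out.println(p.getProperty("request.timeout.ms")); // prints 60000
    }
}
```

Note that raising timeouts only papers over the symptom if the real cause is an unreachable advertised hostname, as in the accepted answer above.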