Kafka consumer "Failed to add leader for partitions" error running on Mesos


Solution 1

I think you need to look into the value of the property "advertised.host.name". I recently faced this issue as well and fixed it by setting that property.
Please make sure that you have specified the correct IP address for each broker.
Let me know if it doesn't work.
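
For reference, the relevant entries in each broker's server.properties might look roughly like this (the broker id and IP below are placeholders; each broker needs its own id and its real, externally reachable address):

```
# config/server.properties -- placeholder values, set per broker
broker.id=0
# The host/IP the broker advertises to producers and consumers in metadata
# responses. It must be reachable from the client machines, not just locally.
advertised.host.name=192.168.1.201
advertised.port=9092
```

If clients resolve a different address than the one the broker advertises, they can fetch metadata but then fail to connect to the partition leaders, which matches the ClosedChannelException symptom.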

Solution 2

Try running the following command:

bin/kafka-topics.sh --zookeeper your.zookeeper:2181 --describe --topic your_topic

This will show which broker is the leader of each of your topic's partitions (see http://kafka.apache.org/documentation.html#quickstart_multibroker for more details).
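
Against a healthy topic the output looks roughly like this (the topic name, broker ids, and replication factor below are hypothetical):

```
Topic:your_topic  PartitionCount:6  ReplicationFactor:2  Configs:
    Topic: your_topic  Partition: 0  Leader: 0  Replicas: 0,1  Isr: 0,1
    Topic: your_topic  Partition: 1  Leader: 1  Replicas: 1,2  Isr: 1,2
    ...
```

A Leader value of -1 means the partition currently has no leader at all, and clients will fail until one is elected.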

In my case, one of the brokers that was set as a leader had failed and no longer existed. A new leader should have been assigned but for some reason it wasn't.

I fixed the issue by:

  1. Stopping all producers and consumers
  2. Restarting each remaining broker

I then re-ran the describe command (from above) and could see that the failed broker was no longer listed as a leader.

I then brought up a new broker with the same id as the failed broker. Kafka took it from there and brought over all of the data from my other brokers (this requires that your topic has an adequate replication factor). Once the data was over, Kafka made the broker the partition leader.

Lastly, I re-started the producers and consumers.
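
Sketched as commands, the broker-restart portion looks like this (the ZooKeeper address and script paths are placeholders, and stopping producers/consumers is application-specific):

```
# On each surviving broker, restart the Kafka server process
bin/kafka-server-stop.sh
bin/kafka-server-start.sh -daemon config/server.properties

# Then confirm the failed broker no longer appears as a leader
bin/kafka-topics.sh --zookeeper your.zookeeper:2181 --describe --topic your_topic
```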

Author: dsimmie (C# and Java Engineer with a steadily increasing interest in Scala)

Updated on June 09, 2022

Comments

  • dsimmie, almost 2 years ago

    I am running a Kafka cluster of 6 brokers using the mesos/kafka library. I am able to add and start the brokers across 6 different machines and to post messages into the cluster using both the Python SimpleProducer and the kafka-console-producer.sh script.

    However I am not able to get the consumers working properly. I am running the following consumer command:

    bin/kafka-console-consumer.sh --zookeeper 192.168.1.199:2181 --topic test --from-beginning --consumer.config config/consumer.properties --delete-consumer-offsets
    

    In the consumer.properties file I set group.id to my.group and zookeeper.connect to a number of nodes in the ZooKeeper ensemble. I get the following warning messages when running this consumer:
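
    Assuming those settings, consumer.properties would contain something like the following (only the first address appears in the question; the other ensemble members are placeholders):

```
group.id=my.group
zookeeper.connect=192.168.1.199:2181,192.168.1.200:2181,192.168.1.201:2181
```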

        [2015-09-24 16:01:06,609] WARN [my.group_my_host-1443106865779-b5a3a1e1-leader-finder-thread], Failed to add leader for partitions [test,4],[test,1],[test,5],[test,2],[test,0],[test,3]; will retry (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
        java.nio.channels.ClosedChannelException
                at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
                at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
                at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
                at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:127)
                at kafka.consumer.SimpleConsumer.earliestOrLatestOffset(SimpleConsumer.scala:166)
                at kafka.consumer.ConsumerFetcherThread.handleOffsetOutOfRange(ConsumerFetcherThread.scala:60)
                at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:177)
                at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:172)
                at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
                at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
                at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
                at kafka.server.AbstractFetcherThread.addPartitions(AbstractFetcherThread.scala:172)
                at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:87)
                at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:77)
                at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
                at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
                at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
                at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
                at kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:77)
                at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:95)
                at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
        {'some':2}
        [2015-09-24 16:20:02,362] WARN [my.group_my_host-1443108001180-fa0c93e4-leader-finder-thread], Failed to add leader for partitions [test,4],[test,1],[test,5],[test,2],[test,0],[test,3]; will retry (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
        java.nio.channels.ClosedChannelException
                at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
                ... (same stack trace as above)
        ...
        // Lots more of this
        ...
        Consumed 1 messages
    

    I'm not sure why it is unable to add a leader; the leaders seem to be in ZooKeeper already. Alongside all these error messages, I can only ever get one message through to the consumer. The string {'some':2} is a message I sent from the console producer.

    I found this error in the server.log of one of the Mesos slaves, not sure if it is relevant:

    [2015-09-24 17:09:41,926] ERROR Closing socket for /192.168.1.199 because of error (kafka.network.Processor)
    java.io.IOException: Broken pipe
                at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
                at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
                at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
                at sun.nio.ch.IOUtil.write(IOUtil.java:65)
                at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
                at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:123)
                at kafka.network.MultiSend.writeTo(Transmission.scala:101)
                at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:231)
                at kafka.network.Processor.write(SocketServer.scala:472)
                at kafka.network.Processor.run(SocketServer.scala:342)
                at java.lang.Thread.run(Thread.java:745)
    

    Any suggestions as to what might be happening with the consumer or where I might look to troubleshoot the problem?

    Zookeeper broker partition state for one of the log partitions:

    [zk: localhost:2181(CONNECTED) 166] get /brokers/topics/test/partitions/0/state
    {"controller_epoch":1,"leader":0,"version":1,"leader_epoch":0,"isr":[0]}
    
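
    The state JSON can also be decoded programmatically; a minimal sketch (the JSON string below is copied from the ZooKeeper output above; the field names are Kafka's standard partition-state fields):

```python
import json

# Partition state as stored at /brokers/topics/test/partitions/0/state
raw = '{"controller_epoch":1,"leader":0,"version":1,"leader_epoch":0,"isr":[0]}'
state = json.loads(raw)

# "leader" is a broker id; a value of -1 would mean no leader is assigned
print("leader broker:", state["leader"])
# "isr" lists the broker ids of the in-sync replicas
print("in-sync replicas:", state["isr"])
```

    Here partition 0 does have a leader (broker 0), with only that broker in the ISR, so with a replication factor of 1 the loss of broker 0 would leave the partition unavailable.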

    OS: Ubuntu 14.04, Mesos: 0.23, Kafka: 2.10-0.8.2.1

    Update: doing some further testing with kafka-console-consumer.sh, the messages do seem to be getting through. The error messages are constant, so you do not see all of the messages on stdout. The Python KafkaConsumer fails immediately with a FailedPayloadsError.