Kafka on Kubernetes multi-node

10,965

Solution 1

My solution for this has been to use the IP as the ID: trim the dots and you get a unique ID that is also available outside of the container to other containers.

With a Service you can get access to the multiple containers's IPs (see my answer here on how to do this: what's the best way to let kubenetes pods communicate with each other?

so you can get their IDs too if you use IPs as the unique ID. The only issue is that IDs are not continuous or start at 0, but zookeeper / kafka don't seem to mind.

EDIT 1:

The follow up concerns configuring Zookeeper:

Each ZK node needs to know of the other nodes. The Kubernetes discovery service knowns of nodes that are within a Service so the idea is to start a Service with the ZK nodes.

This Service needs to be started BEFORE creating the ReplicationController (RC) of the Zookeeper pods.

The start-up script of the ZK container will then need to:

  • wait for the discovery service to populate the ZK Service with its nodes (that takes a few seconds, for now I just add a sleep 10 at the beginning of my startup script but more reliably you should look for the service to have at least 3 nodes in it.)
  • look up the containers forming the Service in the discovery service: this is done by querying the API. the KUBERNETES_SERVICE_HOST environment variable is available in each container. The endpoint to find service description is then

URL="http(s)://$USERNAME:$PASSWORD@${KUBERNETES_SERVICE_HOST/api/v1/namespaces/${NAMESPACE}/endpoints/${SERVICE_NAME}"

where NAMESPACE is default unless you changed it, and SERVICE_NAME would be zookeeper if you named your service zookeeper.

there you get the description of the containers forming the Service, with their ip in a "ip" field. You can do:

curl -s $URL | grep '\"ip\"' | awk '{print $2}' | awk -F\" '{print $2}' 

to get the list of IPs in the Service. With that, populate the zoo.cfg on the node using the ID defined above

You might need the USERNAME and PASSWORD to reach the endpoint on services like google container engine. These need to be put in a Secret volume (see doc here: http://kubernetes.io/v1.0/docs/user-guide/secrets.html )

You would also need to use curl -s --insecure on Google Container Engine unless you go through the trouble of adding the CA cert to your pods

Basically add the volume to the container, and look up the values from file. (contrary to what the doc says, DO NOT put the \n at the end of the username or password when base64 encoding: it just make your life more complicated when reading those)

EDIT 2:

Another thing you'll need to do on the Kafka nodes is get the IP and hostnames, and put them in the /etc/hosts file. Kafka seems to need to know the nodes by hostnames, and these are not set within service nodes by default

EDIT 3:

After much trial and thoughts using IP as an ID may not be so great: it depends on how you configure storage. for any kind of distributed service like zookeeper, kafka, mongo, hdfs, you might want to use the emptyDir type of storage, so it is just on that node (mounting a remote storage kind of defeats the purpose of distributing these services!) emptyDir will relaod with the data on the same node, so it seems more logical to use the NODE ID (node IP) as the ID, because then a pod that restarts on the same node will have the data. That avoid potential corruption of the data (if a new node starts writing in the same dir that is not actually empty, who knows what can happen) and also with Kafka, the topics being assigned a broker.id, if the broker id changes, zookeeper does not update the topic broker.id and the topic looks like it is available BUT points to the wrong broker.id and it's a mess.

So far I have yet to find how to get the node IP though, but I think it's possible to lookup in the API by looking up the service pods names and then the node they are deployed on.

EDIT 4

To get the node IP, you can get the pod hostname == name from the endpoints API /api/v1/namespaces/default/endpoints/ as explained above. then you can get the node IP from the pod name with /api/v1/namespaces/default/pods/

PS: this is inspired by the example in the Kubernetes repo (example for rethinkdb here: https://github.com/kubernetes/kubernetes/tree/master/examples/rethinkdb

Solution 2

This shows up prominently in my searches but contains pretty outdated information. To update this with a more modern solution, you should use a StatefulSet deployment, which will generate pods that have an integer counter instead of a hash in their name, eg. kafka-controller-0.

This is of course the hostname, so from there it's a simple matter to extract a fixed, invariant broker ID using awk:

hostname | awk -F'-' '{print $3}'

The most popular containers available for Kafka these days have a broker ID command.

Solution 3

Look at https://github.com/CloudTrackInc/kubernetes-kafka It allows to start Kafka in kubernetes and support scaling it and auto extanding.

Solution 4

I did this using docker-compose (The difference for Kubernetes would be that you would pass the ID via your service.yaml and have 2 services):

kafka1:
  build: kafka-0.8.1/
  ports:
  - 9092
  links:
  - zookeeper
  environment:
  - ID=1
kafka2:
  build: kafka-0.8.1/
  ports:
  - 9092
  links:
  - zookeeper
  environment:
  - ID=2

Config:

broker.id=${ID}
port=9092
advertised.host.name=${HOST}
advertised.port=9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/kafka/kafka-logs-${ID}
num.partitions=200
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=${DOCKER_ZOOKEEPER_1_PORT_2181_TCP_ADDR}:${DOCKER_ZOOKEEPER_1_PORT_2181_TCP_PORT}
zookeeper.connection.timeout.ms=6000

sh:

#!/bin/bash
echo "Running config"
export HOST=`grep $HOSTNAME /etc/hosts | awk '{print $1}'`
export ID=${ID:?}
perl -p -i -e 's/\$\{([^}]+)\}/defined $ENV{$1} ? $ENV{$1} : $&/eg' < /broker.template > $KAFKA_HOME/config/server.properties
echo "Done"
echo "starting kafka with:"
echo "$KAFKA_HOME/config/server.properties"
echo ""
cat $KAFKA_HOME/config/server.properties
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
Share:
10,965
NegatioN
Author by

NegatioN

My interests are machine learning, especially computer vision and plain ol' programming. Find me @ www.jrishaug.com

Updated on June 06, 2022

Comments

  • NegatioN
    NegatioN almost 2 years

    So my objective here is to set up a cluster of several kafka-brokers in a distributed fashion. But I can't see the way to make the brokers aware of each other.

    As far as i understand, every broker needs a separate ID in their config, which I cannot guarantee or configure if I launch the containers from kubernetes?

    They also need to have the same advertised_host?

    Are there any parameters I'm missing that would need to be changed for the nodes to discover each other?

    Would it be viable to do such a configuration at the end of the Dockerfile with a script? And/or a shared volume?

    I'm currently trying to do this with the spotify/kafka-image which has a preconfigured zookeeper+kafka combination, on vanilla Kubernetes.

  • NegatioN
    NegatioN over 8 years
    Are you sure it's possible to pass environment-variables through a kubernetes-service? because other than that I can see this as a solution.
  • NegatioN
    NegatioN over 8 years
    This talks about passing environment-variables in Pods or ReplicationControllers? I am aware this is possible, but does this also imply that it can be done in a Service?
  • NegatioN
    NegatioN over 8 years
    Good idea! This seems like a doable solution! I managed to give all my brokers IDs now, through a startup-script doing: BROKER_ID=$(ip addr | awk '/inet/ && /eth0/{sub(/\/.*$/,"",$2); print $2}' | sed -r 's/\.//g') and : sed -r -i "s/(broker.id)=(.*)/\1=$BROKER_ID/g" $KAFKA_HOME/config/server.properties
  • MrE
    MrE over 8 years
    I used ip=$(hostname -i) Then id=${ip//./}
  • NegatioN
    NegatioN over 8 years
    Can I also ask how you add the servers to /conf/zoo.cfg? A shared volume that's interacts with the Kafka-Service of Kubernetes?
  • MrE
    MrE over 8 years
    So the idea is this, and i will edit the answer with more details: start a Service with the zk nodes, and a service with the kafka nodes. then start the replication controller for zookeeper. at that point the service will register the zk containers... but it takes a few seconds. so meanwhile your container needs to wait for the discovery service to populate the zk container nodes. I put a 10sec pause in my script starting zookeeper to account for that, but it it not ideal. then look up the IPs and populate the zoo.cfg before running zk.
  • NegatioN
    NegatioN over 8 years
    Thank you for the thorough walkthrough! Which Username and Password are you referring to? The API-server itself I assume? I auth to the server with a cert atm. I should lookup how to curl with a cert then?
  • MrE
    MrE over 8 years
    Yes username and password for the API server. that can be found by describing the cluster in gke: gcloud beta container clusters describe <yourclustername> If you got that working with a certificate, I'd be interested to know how you set that up.
  • NegatioN
    NegatioN over 8 years
    Kelsey Hightower has written a Intro to Kubernetes where installing certs is covered in the sub-topics "PKI infrastructure" and "Provision the Controller Node". Later those certs are added to Kubectl The guide is focused on CoreOS, but I see no reason it shouldn't work on other distros. :) Thanks again for the help. It's not up and running yet, but with this guide I have only myself to blame!
  • OneCricketeer
    OneCricketeer about 5 years
    That link is fairly outdated. Probably better to find Strimzi or Confluent Helm Charts now.
  • Amit Yadav
    Amit Yadav over 4 years
    This fails miserably when I am trying to deploy Kafka on multiple kubernetes clusters. I have 3 GKE clusters in different regions and trying to keep one Kafka broker per cluster for High Availability. But because Statefulsets start with 0 on every cluster (e.g statefulset-name-0) all my brokers are getting the ID 0 and only one of them is able to connect with the Zookeeper (running separately).
  • MrE
    MrE over 4 years
    Nowadays, use StatefullSets to handle statefull applications like Kafka. This post was a work around when these didn't exists in Kubernetes