How to create a Kafka consumer group in Golang?


Solution 1

There is no need to use the sarama-cluster library; it is deprecated. The original Sarama library itself provides a way to connect to a Kafka cluster using a consumer group.

We create a client, initialize a consumer group from it, and then consume: the group hands our handler its claims, and the handler reads messages from each claim's message channel.

Initializing the client:

// kafkaVersion is the version of the Kafka server as a string, e.g. "0.11.0.2"
kfversion, err := sarama.ParseKafkaVersion(kafkaVersion)
if err != nil {
    log.Fatal(err) // stop here: continuing would use a zero-value version
}

config := sarama.NewConfig()
config.Version = kfversion
config.Consumer.Return.Errors = true

// Start with a client
client, err := sarama.NewClient([]string{brokerAddr}, config)
if err != nil {
    log.Fatal(err) // stop here: client is nil and must not be used or closed
}
defer func() { _ = client.Close() }()

Connecting to the consumer group:

// Start a new consumer group
group, err := sarama.NewConsumerGroupFromClient(consumer_group, client)
if err != nil {
    log.Fatal(err) // stop here: group is nil and must not be used or closed
}
defer func() { _ = group.Close() }()

Start consuming messages from the topic partitions:

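// Iterate over consumer sessions. Consume returns whenever a session ends
// (for example after a rebalance), so it has to be called in a loop.
ctx := context.Background()
topics := []string{topicName}
handler := exampleConsumerGroupHandler{}
for {
    err := group.Consume(ctx, topics, handler)
    if err != nil {
        log.Println(err)
    }
}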
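
The loop above never exits because ctx is never cancelled. Here is a minimal sketch of one way to stop it cleanly, so that the deferred Close calls get a chance to run. It assumes a hypothetical consumer interface standing in for the Consume method of sarama.ConsumerGroup, so it runs without a broker. The names consumer, fakeGroup, consumeLoop and runDemo are illustrative only and are not part of Sarama.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// consumer is a hypothetical stand-in for the Consume method of
// sarama.ConsumerGroup, so the sketch runs without a Kafka broker.
type consumer interface {
	Consume(ctx context.Context, topics []string) error
}

// fakeGroup simulates sessions that end (as they do on a rebalance) and
// cancels the context after maxSessions to simulate a shutdown request.
type fakeGroup struct {
	sessions    int
	maxSessions int
	cancel      context.CancelFunc
}

func (g *fakeGroup) Consume(ctx context.Context, topics []string) error {
	if ctx.Err() != nil {
		return ctx.Err()
	}
	g.sessions++
	if g.sessions == g.maxSessions {
		g.cancel()
	}
	return nil
}

// consumeLoop calls Consume again after every session ends and
// returns once the context has been cancelled.
func consumeLoop(ctx context.Context, c consumer, topics []string) error {
	for {
		if err := c.Consume(ctx, topics); err != nil {
			if errors.Is(err, context.Canceled) {
				return nil
			}
			return err
		}
		// The session ended; stop instead of starting a new one
		// if shutdown was requested in the meantime.
		if ctx.Err() != nil {
			return nil
		}
	}
}

// runDemo runs the loop against the fake group and reports
// how many sessions were started before the loop stopped.
func runDemo() (int, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	g := &fakeGroup{maxSessions: 3, cancel: cancel}
	err := consumeLoop(ctx, g, []string{"topicName"})
	return g.sessions, err
}

func main() {
	n, err := runDemo()
	fmt.Println(n, err)
}
```

In a real program the context would more likely come from something like signal.NotifyContext(context.Background(), os.Interrupt), and the handler would be passed to Consume as a third argument, as in the snippet above.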

The last part is to read messages from the claim's message channel. To do that, we need to implement all three methods of the ConsumerGroupHandler interface.

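type exampleConsumerGroupHandler struct{}

// Setup runs at the start of a new session, before ConsumeClaim.
func (exampleConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }

// Cleanup runs at the end of a session, once all ConsumeClaim goroutines have exited.
func (exampleConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }

// ConsumeClaim reads messages from one claimed partition until its channel is closed.
func (h exampleConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
        fmt.Printf("Message topic:%q partition:%d offset:%d\n", msg.Topic, msg.Partition, msg.Offset)
        sess.MarkMessage(msg, "") // mark the message as processed so its offset can be committed
    }
    return nil
}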

For more information on using Kafka from Go, see the Sarama library documentation.

Solution 2

The consumer group is specified by the second argument of the cluster consumer "constructor". Here's a very basic sketch:

import (
    "log"

    cluster "github.com/bsm/sarama-cluster"
)

conf := cluster.NewConfig()
// add config values

brokers := []string{"kafka-1:9092", "kafka-2:9092"}
group := "Your-Consumer-Group"
topics := []string{"topicName"}

// NewConsumer returns the consumer and an error; the second argument is the group ID.
consumer, err := cluster.NewConsumer(brokers, group, topics, conf)
if err != nil {
    log.Fatal(err)
}
defer consumer.Close()

And so you'll have a consumer belonging to the specified consumer group.
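
To illustrate what belonging to a group means in practice: Kafka assigns each partition of a subscribed topic to exactly one member of the group, so starting more instances with the same group ID splits the partitions between them. The sketch below is a hypothetical round-robin illustration of that idea, not the balancing code used by Sarama or sarama-cluster. The function name assignRoundRobin and the member names are made up for the example.

```go
package main

import "fmt"

// assignRoundRobin is a hypothetical illustration (not library code):
// it spreads partition IDs 0..partitions-1 over the group members in turn,
// so every partition ends up with exactly one member.
func assignRoundRobin(members []string, partitions int) map[string][]int32 {
	out := make(map[string][]int32, len(members))
	if len(members) == 0 {
		return out
	}
	for p := 0; p < partitions; p++ {
		m := members[p%len(members)]
		out[m] = append(out[m], int32(p))
	}
	return out
}

func main() {
	// Two members of the same group sharing a topic with five partitions.
	a := assignRoundRobin([]string{"pod-a", "pod-b"}, 5)
	fmt.Println(a["pod-a"], a["pod-b"]) // prints: [0 2 4] [1 3]
}
```

One consequence visible in the sketch: if there are more members than partitions, the extra members receive no partitions and sit idle.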

Author: Lorenzo Belli


Updated on July 06, 2022

Comments

  • Lorenzo Belli, almost 2 years

    An available library is sarama (or its extension, sarama-cluster); however, no consumer group examples are provided in either sarama or sarama-cluster.

    I do not understand the API. May I have an example of creating a consumer group for a topic?

  • vinni_f, over 4 years
    Unless you are stuck at Kafka 0.10.1, you still need to use sarama-cluster.
  • Li Ziyan, about 3 years
    Could I scale safely if I deploy this program in a Pod of a Kubernetes Deployment?
  • Himanshu, about 3 years
    Yes @LiZiyan, you can scale producers to multiple pods in the cluster.