How do I implement a Kafka consumer in Scala?


Solution 1

Most of the examples you're seeing are in Java because the new KafkaProducer, as of 0.8.2.2, is written in Java.

Assuming you're using sbt as your build system and working with Kafka 0.8.2.2 (you can change the version as needed), you'll need:

libraryDependencies ++= {
  Seq(
    "org.apache.kafka" %% "kafka" % "0.8.2.2",
    "org.apache.kafka" % "kafka-clients" % "0.8.2.2"
  )
}

A simple example should get you started:

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer

object KafkaExample {
  def main(args: Array[String]): Unit = {
    // Consumer configuration: broker address, consumer group, and deserializers
    val properties = new Properties()
    properties.put("bootstrap.servers", "localhost:9092")
    properties.put("group.id", "consumer-tutorial")
    properties.put("key.deserializer", classOf[StringDeserializer])
    properties.put("value.deserializer", classOf[StringDeserializer])

    // Create the consumer and subscribe to the topics to read from
    val kafkaConsumer = new KafkaConsumer[String, String](properties)
    kafkaConsumer.subscribe("firstTopic", "secondTopic")

    while (true) {
      // Wait up to 2000 ms for new data
      val results = kafkaConsumer.poll(2000).asScala
      for ((topic, data) <- results) {
        // Do stuff
      }
    }
  }
}
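
The comments on this answer report that poll in the 0.8.2.2 client is a stub that returns null, and that 0.9.0.0 ships a working implementation. As a minimal sketch under that assumption (kafka-clients 0.9.0.0 or later on the classpath in place of 0.8.2.2), the same loop might look like the following. The object name KafkaConsumerSketch, the helper consumerProperties, and the choice of auto.offset.reset = earliest are illustrative and not part of the original answer.

```scala
import java.util.{Arrays, Properties}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer

// Hypothetical sketch; assumes kafka-clients 0.9.0.0 or later.
object KafkaConsumerSketch {
  // Builds the consumer configuration. The broker address and group id
  // are passed in; the values used in main are placeholders.
  def consumerProperties(bootstrapServers: String, groupId: String): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    props.setProperty("group.id", groupId)
    // Deserializers can be given as fully qualified class names.
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    // Assumption: start from the oldest available message when the
    // consumer group has no committed offset yet.
    props.setProperty("auto.offset.reset", "earliest")
    props
  }

  def main(args: Array[String]): Unit = {
    val consumer = new KafkaConsumer[String, String](
      consumerProperties("localhost:9092", "consumer-tutorial"))
    // In this API, subscribe takes a Java list of topic names.
    consumer.subscribe(Arrays.asList("firstTopic", "secondTopic"))
    try {
      while (true) {
        // Waits up to 2000 ms and returns a (possibly empty) batch of records.
        val records = consumer.poll(2000L).asScala
        for (record <- records) {
          println(s"${record.topic} @ ${record.offset}: ${record.key} -> ${record.value}")
        }
      }
    } finally {
      // Release network resources if the loop exits with an exception.
      consumer.close()
    }
  }
}
```

In later client versions poll(long) is deprecated in favor of poll(java.time.Duration), so the call may need adjusting depending on the version you pick.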

Solution 2

A working template built entirely in Scala is available at https://github.com/knoldus/activator-kafka-scala-producer-consumer and contains the code you are looking for.

I hope this solves your problem. Thanks!

Author by

annedroiid

I am primarily a Scala developer who has been dabbling in becoming more full-stack.

Updated on June 22, 2022

Comments

  • annedroiid
    annedroiid almost 2 years

    I'm trying to implement a Kafka consumer in Scala. I've seen a million tutorials for how to do it in Java, and even some (like this one) that say they're for Scala but are written in Java.

    Does anyone know where I can find an example of how to write it in Scala? I've only just started to learn Scala so maybe the linked example can be used in Scala even though it's written in Java or something, but I honestly have no idea what I'm doing at the moment. Everything I google just links me to how to do it in Java.

  • Avihoo Mamka
    Avihoo Mamka almost 8 years
    Shouldn't the consumer talk to ZooKeeper rather than to the brokers?
  • Yuval Itzchakov
    Yuval Itzchakov almost 8 years
    @AvihooMamka Kafka no longer "requires" ZooKeeper to track offsets. It's entirely up to you how you do that. And in general, a consumer talks to the brokers for consumption.
  • ItayB
    ItayB over 6 years
    results is always null. What did I miss?
  • Yuval Itzchakov
    Yuval Itzchakov over 6 years
    @ItayB Are you sure you're consuming from the right topic? Does it have messages? What is your consuming strategy? earliest or latest?
  • ItayB
    ItayB over 6 years
    Yes, I'm consuming from the right topic. I ran the console consumer, which does consume messages. I tried both strategies and both offsets. My suspicion is 'bootstrap.servers', which is not necessary in 0.8.2.2 (according to the docs) but is mandatory in this specific client.
  • ItayB
    ItayB about 6 years
    @YuvalItzchakov Something strange. I tried to debug this. I'm using Kafka 0.8.2.2 and the same libraries you mentioned above. poll doesn't have an implementation; it's only a stub that returns null, and that's why it doesn't work. In 0.9.0.0 it does have an implementation, but my version is 0.8.2.2. I'll have to find a different API to consume messages.
  • Yuval Itzchakov
    Yuval Itzchakov about 6 years
    @ItayB You need this, see the implementation there. I'll update my answer.
  • ItayB
    ItayB about 6 years
    @YuvalItzchakov Thanks! Please look at line 553 in your link. It's an empty stub, which is why I get null all the time. Most of the implementations there use poll. Can you please point me to the relevant lines? Waiting for your update :-)