How to write Spark Streaming DF to Kafka topic


Solution 1

My first advice would be to try to create a new instance in foreachPartition and measure if that is fast enough for your needs (instantiating heavy objects in foreachPartition is what the official documentation suggests).
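
As a rough illustration of that suggestion, here is a minimal sketch (the broker address, topic name, and input stream are placeholders, not taken from the question) that creates one producer per partition instead of one per record:

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.streaming.dstream.DStream

val input: DStream[String] = ???

input.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // One producer per partition (and per micro-batch), instead of one per record.
    val props = new Properties()
    props.put("bootstrap.servers", "broker1:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)

    partition.foreach { record =>
      producer.send(new ProducerRecord[String, String]("output", record))
    }
    // Flush buffered records and release the producer's resources.
    producer.close()
  }
}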

Another option is to use an object pool as illustrated in this example:

https://github.com/miguno/kafka-storm-starter/blob/develop/src/main/scala/com/miguno/kafkastorm/kafka/PooledKafkaProducerAppFactory.scala
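
For reference, the pooling idea looks roughly like the sketch below. It assumes the org.apache.commons:commons-pool2 dependency; the factory/pool names and the config are illustrative, not taken from the linked project, and in a Spark job the pool itself would still have to live in a lazily initialized per-executor object, since it is not serializable.

import java.util.Properties

import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericObjectPool}
import org.apache.commons.pool2.{BasePooledObjectFactory, PooledObject}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Tells the pool how to create, wrap, and destroy producers.
class ProducerObjectFactory(config: Properties)
  extends BasePooledObjectFactory[KafkaProducer[String, String]] {

  override def create(): KafkaProducer[String, String] =
    new KafkaProducer[String, String](config)

  override def wrap(p: KafkaProducer[String, String]): PooledObject[KafkaProducer[String, String]] =
    new DefaultPooledObject(p)

  override def destroyObject(po: PooledObject[KafkaProducer[String, String]]): Unit =
    po.getObject.close()
}

val config = new Properties()
config.put("bootstrap.servers", "broker1:9092")
config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val pool = new GenericObjectPool(new ProducerObjectFactory(config))

// Typical usage inside foreachPartition: borrow, use, return.
val producer = pool.borrowObject()
try {
  producer.send(new ProducerRecord[String, String]("output", "some-message"))
} finally {
  pool.returnObject(producer)
}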

However, I found it hard to implement when using checkpointing.

Another version that works well for me is the factory described in the following blog post; you just have to check whether it provides enough parallelism for your needs (see the comments section):

http://allegro.tech/2015/08/spark-kafka-integration.html

Solution 2

Yes, unfortunately Spark (1.x, 2.x) doesn't make it straightforward to write to Kafka in an efficient manner.

I'd suggest the following approach:

  • Use (and re-use) one KafkaProducer instance per executor process/JVM.

Here's the high-level setup for this approach:

  1. First, you must "wrap" Kafka's KafkaProducer because, as you mentioned, it is not serializable. Wrapping it allows you to "ship" it to the executors. The key idea is to use a lazy val so that instantiation of the producer is delayed until its first use on an executor, which works around the fact that KafkaProducer is not serializable.
  2. You "ship" the wrapped producer to each executor by using a broadcast variable.
  3. Within your actual processing logic, you access the wrapped producer through the broadcast variable, and use it to write processing results back to Kafka.

The code snippets below work with Spark Streaming as of Spark 2.0.

Step 1: Wrapping KafkaProducer

import java.util.concurrent.Future

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

class MySparkKafkaProducer[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {

  /* This is the key idea that allows us to work around running into
     NotSerializableExceptions. */
  lazy val producer = createProducer()

  def send(topic: String, key: K, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, key, value))

  def send(topic: String, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, value))

}

object MySparkKafkaProducer {

  import scala.collection.JavaConversions._
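  // The JavaConversions import above provides the implicit conversions used below:
  // the Scala Map passed to the KafkaProducer constructor is converted to a
  // java.util.Map, and the Properties-based apply() converts Properties to a Scala Map.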

  def apply[K, V](config: Map[String, Object]): MySparkKafkaProducer[K, V] = {
    val createProducerFunc = () => {
      val producer = new KafkaProducer[K, V](config)

      sys.addShutdownHook {
        // Ensure that, on executor JVM shutdown, the Kafka producer sends
        // any buffered messages to Kafka before shutting down.
        producer.close()
      }

      producer
    }
    new MySparkKafkaProducer(createProducerFunc)
  }

  def apply[K, V](config: java.util.Properties): MySparkKafkaProducer[K, V] = apply(config.toMap)

}

Step 2: Use a broadcast variable to give each executor its own wrapped KafkaProducer instance

import java.util.Properties

import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc: StreamingContext = {
  val sparkConf = new SparkConf().setAppName("spark-streaming-kafka-example").setMaster("local[2]")
  new StreamingContext(sparkConf, Seconds(1))
}

ssc.checkpoint("checkpoint-directory")

val kafkaProducer: Broadcast[MySparkKafkaProducer[Array[Byte], String]] = {
  val kafkaProducerConfig = {
    val p = new Properties()
    p.setProperty("bootstrap.servers", "broker1:9092")
    p.setProperty("key.serializer", classOf[ByteArraySerializer].getName)
    p.setProperty("value.serializer", classOf[StringSerializer].getName)
    p
  }
  ssc.sparkContext.broadcast(MySparkKafkaProducer[Array[Byte], String](kafkaProducerConfig))
}

Step 3: Write from Spark Streaming to Kafka, re-using the same wrapped KafkaProducer instance (for each executor)

import java.util.concurrent.Future

import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.spark.streaming.dstream.DStream

val stream: DStream[String] = ???
stream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val metadata: Stream[Future[RecordMetadata]] = partitionOfRecords.map { record =>
      kafkaProducer.value.send("my-output-topic", record)
    }.toStream
    // Block until all asynchronous sends of this partition have completed.
    metadata.foreach { future => future.get() }
  }
}

Hope this helps.

Solution 3

With Spark >= 2.2

Both read and write operations on Kafka are possible with the Structured Streaming API.

Build stream from Kafka topic

// Subscribe to a topic and start reading from the earliest available offsets.
// (endingOffsets applies only to batch queries, so it is omitted here.)
val ds = spark
  .readStream // use `read` for batch, like DataFrame
  .format("kafka")
  .option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
  .option("subscribe", "source-topic1")
  .option("startingOffsets", "earliest")
  .load()

Read the key and value and apply a schema to both; for simplicity, we cast both of them to String.

val dsStruc = ds.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

Since dsStruc has a schema, it supports all the usual SQL-style operations such as filter, agg, select, etc.
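
For instance (a hypothetical transformation, not part of the original answer), you could drop empty records before writing them back out:

import org.apache.spark.sql.functions.{col, length}

// Keep only records with a non-empty value; the columns are still named
// "key" and "value", which is exactly what the Kafka sink expects.
val filtered = dsStruc.filter(col("value").isNotNull && length(col("value")) > 0)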

Write stream to Kafka topic

dsStruc
  .writeStream // use `write` for batch, like DataFrame
  .format("kafka")
  .option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
  .option("topic", "target-topic1")
  .option("checkpointLocation", "/path/to/checkpoint-dir") // required for the streaming Kafka sink
  .start()
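
One thing to keep in mind (not shown in the original snippet): start() only launches the streaming query, so the driver typically blocks on the returned StreamingQuery handle, roughly like this:

val query = dsStruc
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
  .option("topic", "target-topic1")
  .option("checkpointLocation", "/path/to/checkpoint-dir")
  .start()

// Block until the query is stopped or fails, so the application keeps running.
query.awaitTermination()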

More configuration options for reading from or writing to Kafka are described in the Structured Streaming + Kafka integration guide.

Key artifacts to add to the application

 "org.apache.spark" % "spark-core_2.11" % "2.2.0",
 "org.apache.spark" % "spark-streaming_2.11" % "2.2.0",
 "org.apache.spark" % "spark-sql-kafka-0-10_2.11" % "2.2.0",

Solution 4

There is a Streaming Kafka Writer maintained by Cloudera (actually spun off from a Spark JIRA [1]). It basically creates a producer per partition, which amortizes the time spent to create 'heavy' objects over a (hopefully large) collection of elements.

The Writer can be found here: https://github.com/cloudera/spark-kafka-writer

Solution 5

I was having the same issue and found this post.

The author solves the problem by creating one producer per executor. Instead of sending the producer itself, he broadcasts only a "recipe" for how to create a producer on an executor:

    val kafkaSink = sparkContext.broadcast(KafkaSink(conf))

He uses a wrapper that lazily creates the producer:

    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {

      // Created lazily, i.e. on first use inside an executor.
      lazy val producer = createProducer()

      def send(topic: String, value: String): Unit =
        producer.send(new ProducerRecord[String, String](topic, value))
    }


    object KafkaSink {

      import scala.collection.JavaConversions._

      def apply(config: Map[String, Object]): KafkaSink = {
        val f = () => {
          val producer = new KafkaProducer[String, String](config)

          sys.addShutdownHook {
            producer.close()
          }

          producer
        }
        new KafkaSink(f)
      }
    }

The wrapper is serializable because the Kafka producer is initialized just before first use on an executor. The driver keeps a reference to the wrapper, and the wrapper sends the messages using each executor's producer:

    dstream.foreachRDD { rdd =>
      rdd.foreach { message =>
        kafkaSink.value.send("topicName", message)
      }
    }
Author: Chobeat

Passionate Scala coder, currently working with Python. Focused on Machine Learning, Big Data, and Fast Data applications. Italian, but living in Germany right now.

Updated on September 11, 2020

Comments

  • Chobeat
    Chobeat over 3 years

    I am using Spark Streaming to process data between two Kafka queues, but I cannot seem to find a good way to write to Kafka from Spark. I have tried this:

    import java.util.HashMap

    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

    input.foreachRDD(rdd =>
      rdd.foreachPartition(partition =>
        partition.foreach {
          case x: String => {
            val props = new HashMap[String, Object]()
    
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.StringSerializer")
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.StringSerializer")
    
            println(x)
            val producer = new KafkaProducer[String, String](props)
            val message = new ProducerRecord[String, String]("output", null, x)
            producer.send(message)
          }
        }
      )
    )
    

    and it works as intended, but instantiating a new KafkaProducer for every message is clearly infeasible in a real context, so I'm trying to work around it.

    I would like to keep a reference to a single instance for every process and access it when I need to send a message. How can I write to Kafka from Spark Streaming?

  • Mekal
    Mekal over 7 years
    404 Not found for that project, removed? github.com/cloudera/spark-kafka-writer
  • miguno
    miguno over 7 years
    What was the issue you ran into with regards to checkpointing?
  • miguno
    miguno over 7 years
    Nowadays there's github.com/BenFradet/spark-kafka-writer (same name, but not sure whether it's the same code)
  • CᴴᴀZ
    CᴴᴀZ about 7 years
    foreachPartition is fine if we are working with a fixed number of RDDs, but in Spark Streaming (where we have micro-batches) RDDs are created continually, and so are their partitions. How do we circumvent this in Spark Streaming?
  • avocado
    avocado over 6 years
    If I may ask, how to implement this idea in Python, especially the lazy part?
  • Reaper
    Reaper about 6 years
    Please include the content of the link(s) so that when they break your answer still has value.
  • Ra41P
    Ra41P almost 6 years
    What prevents me from having a singleton class in my JARs that holds the Kafka producer? That way, I don't need a broadcast variable. Just having a singleton KafkaSink will ensure one KafkaSink per executor, as a singleton is initialized once per JVM (aka executor).
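    (As a rough sketch of that singleton idea, with illustrative names and a hard-coded config, not taken from any answer above:)

    object ExecutorKafkaProducer {
      import java.util.Properties
      import org.apache.kafka.clients.producer.KafkaProducer

      // Initialized at most once per JVM, i.e. once per executor, on first access.
      lazy val producer: KafkaProducer[String, String] = {
        val props = new Properties()
        props.put("bootstrap.servers", "broker1:9092")
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        val p = new KafkaProducer[String, String](props)
        // Flush buffered records when the executor JVM shuts down.
        sys.addShutdownHook(p.close())
        p
      }
    }

    The trade-off is that the configuration must be available on the executors themselves (hard-coded or read from a file), whereas the broadcast approach lets the driver supply it.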
  • Ram Ghadiyaram
    Ram Ghadiyaram almost 5 years
    A new KafkaProducer can be broadcast and re-used for each partition... since it buffers asynchronously, that's an optimized way to do it. See stackoverflow.com/a/39539234/647053, step 2.
  • Ram Ghadiyaram
    Ram Ghadiyaram almost 5 years
    Just thought of asking: how many producer instances are used when you do it the Structured Streaming way? Will the producer instance be broadcast, like step 2 of this?
  • alex
    alex almost 5 years
    in Step 3: what is the use of the metadata: Stream[Future[RecordMetadata]]. I don't think I see it being used anywhere. And what could it be used for?
  • miguno
    miguno almost 5 years
    The metadata is literally used in the next statement, which ensures that the next partition is only processed (via foreachPartition) once all futures for the current partition have completed (cf. metadata.get()).
  • alex
    alex almost 5 years
    Thanks for the response! I'm not sure, though, what the issue is if we don't wait for all futures of a partition to complete. Why even use a future if we need to wait for each partition to finish before going to the next?
  • sainath reddy
    sainath reddy about 4 years
    @BdEngineer I think you just need to understand DataFrames better. There's an answer already that mentions this. If you try to understand what a DataFrame is and read the foreachPartition etc. API docs, you will make the connection.
  • Srinath Thota
    Srinath Thota over 3 years
    Hi, is there any reason to broadcast the lazy object? Can't I create a MySparkKafkaProducer object per worker node when the Spark cluster is initialized?