Spark Dataframe to Kafka

Here is an example of producing to Kafka from a streaming query; the batch version is almost identical.

Streaming from a source to Kafka:

val ds = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .start()
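
Note that when writing a streaming query to Kafka, Spark typically also requires a checkpoint location (either through an option or spark.sql.streaming.checkpointLocation), and the driver must keep running while the query is active. A minimal sketch of the same query with those two additions; the checkpoint path is only a hypothetical example:

val query = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .option("checkpointLocation", "/tmp/kafka-sink-checkpoint") // hypothetical path; any durable location works
  .start()

query.awaitTermination() // block the driver until the streaming query stops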

Writing a static DataFrame (not streamed from a source) to Kafka:

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .save()

Please keep in mind that:

  1. Each row will be one message; if your DataFrame does not already have string key and value columns, see the sketch after this list for one way to build them.
  2. With writeStream, the DataFrame must be a streaming DataFrame. If you have a static DataFrame, use the static (batch) version above.
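
Here is a minimal sketch of preparing the key and value columns, assuming a hypothetical DataFrame df with an id column plus other fields that should be serialized as JSON:

import org.apache.spark.sql.functions.{col, struct, to_json}

// Use the hypothetical "id" column as the key and pack every column into a JSON value
val kafkaReady = df
  .withColumn("key", col("id").cast("string"))
  .withColumn("value", to_json(struct(df.columns.map(col): _*)))
  .select("key", "value")

kafkaReady.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .save()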

Take a look at the basic documentation: https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html

It sounds like you have a static DataFrame that isn't streaming from a source.

Comments

  • Guruprasad Swaminathan almost 2 years

    I am trying to stream a Spark DataFrame to a Kafka consumer. I am unable to do so; can you please advise me?

    I am able to read the data from a Kafka producer into Spark, and I have performed some manipulation. After manipulating the data, I want to stream it back to Kafka (the consumer).

    • Amit almost 6 years
      "Not able to do": what exact problem are you facing? If possible, a minimal workable code example should be included as part of the question.
    • Guruprasad Swaminathan almost 6 years
      val ds = df1.writeStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("topic", "kafka_topic_13").start() Here df1 is a Spark DataFrame, and I want it to be streamed to Kafka. I am getting an error saying that the DataFrame (df1) is not a streaming DataFrame/Dataset.
    • OneCricketeer almost 6 years
      You need to loop over each partition of the DataFrame. For each partition, you then loop over all elements and send them through a Kafka producer. I don't have code, but I'm sure you're not the first to ask this (a minimal sketch of this approach appears after these comments).
    • Guruprasad Swaminathan almost 6 years
      Yeah, I am writing inside the loop; since I don't have enough space here, I could not show the entire code :( I am trying another way around and working on it.
    • wandermonk almost 6 years
      @GuruprasadSwaminathan You can always edit the question and provide all the information required.
  • Guruprasad Swaminathan almost 6 years
    Thank you very much :) It is working now, after upgrading from 2.1.1 to 2.3.0.
  • Guruprasad Swaminathan almost 6 years
    I upgraded the system, and I also updated my build.sbt to point to 2.3.0.
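
For reference, here is a minimal sketch of the partition-by-partition approach mentioned in the comments above. It assumes the kafka-clients library is on the classpath and that df has a string value column; the broker address and topic name are only the examples used in the comments:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Send each row of a static DataFrame through a plain Kafka producer,
// creating one producer per partition rather than one per row
df.selectExpr("CAST(value AS STRING)").rdd.foreachPartition { rows =>
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  val producer = new KafkaProducer[String, String](props)
  rows.foreach { row =>
    producer.send(new ProducerRecord[String, String]("kafka_topic_13", row.getString(0)))
  }
  producer.flush()
  producer.close()
}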