Spark Dataframe to Kafka
Here is an example of producing to Kafka from a streaming query; the batch version is almost identical.
Streaming from a source to Kafka:
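val ds = df
  // the Kafka sink expects string/binary "key" and "value" columns
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  // streaming queries need a checkpoint directory (placeholder path)
  .option("checkpointLocation", "/path/to/checkpoint/dir")
  .start()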
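Since the question is about consuming from Kafka, manipulating the data and producing it back, the round trip might look roughly like the sketch below. It assumes Spark 2.2+ with the spark-sql-kafka-0-10 package on the classpath; the broker address, the topic names input_topic/output_topic, the checkpoint path and the upper-casing step are all placeholders for your own setup. The important part is that the source is read with readStream, so the transformed result is still a streaming DataFrame and writeStream accepts it.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, upper}

object KafkaRoundTrip {
  // Stand-in for the real manipulation: Kafka delivers key/value as binary,
  // so cast both to strings and upper-case the value. The output keeps the
  // "key"/"value" columns that the Kafka sink expects.
  def transform(input: DataFrame): DataFrame =
    input.select(
      col("key").cast("string").as("key"),
      upper(col("value").cast("string")).as("value"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("KafkaRoundTrip").getOrCreate()

    // readStream (not read) produces a *streaming* DataFrame
    val source = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // assumed broker
      .option("subscribe", "input_topic")                  // hypothetical topic
      .load()

    val query = transform(source).writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "output_topic")                     // hypothetical topic
      .option("checkpointLocation", "/tmp/kafka-roundtrip-checkpoint") // placeholder
      .start()

    query.awaitTermination()
  }
}
```

Keeping the transformation in a plain DataFrame => DataFrame function means it can be exercised on a static DataFrame in a unit test, without a broker.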
Writing a static DataFrame (not streamed from a source) to Kafka (batch writes to Kafka require Spark 2.2 or later):
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .save()
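The Kafka sink expects a value column and optionally a key column, so a static DataFrame with other columns has to be reshaped first. One way to do that, sketched here under the assumption that your rows can be sent as JSON and that a hypothetical id column should become the message key, is to pack every column into the value with to_json(struct(...)). The object and method names and the broker address are placeholders.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, struct, to_json}

object StaticToKafka {
  // Serializes every column of a row into one JSON string for "value";
  // keyCol (an assumption about your schema) becomes the message key.
  def toKafkaRecords(df: DataFrame, keyCol: String): DataFrame =
    df.select(
      col(keyCol).cast("string").as("key"),
      to_json(struct(df.columns.map(col): _*)).as("value"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("StaticToKafka").getOrCreate()
    import spark.implicits._

    val df = Seq((1, "alice"), (2, "bob")).toDF("id", "name")

    // Batch write: each row becomes one Kafka message
    toKafkaRecords(df, "id").write
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // assumed broker
      .option("topic", "topic1")
      .save()

    spark.stop()
  }
}
```

For the row (1, "alice") the resulting message has key "1" and value {"id":1,"name":"alice"}.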
Please keep in mind that:
- each row becomes one Kafka message;
- writeStream only works on a streaming DataFrame (for example, one created with spark.readStream). If you have a static DataFrame, use the batch version with write instead.
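If the same code has to handle both kinds of DataFrame, Dataset.isStreaming tells you which writer applies. A minimal sketch, with a hypothetical helper name and placeholder arguments, assuming the input already has key and value columns:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.StreamingQuery

object KafkaWriter {
  // Options shared by the batch and the streaming Kafka sink
  def kafkaOptions(servers: String, topic: String): Map[String, String] =
    Map("kafka.bootstrap.servers" -> servers, "topic" -> topic)

  // Streaming DataFrame: starts a query via writeStream and returns it.
  // Static DataFrame: writes once via write (blocks until saved), returns None.
  def writeToKafka(df: DataFrame, servers: String, topic: String,
                   checkpointDir: String): Option[StreamingQuery] = {
    val out = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    if (out.isStreaming) {
      Some(out.writeStream
        .format("kafka")
        .options(kafkaOptions(servers, topic))
        .option("checkpointLocation", checkpointDir)
        .start())
    } else {
      out.write
        .format("kafka")
        .options(kafkaOptions(servers, topic))
        .save()
      None
    }
  }
}
```

Calling writeStream on a static DataFrame is exactly what produces the "not a streaming DataFrame/Dataset" error discussed in the comments; branching on isStreaming avoids it.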
Take a look at the basic documentation: https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html
It sounds like you have a static DataFrame that isn't streaming from a source, so the batch version is the one to use.
Author: Guruprasad Swaminathan
Updated on June 04, 2022
Comments
-
Guruprasad Swaminathan almost 2 years
I am trying to stream a Spark DataFrame to Kafka, but I am unable to do so. Can you please advise me?
I am able to read the data from Kafka into Spark and perform some manipulation. After manipulating the data, I want to stream it back to Kafka so that a consumer can read it.
-
Amit almost 6 years
"Not able to do": what exactly is the problem you are facing? If possible, a minimal working code example should be included as part of the question.
-
Guruprasad Swaminathan almost 6 years
val ds = df1.writeStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("topic", "kafka_topic_13").start()
df1 is a Spark DataFrame, and I want it to be streamed to Kafka. I get an error saying that the DataFrame (df1) is not a streaming DataFrame/Dataset.
-
OneCricketeer almost 6 years
You need to loop over each partition of the DataFrame. For each partition, you then loop over all elements and send them through a Kafka producer. I don't have code, but I'm sure you're not the first to ask this.
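For Spark versions without the built-in Kafka sink (before 2.2, such as the 2.1.1 mentioned later in this thread), the per-partition approach could be sketched as follows. This assumes the kafka-clients library is on the classpath and that the DataFrame already has string key and value columns; ManualKafkaWrite, send and producerProps are hypothetical names. The producer is created inside foreachPartition, on the executors, because KafkaProducer is not serializable and cannot be shipped from the driver.

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.sql.{DataFrame, Row}

object ManualKafkaWrite {
  // Producer configuration; serializers are given by class name
  def producerProps(servers: String): Properties = {
    val props = new Properties()
    props.put("bootstrap.servers", servers)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props
  }

  // Assumes df has string columns "key" and "value"
  def send(df: DataFrame, servers: String, topic: String): Unit =
    df.select("key", "value").rdd.foreachPartition { rows: Iterator[Row] =>
      // one producer per partition, created on the executor
      val producer = new KafkaProducer[String, String](producerProps(servers))
      try {
        rows.foreach { r =>
          producer.send(new ProducerRecord[String, String](topic, r.getString(0), r.getString(1)))
        }
      } finally {
        producer.close() // waits for outstanding sends to complete
      }
    }
}
```

Going through .rdd.foreachPartition rather than Dataset.foreachPartition sidesteps the overload ambiguity that the Dataset version has under Scala 2.12.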
-
Guruprasad Swaminathan almost 6 years
Yes, I am writing inside the loop, but there isn't enough space here to show the entire code :( I am trying another way around and working on it.
-
wandermonk almost 6 years
@GuruprasadSwaminathan You can always edit the question and provide all the information required.
-
Guruprasad Swaminathan almost 6 years
Thank you very much :) It is working now, after upgrading from 2.1.1 to 2.3.0.
-
Guruprasad Swaminathan almost 6 years
I upgraded the system, and I also updated my build.sbt to point to 2.3.0.
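For reference, the dependency change might look like this in build.sbt. This is a sketch, assuming Scala 2.11 (the Scala line Spark 2.3.0 is published for); the Kafka source and sink ship in the separate spark-sql-kafka-0-10 artifact, so it has to be added alongside spark-sql.

```scala
// build.sbt (sketch)
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"            % "2.3.0",
  // Kafka source/sink for Structured Streaming and batch queries
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.3.0"
)
```

Keeping both artifacts on the same Spark version avoids runtime class mismatches between Spark and its Kafka connector.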