How to use foreach or foreachBatch in PySpark to write to database?
Solution 1
With Jacek's support, I was able to fix my example:
def process_row(df, epoch_id):
    # df is the micro-batch DataFrame handed in by foreachBatch,
    # so the regular batch writer (write.jdbc) is available on it
    df.write.jdbc(url=db_target_url, table="mytopic", mode="append", properties=db_target_properties)
    pass

query = df2.writeStream.foreachBatch(process_row).start()
You also have to include epoch_id in the function's parameters; otherwise you get errors in the Spark log file that are not shown in the Jupyter notebook.
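For reference, here is the same pattern written out end to end, with query.awaitTermination() added so the stream keeps running. This is only a minimal sketch that reuses the db_target_url and db_target_properties from the question; the function name write_batch is just a placeholder:

def write_batch(batch_df, epoch_id):
    # batch_df is a plain (non-streaming) DataFrame holding one micro-batch,
    # so the regular batch writer API works on it
    batch_df.write.jdbc(url=db_target_url, table="mytopic",
                        mode="append", properties=db_target_properties)

query = df2.writeStream.foreachBatch(write_batch).start()
query.awaitTermination()  # block the driver so the streaming query keeps running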
Solution 2
tl;dr Replace foreach with foreachBatch.
Quoting the official documentation:
The foreach and foreachBatch operations allow you to apply arbitrary operations and writing logic on the output of a streaming query. They have slightly different use cases - while foreach allows custom write logic on every row, foreachBatch allows arbitrary operations and custom logic on the output of each micro-batch.
In other words, your writeStream.foreach(process_row) acts on a single row (of data) that has no write.jdbc available, and hence the error.

Think of the row as a piece of data that you can save anywhere you want using any API you want.

If you really need support from Spark (and do use write.jdbc) you should actually use foreachBatch.
while foreach allows custom write logic on every row, foreachBatch allows arbitrary operations and custom logic on the output of each micro-batch.
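If you really do want per-row logic with foreach, keep in mind that the function receives a pyspark.sql.Row, so you have to persist it yourself with a plain database client. A rough sketch follows; the mysql-connector-python client is an assumption here (any client library would do), and the connection details simply mirror the JDBC settings from the question:

import mysql.connector  # assumed client library; not part of the question

def process_row(row):
    # row is a pyspark.sql.Row, not a DataFrame, so there is no write.jdbc here
    conn = mysql.connector.connect(user="writer", password="1234",
                                   host="localhost", database="database")
    cursor = conn.cursor()
    cursor.execute("INSERT INTO mytopic (`Signal`, `Value`) VALUES (%s, %s)",
                   (row["Signal"], row["Value"]))
    conn.commit()
    cursor.close()
    conn.close()

query = df2.writeStream.foreach(process_row).start()

Opening a connection per row like this is expensive, which is another reason foreachBatch is usually the better fit when the target is a JDBC database.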
Comments
- tardis almost 2 years
I want to do Spark Structured Streaming (Spark 2.4.x) from a Kafka source to a MariaDB with Python (PySpark).
I want to use the streamed Spark dataframe and not the static nor Pandas dataframe.
It seems that one has to use foreach or foreachBatch, since there are no possible database sinks for streamed dataframes according to https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks.

Here is my try:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StructField, StructType, StringType, DoubleType, TimestampType
from pyspark.sql import DataFrameWriter

# configuration of target db
db_target_url = "jdbc:mysql://localhost/database"
db_target_properties = {"user": "writer", "password": "1234"}

# schema
schema_simple = StructType([StructField("Signal", StringType()), StructField("Value", DoubleType())])

# create spark session
spark = SparkSession.builder.appName("streamer").getOrCreate()

# create DataFrame representing the stream
df = spark.readStream \
    .format("kafka").option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "mytopic") \
    .load() \
    .selectExpr("Timestamp", "cast (value as string) as json") \
    .select("Timestamp", F.from_json("json", schema_simple).alias('json_wrapper')) \
    .selectExpr("Timestamp", "json_wrapper.Signal", "json_wrapper.Value")

df.printSchema()

# Do some dummy processing
df2 = df.filter("Value < 11111111111")
print("df2: ", df2.isStreaming)

def process_row(row):
    # Process row
    row.write.jdbc(url=db_target_url, table="mytopic", mode="append", properties=db_target_properties)
    pass

query = df2.writeStream.foreach(process_row).start()
I get an error:
AttributeError: write
Why?