How to use foreach or foreachBatch in PySpark to write to database?


Solution 1

With Jacek's help, I was able to fix my example:

def process_batch(batch_df, epoch_id):
    # batch_df is a regular (non-streaming) DataFrame holding the current micro-batch,
    # so the batch write.jdbc API is available here.
    batch_df.write.jdbc(url=db_target_url, table="mytopic", mode="append",
                        properties=db_target_properties)

query = df2.writeStream.foreachBatch(process_batch).start()

You also have to include epoch_id in the function's parameters; otherwise you get errors in the Spark log file that are not shown in the Jupyter notebook.
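The epoch_id (micro-batch id) can also be useful inside the function: foreachBatch only gives at-least-once guarantees, so tagging the written rows with the batch id makes replayed batches easy to identify and deduplicate later. A minimal sketch of that idea, reusing the question's df2 and JDBC settings; the batch_id column in the target table and the checkpoint path are assumptions, not part of the original code:

import pyspark.sql.functions as F

def process_batch_with_id(batch_df, epoch_id):
    # Tag every row with the micro-batch id; if Spark replays a batch after a failure,
    # the duplicates can be recognized (and cleaned up) by their batch_id.
    (batch_df.withColumn("batch_id", F.lit(epoch_id))
             .write.jdbc(url=db_target_url, table="mytopic", mode="append",
                         properties=db_target_properties))

query = (df2.writeStream
            .foreachBatch(process_batch_with_id)
            .option("checkpointLocation", "/tmp/checkpoints/mytopic")  # placeholder path
            .start())
query.awaitTermination()  # keep the driver alive while the stream runs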

Solution 2

tl;dr Replace foreach with foreachBatch.


Quoting the official documentation:

The foreach and foreachBatch operations allow you to apply arbitrary operations and writing logic on the output of a streaming query. They have slightly different use cases - while foreach allows custom write logic on every row, foreachBatch allows arbitrary operations and custom logic on the output of each micro-batch.

In other words, your writeStream.foreach(process_row) acts on a single row (of data), which has no write.jdbc attribute, hence the error.

Think of the row as a piece of data that you can save anywhere you want using any API you want.

If you really want Spark's support for the write (and to use write.jdbc), you should use foreachBatch instead.

while foreach allows custom write logic on every row, foreachBatch allows arbitrary operations and custom logic on the output of each micro-batch.
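For contrast, here is a rough sketch of what a row-level foreach handler could look like. A single row has no write.jdbc, so the handler has to open its own database connection; the mysql-connector-python package used below is an assumption (any DB-API client would do), and the connection settings simply mirror the ones from the question:

import mysql.connector  # assumed driver, not part of the question

def process_row(row):
    # Called once per output row on the executors; only plain Python is available here.
    conn = mysql.connector.connect(host="localhost", database="database",
                                   user="writer", password="1234")
    cursor = conn.cursor()
    cursor.execute("INSERT INTO mytopic (`Signal`, `Value`) VALUES (%s, %s)",
                   (row["Signal"], row["Value"]))
    conn.commit()
    conn.close()

query = df2.writeStream.foreach(process_row).start()

Opening a connection per row is of course expensive; foreach also accepts an object with open/process/close methods so a connection can be reused per partition, but for a JDBC target foreachBatch is usually the simpler option.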


Comments

  • tardis, almost 2 years ago

    I want to do Spark Structured Streaming (Spark 2.4.x) from a Kafka source to MariaDB with Python (PySpark).

    I want to use the streaming Spark DataFrame, not a static or a Pandas DataFrame.

    It seems that one has to use foreach or foreachBatch, since there are no built-in database sinks for streaming DataFrames according to https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks.

    Here is my attempt:

    from pyspark.sql import SparkSession
    import pyspark.sql.functions as F
    from pyspark.sql.types import StructField, StructType, StringType, DoubleType, TimestampType
    from pyspark.sql import DataFrameWriter
    # configuration of target db
    db_target_url = "jdbc:mysql://localhost/database"
    db_target_properties = {"user":"writer", "password":"1234"}
    # schema
    schema_simple = StructType([StructField("Signal", StringType()),StructField("Value", DoubleType())])
    
    # create spark session
    spark = SparkSession.builder.appName("streamer").getOrCreate()
    
    # create DataFrame representing the stream
    df = spark.readStream \
      .format("kafka").option("kafka.bootstrap.servers", "localhost:9092") \
      .option("subscribe", "mytopic") \
      .load() \
      .selectExpr("Timestamp", "cast (value as string) as json") \
      .select("Timestamp", F.from_json("json", schema_simple).alias('json_wrapper')) \
      .selectExpr("Timestamp", "json_wrapper.Signal", "json_wrapper.Value")
    df.printSchema()
    # Do some dummy processing
    df2 = df.filter("Value < 11111111111")
    print("df2: ", df2.isStreaming)
    
    def process_row(row):
        # Process row
        row.write.jdbc(url=db_target_url, table="mytopic", mode="append", properties=db_target_properties)
        pass
    query = df2.writeStream.foreach(process_row).start()
    

    I get an error:

    AttributeError: write

    Why?