Does Apache Spark SQL support MERGE clause?


Solution 1

Spark does support the MERGE operation when Delta Lake is used as the storage format. The first step is to save the table in the delta format, which provides the transactional capabilities needed for DELETE/UPDATE/MERGE operations in Spark:

Python/Scala: df.write.format("delta").save("/data/events")

SQL: CREATE TABLE events (eventId long, ...) USING delta
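
Note: on Spark 3.x with newer Delta releases, the Spark session also needs the Delta extensions enabled before the delta format and its SQL commands work. A minimal sketch (the app name is illustrative):

import org.apache.spark.sql.SparkSession

// Enable Delta Lake support on the session (Spark 3.x / Delta 0.7+).
val spark = SparkSession.builder()
  .appName("delta-merge-example")
  .config("spark.sql.extensions",
    "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog",
    "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()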

Once the table exists, you can run your usual SQL Merge command:

MERGE INTO events
USING updates
ON events.eventId = updates.eventId
WHEN MATCHED THEN
  UPDATE SET events.data = updates.data
WHEN NOT MATCHED THEN
  INSERT (date, eventId, data) VALUES (updates.date, updates.eventId, updates.data)
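
For the SQL statement to resolve updates, the source rows have to be visible as a table or view. If they live in a DataFrame (called updatesDF here, matching the snippet below), one way is a temporary view:

// Register the source DataFrame so SQL can refer to it as "updates".
updatesDF.createOrReplaceTempView("updates")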

The same merge is also available through the DeltaTable API (Scala shown here; Python is analogous):

import io.delta.tables._

// updatesDF is the DataFrame holding the incoming rows
DeltaTable.forPath(spark, "/data/events/")
  .as("events")
  .merge(
    updatesDF.as("updates"),
    "events.eventId = updates.eventId")
  .whenMatched
  .updateExpr(
    Map("data" -> "updates.data"))
  .whenNotMatched
  .insertExpr(
    Map(
      "date" -> "updates.date",
      "eventId" -> "updates.eventId",
      "data" -> "updates.data"))
  .execute()
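
The builder also accepts optional conditions, and matched rows can be deleted instead of updated. A sketch of both features (the predicate on data is made up for illustration):

// Conditional clauses: delete stale matches, update the rest.
DeltaTable.forPath(spark, "/data/events/")
  .as("events")
  .merge(updatesDF.as("updates"), "events.eventId = updates.eventId")
  .whenMatched("updates.data IS NULL")   // only matches where the predicate holds
  .delete()
  .whenMatched                           // every remaining match
  .updateExpr(Map("data" -> "updates.data"))
  .execute()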

To use the Delta Lake format, you also need the delta package as a dependency in your Spark job (replace x.xx with your Scala version and xxxx with the Delta release matching your Spark version):

<dependency>
  <groupId>io.delta</groupId>
  <artifactId>delta-core_x.xx</artifactId>
  <version>xxxx</version>
</dependency>
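
If you build with sbt instead, the equivalent is roughly the following (the %% operator appends your Scala version automatically; the version placeholder is kept from above):

// sbt: replace x.x.x with the Delta release matching your Spark version.
libraryDependencies += "io.delta" %% "delta-core" % "x.x.x"

The package can also be pulled in at submit time with spark-submit's --packages option.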

See https://docs.delta.io/latest/delta-update.html#upsert-into-a-table-using-merge for more details.

Solution 2

As of Spark 3.0, Spark offers a very clean way of doing the merge operation: the MERGE INTO SQL statement can be run directly against a Delta table. See https://docs.delta.io/latest/delta-update.html#upsert-into-a-table-using-merge for the full syntax and options.
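
Concretely (so this answer is useful even without the link), the statement can be issued through spark.sql. A minimal sketch, assuming the events Delta table and updates view from Solution 1 already exist:

// Spark 3.0+ with Delta Lake; table and view names reuse Solution 1's.
spark.sql("""
  MERGE INTO events
  USING updates
  ON events.eventId = updates.eventId
  WHEN MATCHED THEN
    UPDATE SET events.data = updates.data
  WHEN NOT MATCHED THEN
    INSERT (date, eventId, data) VALUES (updates.date, updates.eventId, updates.data)
""")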

Solution 3

You can also write custom JDBC code. The snippet below does batched inserts; you can edit it to issue a merge/upsert instead of a plain INSERT (an example follows the code). Be aware that this is a computation-heavy approach, but it gives you full control over the SQL being executed.

import java.sql.{Connection, DriverManager, PreparedStatement}

df.rdd.coalesce(2).foreachPartition(partition => {
  // brConnect is assumed to be a broadcast variable holding the JDBC properties.
  val connectionProperties = brConnect.value
  val jdbcUrl = connectionProperties.getProperty("jdbcurl")
  val user = connectionProperties.getProperty("user")
  val password = connectionProperties.getProperty("password")
  val driver = connectionProperties.getProperty("Driver")
  Class.forName(driver)

  val dbc: Connection = DriverManager.getConnection(jdbcUrl, user, password)
  dbc.setAutoCommit(false) // commit once per batch, not once per row
  val dbBatchSize = 1000

  // Prepare the statement once and reuse it for every row in the partition.
  val sqlString = "INSERT INTO employee (id, fname, lname, userid) VALUES (?, ?, ?, ?)"
  val pstmt: PreparedStatement = dbc.prepareStatement(sqlString)

  partition.grouped(dbBatchSize).foreach { batch =>
    batch.foreach { row =>
      pstmt.setLong(1, row.getAs[Long]("id"))
      pstmt.setString(2, row.getAs[String]("fname"))
      pstmt.setString(3, row.getAs[String]("lname"))
      pstmt.setString(4, row.getAs[String]("userid"))
      pstmt.addBatch()
    }
    pstmt.executeBatch() // one round trip per batch
    dbc.commit()
  }
  pstmt.close()
  dbc.close()
})
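
To turn this into a merge as suggested above, swap the INSERT string for your database's upsert syntax. For example, on MySQL it could look like the following (column names are the illustrative ones from the snippet; other databases use MERGE INTO or INSERT ... ON CONFLICT instead):

// MySQL-style upsert: assumes id is the primary key of employee.
val upsertSql =
  """INSERT INTO employee (id, fname, lname, userid)
    |VALUES (?, ?, ?, ?)
    |ON DUPLICATE KEY UPDATE
    |  fname = VALUES(fname),
    |  lname = VALUES(lname),
    |  userid = VALUES(userid)""".stripMargin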
Author: DilTeam (Big Data Consultant. More info at: http://www.dilteam.com)

Updated on June 27, 2022

Comments

  • DilTeam (almost 2 years ago)

    Does Apache Spark SQL support a MERGE clause similar to Oracle's MERGE SQL clause?

    MERGE INTO <table> t
    USING (SELECT * FROM <table1>) s
    ON (<join condition>)
    WHEN MATCHED THEN UPDATE ...
      DELETE WHERE ...
    WHEN NOT MATCHED THEN INSERT ...
    
  • Das_Geek (over 4 years ago)
    A link to a solution is welcome, but please ensure your answer is useful without it: add context around the link so your fellow users will have some idea what it is and why it’s there, then quote the most relevant part of the page you're linking to in case the target page is unavailable. Answers that are little more than a link may be deleted.