Spark MergeSchema on parquet columns


Q : 1. Does this support only the Parquet file format, or other file formats like CSV and txt files as well? 2. If the column order is disturbed, will mergeSchema align the columns back to the order in which the table was created, or do we need to do this manually by selecting all the columns?


AFAIK, schema merging is supported only by Parquet, not by other formats like CSV or txt.
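For CSV or text sources you have to handle schema drift yourself. One possible workaround (a sketch, assuming Spark 3.1+ and hypothetical paths) is to read each day's files separately and combine them with `unionByName`, letting `allowMissingColumns` fill absent columns with null:

```scala
// Sketch, not a drop-in solution: mergeSchema is a Parquet-only option,
// so for CSV we read each day's directory on its own and let unionByName
// line the columns up by name. allowMissingColumns (Spark 3.1+) fills
// columns missing from one side with null.
val day1 = spark.read.option("header", true).option("inferSchema", true)
  .csv("data/csv_table/day=1")   // hypothetical path
val day2 = spark.read.option("header", true).option("inferSchema", true)
  .csv("data/csv_table/day=2")   // hypothetical path

val combined = day1.unionByName(day2, allowMissingColumns = true)
combined.printSchema()
```

This needs an active `SparkSession` named `spark`; for older Spark versions you would instead select a common column list from each side before a plain `union`.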

mergeSchema (spark.sql.parquet.mergeSchema) aligns columns by name even if their order is disturbed across files; columns that exist only in some files are appended after the columns of the first schema, with missing values filled as null.
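The ordering rule itself can be sketched in plain Scala (a toy illustration of the name-based merge, not Spark's actual implementation): columns keep the order of the first file they appear in, and columns seen only in later files are appended at the end.

```scala
// Toy model of the mergeSchema ordering rule (illustration only, not
// Spark's code): fold over per-file column lists, keeping first-seen
// order and appending columns that show up only in later files.
def mergeColumns(schemas: Seq[Seq[String]]): Seq[String] =
  schemas.foldLeft(Seq.empty[String]) { (acc, cols) =>
    acc ++ cols.filterNot(acc.contains)
  }

val merged = mergeColumns(Seq(
  Seq("empid", "empname", "salary"),                      // day 1
  Seq("empid", "empage", "empdept", "empname", "salary")  // day 2
))
// merged == Seq("empid", "empname", "salary", "empage", "empdept")
```

This matches the merged schemas shown in the examples below (the partition column, e.g. `key` or `day`, is then appended last by Spark).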

Example from the Spark documentation on Parquet schema merging:

import spark.implicits._

// Create a simple DataFrame, store into a partition directory
val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF.write.parquet("data/test_table/key=1")

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
cubesDF.write.parquet("data/test_table/key=2")

// Read the partitioned table
val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()

// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column that appears in the partition directory paths
// root
//  |-- value: int (nullable = true)
//  |-- square: int (nullable = true)
//  |-- cube: int (nullable = true)
//  |-- key: int (nullable = true)

UPDATE : a real example based on the data you gave in the comment box...


Q : Will the new columns empage and empdept be added after the empid, empname, salary columns?


Answer : Yes, empage and empdept are added after empid, empname, salary, followed by your day partition column.

See the full example:

package examples

import org.apache.log4j.Level
import org.apache.spark.sql.{SaveMode, SparkSession}

object CSVDataSourceParquetSchemaMerge extends App {
  val logger = org.apache.log4j.Logger.getLogger("org")
  logger.setLevel(Level.WARN)

  val spark = SparkSession.builder().appName("CSVParquetSchemaMerge")
    .master("local")
    .getOrCreate()

  import spark.implicits._

  // Day-1 data: the original three columns
  val csvDataday1 = spark.sparkContext.parallelize(
    """
      |empid,empname,salary
      |001,ABC,10000
    """.stripMargin.lines.toList).toDS()

  // Day-2 data: empage and empdept added in the middle of the header
  val csvDataday2 = spark.sparkContext.parallelize(
    """
      |empid,empage,empdept,empname,salary
      |001,30,XYZ,ABC,10000
    """.stripMargin.lines.toList).toDS()

  val frame = spark.read.option("header", true).option("inferSchema", true).csv(csvDataday1)
  println("first day data ")
  frame.show
  frame.write.mode(SaveMode.Overwrite).parquet("data/test_table/day=1")
  frame.printSchema

  val frame1 = spark.read.option("header", true).option("inferSchema", true).csv(csvDataday2)
  frame1.write.mode(SaveMode.Overwrite).parquet("data/test_table/day=2")
  println("Second day data ")
  frame1.show(false)
  frame1.printSchema

  // Read the partitioned table with schema merging enabled
  val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
  println("Merged Schema")
  mergedDF.printSchema
  println("Merged DataFrame where empage and empdept are added after empid, empname, salary, followed by the day column")
  mergedDF.show(false)
}


Result :

first day data 
+-----+-------+------+
|empid|empname|salary|
+-----+-------+------+
|    1|    ABC| 10000|
+-----+-------+------+

root
 |-- empid: integer (nullable = true)
 |-- empname: string (nullable = true)
 |-- salary: integer (nullable = true)

Second day data 
+-----+------+-------+-------+------+
|empid|empage|empdept|empname|salary|
+-----+------+-------+-------+------+
|1    |30    |XYZ    |ABC    |10000 |
+-----+------+-------+-------+------+

root
 |-- empid: integer (nullable = true)
 |-- empage: integer (nullable = true)
 |-- empdept: string (nullable = true)
 |-- empname: string (nullable = true)
 |-- salary: integer (nullable = true)

Merged Schema
root
 |-- empid: integer (nullable = true)
 |-- empname: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- empage: integer (nullable = true)
 |-- empdept: string (nullable = true)
 |-- day: integer (nullable = true)

Merged DataFrame where empage and empdept are added after empid, empname, salary, followed by the day column
+-----+-------+------+------+-------+---+
|empid|empname|salary|empage|empdept|day|
+-----+-------+------+------+-------+---+
|1    |ABC    |10000 |30    |XYZ    |2  |
|1    |ABC    |10000 |null  |null   |1  |
+-----+-------+------+------+-------+---+
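If you want a specific column order back after the merge, you select it explicitly. A small helper (hypothetical, not a Spark API) that builds such a select list from a preferred order:

```scala
// Hypothetical helper: put the preferred (original creation-order)
// columns first, then append whatever else the merged schema contains,
// e.g. columns added later and the partition column.
def restoreOrder(mergedCols: Seq[String], preferred: Seq[String]): Seq[String] =
  preferred.filter(mergedCols.contains) ++ mergedCols.filterNot(preferred.contains)

val ordered = restoreOrder(
  Seq("empid", "empname", "salary", "empage", "empdept", "day"),  // merged order
  Seq("empid", "empage", "empdept", "empname", "salary")          // day-2 order
)
// ordered == Seq("empid", "empage", "empdept", "empname", "salary", "day")
```

In Spark you would then apply it with `mergedDF.select(ordered.head, ordered.tail: _*)`.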

Directory tree : data/test_table/ with partition subdirectories day=1/ and day=2/ (screenshot omitted).

Author: Arun S

Updated on June 04, 2022

Comments

  • Arun S (almost 2 years ago)

    For schema evolution, mergeSchema can be used in Spark for Parquet file formats, and I have the clarifications below on this:

    Does this support only the Parquet file format, or other file formats like CSV and txt files as well?

    If new columns are added in between, I understand mergeSchema will move those columns to the end.

    And if the column order is disturbed, will mergeSchema align the columns to the correct order in which the table was created, or do we need to do this manually by selecting all the columns?

    Update from comment : for example, if I have a schema as below and create the table as below - spark.sql("CREATE TABLE emp USING DELTA LOCATION '****'") empid,empname,salary ====> 001,ABC,10000 - and the next day I get the format below - empid,empage,empdept,empname,salary ====> 001,30,XYZ,ABC,10000.

    Will the new columns empage, empdept be added after the empid, empname, salary columns?

  • Arun S (about 4 years ago)
    For example, if I have a schema as below and create the table as below - spark.sql("CREATE TABLE emp USING DELTA LOCATION '****'") empid,empname,salary ====> 001,ABC,10000 - and the next day I get the format below - empid,empage,empdept,empname,salary ====> 001,30,XYZ,ABC,10000 - will the new columns empage, empdept be added after the empid, empname, salary columns?
  • Ram Ghadiyaram (about 4 years ago)
    Why don't you try it with your example in the aforementioned way?
  • Arun S (about 4 years ago)
    Yes, tried it with an example and it moved the columns to the end... just wanted to check if you knew it already.