Spark MergeSchema on parquet columns
Q: 1. Does this support only the Parquet file format, or other file formats such as CSV and text files as well? 2. If the column order is disturbed, will mergeSchema align the columns to the order they had when the table was created, or do we need to do this manually by selecting all the columns?
AFAIK, schema merging via mergeSchema is not supported for CSV or plain-text sources. It is supported for Parquet (and, since Spark 3.0, ORC has a similar mergeSchema option).
mergeSchema (spark.sql.parquet.mergeSchema) will align the columns correctly even if their order is disturbed, because columns are matched by name rather than by position.
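For reference, schema merging can be switched on either per read (the mergeSchema data source option) or for the whole session (the spark.sql.parquet.mergeSchema SQL option). The snippet below is only a minimal sketch that can be pasted into spark-shell; the app name, the scratch directory, and the column names id, name, score and part are made up for illustration.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("MergeSchemaOptions") // hypothetical app name
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Hypothetical scratch location, so the sketch does not touch real data
val base = Files.createTempDirectory("merge_schema_demo").toString + "/t"

// Two partitions whose Parquet files have different columns
Seq((1, "a")).toDF("id", "name").write.parquet(s"$base/part=1")
Seq((2, 3.5)).toDF("id", "score").write.parquet(s"$base/part=2")

// Option 1: enable schema merging for this read only
val perRead = spark.read.option("mergeSchema", "true").parquet(base)

// Option 2: enable it for the session, so a plain Parquet read merges too
spark.conf.set("spark.sql.parquet.mergeSchema", "true")
val sessionWide = spark.read.parquet(base)

perRead.printSchema()
sessionWide.printSchema()
```

Both reads should report the union of the file columns plus the partition column. The per-read option is usually the safer choice, since schema merging is a relatively expensive operation and is off by default.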
Example from the Spark documentation on Parquet schema merging:
import spark.implicits._
// Create a simple DataFrame, store into a partition directory
val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF.write.parquet("data/test_table/key=1")
// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
cubesDF.write.parquet("data/test_table/key=2")
// Read the partitioned table
val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()
// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths
// root
// |-- value: int (nullable = true)
// |-- square: int (nullable = true)
// |-- cube: int (nullable = true)
// |-- key: int (nullable = true)
UPDATE: a real example, based on the one you gave in the comments.
Q: Will the new columns empage, empdept be added after the empid, empname, salary columns?
Answer: Yes, empage and empdept were added after empid, empname, salary, followed by your day column.
See the full example.
package examples
import org.apache.log4j.Level
import org.apache.spark.sql.SaveMode
object CSVDataSourceParquetSchemaMerge extends App {
val logger = org.apache.log4j.Logger.getLogger("org")
logger.setLevel(Level.WARN)
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().appName("CSVParquetSchemaMerge")
.master("local")
.getOrCreate()
import spark.implicits._
// linesIterator avoids the clash with java.lang.String.lines on JDK 11+
val csvDataday1 = spark.sparkContext.parallelize(
"""
|empid,empname,salary
|001,ABC,10000
""".stripMargin.linesIterator.toList).toDS()
val csvDataday2 = spark.sparkContext.parallelize(
"""
|empid,empage,empdept,empname,salary
|001,30,XYZ,ABC,10000
""".stripMargin.linesIterator.toList).toDS()
val frame = spark.read.option("header", true).option("inferSchema", true).csv(csvDataday1)
println("first day data ")
frame.show
frame.write.mode(SaveMode.Overwrite).parquet("data/test_table/day=1")
frame.printSchema
val frame1 = spark.read.option("header", true).option("inferSchema", true).csv(csvDataday2)
frame1.write.mode(SaveMode.Overwrite).parquet("data/test_table/day=2")
println("Second day data ")
frame1.show(false)
frame1.printSchema
// Read the partitioned table
val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
println("Merged Schema")
mergedDF.printSchema
println("Merged Datarame where EMPAGE,EMPDEPT WERE ADDED AFER EMPID,EMPNAME,SALARY followed by your day column")
mergedDF.show(false)
}
Result :
first day data
+-----+-------+------+
|empid|empname|salary|
+-----+-------+------+
| 1| ABC| 10000|
+-----+-------+------+
root
|-- empid: integer (nullable = true)
|-- empname: string (nullable = true)
|-- salary: integer (nullable = true)
Second day data
+-----+------+-------+-------+------+
|empid|empage|empdept|empname|salary|
+-----+------+-------+-------+------+
|1 |30 |XYZ |ABC |10000 |
+-----+------+-------+-------+------+
root
|-- empid: integer (nullable = true)
|-- empage: integer (nullable = true)
|-- empdept: string (nullable = true)
|-- empname: string (nullable = true)
|-- salary: integer (nullable = true)
Merged Schema
root
|-- empid: integer (nullable = true)
|-- empname: string (nullable = true)
|-- salary: integer (nullable = true)
|-- empage: integer (nullable = true)
|-- empdept: string (nullable = true)
|-- day: integer (nullable = true)
Merged Datarame where EMPAGE,EMPDEPT WERE ADDED AFER EMPID,EMPNAME,SALARY followed by your day column
+-----+-------+------+------+-------+---+
|empid|empname|salary|empage|empdept|day|
+-----+-------+------+------+-------+---+
|1 |ABC |10000 |30 |XYZ |2 |
|1 |ABC |10000 |null |null |1 |
+-----+-------+------+------+-------+---+
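If a particular column order matters downstream (for example, the order in which the second day's file arrived), it is probably safer to select the columns explicitly than to rely on the order the merge happens to produce. A minimal sketch for spark-shell, assuming the same two-day layout as above recreated in a scratch directory; the app name, the temp path and the desiredOrder name are illustrative only.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder()
  .appName("ExplicitColumnOrder") // hypothetical app name
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Hypothetical scratch location standing in for data/test_table
val base = Files.createTempDirectory("column_order_demo").toString + "/test_table"

// Day 1 has three columns; day 2 adds empage and empdept in the middle
Seq((1, "ABC", 10000)).toDF("empid", "empname", "salary")
  .write.parquet(s"$base/day=1")
Seq((1, 30, "XYZ", "ABC", 10000)).toDF("empid", "empage", "empdept", "empname", "salary")
  .write.parquet(s"$base/day=2")

val mergedDF = spark.read.option("mergeSchema", "true").parquet(base)

// Impose the wanted order explicitly instead of relying on the merged order
val desiredOrder = Seq("empid", "empage", "empdept", "empname", "salary", "day")
val reordered = mergedDF.select(desiredOrder.map(col): _*)

reordered.printSchema()
```

Because the select is by name, it keeps working if later files add more columns; those would simply need to be added to desiredOrder.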
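Directory tree: data/test_table contains one partition directory per day (day=1 and day=2), as written by the code above.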
Arun S
Updated on June 04, 2022
Comments
-
Arun S almost 2 years
For schema evolution, mergeSchema can be used in Spark with the Parquet file format, and I have the following questions about it.
Does it support only the Parquet file format, or other file formats such as CSV and text files as well?
If new columns are added in between existing ones, I understand mergeSchema will move them to the end.
And if the column order is disturbed, will mergeSchema align the columns to the order they had when the table was created, or do we need to do this manually by selecting all the columns?
Update from comment: for example, if I have a schema as below and create a table as below -
spark.sql("CREATE TABLE emp USING DELTA LOCATION '****'")
empid,empname,salary ====> 001,ABC,10000
and the next day I get the format below:
empid,empage,empdept,empname,salary ====> 001,30,XYZ,ABC,10000
Will the new columns empage, empdept be added after the empid, empname, salary columns?
-
Arun S about 4 years
For example, if I have a schema as below and create a table as below - spark.sql("CREATE TABLE emp USING DELTA LOCATION '****'") empid,empname,salary ====> 001,ABC,10000 and the next day I get the format below: empid,empage,empdept,empname,salary ====> 001,30,XYZ,ABC,10000. Will the new columns empage, empdept be added after the empid, empname, salary columns?
-
Ram Ghadiyaram about 4 years
Why don't you try it with your example in the aforementioned way?
-
Arun S about 4 years
Yes, I tried it with an example and it was moving the columns to the end. I just wanted to check whether you knew this already.