Save Spark DataFrame as a dynamically partitioned table in Hive
Solution 1
I believe it works something like this: df is a DataFrame with year, month and other columns.

df.write.partitionBy('year', 'month').saveAsTable(...)

or

df.write.partitionBy('year', 'month').insertInto(...)
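For instance, here is a minimal self-contained PySpark sketch (the database/table name mydb.events is a placeholder); each distinct (year, month) pair ends up in its own partition directory such as .../year=2015/month=6/:

from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# Toy data: 'year' and 'month' become the partition columns.
df = spark.createDataFrame(
    [(2015, 6, "a"), (2015, 7, "b")],
    ["year", "month", "value"],
)
df.write.partitionBy("year", "month").saveAsTable("mydb.events")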
Solution 2
I was able to write to a partitioned Hive table using:

df.write().mode(SaveMode.Append).partitionBy("colname").saveAsTable("Table")

I had to enable the following properties to make it work:

hiveContext.setConf("hive.exec.dynamic.partition", "true")
hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
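The same idea as a hedged PySpark sketch (table and column names are placeholders; setting the two properties through spark.conf.set should be equivalent on a Hive-enabled session):

# Enable dynamic partitioning, then append to the partitioned table.
spark.conf.set("hive.exec.dynamic.partition", "true")
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
df.write.mode("append").partitionBy("colname").saveAsTable("your_table")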
Solution 3
I also faced the same thing, but the following tricks resolved it for me.

When any table is partitioned, the partition column becomes case sensitive.

The partition column should be present in the DataFrame with the same name (case sensitive). Code:
var dbName = "your database name"
var finaltable = "your table name"

// First check whether the table exists..
if (sparkSession.sql("show tables in " + dbName)
      .filter("tableName = '" + finaltable + "'")
      .collect().length == 0) {
  // If the table is not available, create it..
  println("Table Not Present \n Creating table " + finaltable)
  sparkSession.sql("use " + dbName)
  sparkSession.sql("SET hive.exec.dynamic.partition = true")
  sparkSession.sql("SET hive.exec.dynamic.partition.mode = nonstrict")
  sparkSession.sql("SET hive.exec.max.dynamic.partitions.pernode = 400")
  sparkSession.sql("create table " + dbName + "." + finaltable +
    " (EMP_ID string, EMP_Name string, EMP_Address string, EMP_Salary bigint)" +
    " PARTITIONED BY (EMP_DEP STRING)")
}

// The table exists now; insert the DataFrame in append mode.
// This runs on every execution, whether or not the table was just created.
df.write.mode(SaveMode.Append).insertInto(dbName + "." + finaltable)
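One caveat worth adding, as an assumption based on Spark 2.x behavior: insertInto resolves columns by position rather than by name and cannot be combined with partitionBy, so the partition column (EMP_DEP above) must come last in the DataFrame, matching the table definition. A PySpark sketch:

# Reorder so the partition column is last; insertInto matches by position.
cols = [c for c in df.columns if c != "EMP_DEP"] + ["EMP_DEP"]
df.select(*cols).write.mode("append").insertInto("your_db.your_table")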
Solution 4
It can also be configured on the SparkSession in this way:
spark = SparkSession \
.builder \
...
.config("spark.hadoop.hive.exec.dynamic.partition", "true") \
.config("spark.hadoop.hive.exec.dynamic.partition.mode", "nonstrict") \
.enableHiveSupport() \
.getOrCreate()
or you can add them to a .properties file, for example:
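(A sketch; conf/spark-defaults.conf is an assumption, and any properties file your deployment loads should work the same way.)

spark.hadoop.hive.exec.dynamic.partition       true
spark.hadoop.hive.exec.dynamic.partition.mode  nonstrict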
The spark.hadoop prefix is needed by Spark config (at least in 2.4), and here is how Spark sets this config:
/**
* Appends spark.hadoop.* configurations from a [[SparkConf]] to a Hadoop
* configuration without the spark.hadoop. prefix.
*/
def appendSparkHadoopConfigs(conf: SparkConf, hadoopConf: Configuration): Unit = {
  SparkHadoopUtil.appendSparkHadoopConfigs(conf, hadoopConf)
}
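A quick way to check that the prefix was stripped and the value landed in the Hadoop configuration (a PySpark sketch that goes through the py4j-backed handle, so it relies on a somewhat internal attribute):

# Read the key back without the spark.hadoop. prefix.
hconf = spark.sparkContext._jsc.hadoopConfiguration()
print(hconf.get("hive.exec.dynamic.partition"))  # expected: "true"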
Solution 5
This is what works for me. I set these settings and then put the data in partitioned tables.
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
sqlContext.setConf("hive.exec.dynamic.partition", "true")
sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
Chetandalal

Updated on June 01, 2021

Comments
-
Chetandalal, almost 3 years:

I have a sample application working that reads from csv files into a dataframe. The dataframe can be stored to a Hive table in parquet format using the method df.saveAsTable(tablename, mode).

The above code works fine, but I have so much data for each day that I want to dynamically partition the Hive table based on the creationdate (a column in the table).

Is there any way to dynamically partition the dataframe and store it to the Hive warehouse? I want to refrain from hard-coding the insert statement using hivesqlcontext.sql(insert into table partition by(date)....).

The question can be considered as an extension to: How to save DataFrame directly to Hive?

Any help is much appreciated.
-
Chetandalal, almost 9 years: Tried this partitionBy method. It only works at the RDD level; once the dataframe is created, most of the methods are DBMS-styled, e.g. groupBy, orderBy, but they don't serve the purpose of writing to different partition folders on Hive.
-
Chetandalal, almost 9 years: Ok, so I was able to work it out with the 1.4 version: df.write().mode(SaveMode.Append).partitionBy("date").saveAsTable("Tablename"). This however changes my date field to an integer value and removes the actual date. E.g. there are 9 unique dates in the column, but they are now stored as 1, 2, 3, ... and the folder names are date=1,2,3,... instead of date=20141121. Let me know if there is a way to do this.
-
Vrushank Doshi, almost 8 years: Where should I set the above 2 parameters? I tried logging in to the hive shell and running the above commands, but it failed. I am sure I am doing it wrong. Could you please tell me where I can set these properties?
-
MV23, almost 8 years: @VrushankDoshi You would set it in the Spark program, right after you create your hiveContext:
val sparkConf = new SparkConf()
val sc = new SparkContext(sparkConf)
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext.setConf("hive.exec.dynamic.partition", "true")
hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
-
Ram Ghadiyaram, about 7 years: @subramaniam-ramasubramanian: please reply to the OP's question as an answer instead of editing the existing answer.
-
sri hari kali charan Tummala, over 6 years: df.write.mode(SaveMode.Append).insertInto(empDB + "." + finaltable): don't you need to mention partitionBy? For example: df.write.mode(SaveMode.Append).partitionBy("EMP_DEP").insertInto(empDB + "." + finaltable)
-
Nilesh Shinde, over 6 years: No need, it's optional.
-
sri hari kali charan Tummala, over 6 years: My tables are existing tables in Hive.
-
enneppi, about 6 years: From my side this code overwrites but does not append any data. Why?
-
nir, over 5 years: Does this work for overwriting multiple dynamic partitions without losing the other partitions in the base directory?
-
Tutu Kumari, about 5 years: It will give an error: with append it's necessary to use insertInto with RDD-based tables; insertInto demands an already existing table in Hive.
-
Harshvardhan Solanki, almost 4 years: @mdurant: I am getting the following exception: org.apache.hadoop.hive.ql.metadata.Table.ValidationFailureSemanticException: Partition spec {rivalname=, rivalName=_________} contains non-partition columns. Actually, my partition column is also present in the df.
-
mdurant, almost 4 years: This answer is five years old; I would be happy to see it updated with whatever new syntax Spark might have.