Save Spark dataframe as dynamic partitioned table in Hive

Solution 1

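I believe it works like this, where df is a DataFrame with year, month, and other columns:

df.write.partitionBy('year', 'month').saveAsTable(...)

or, to write into a table that already exists:

df.write.partitionBy('year', 'month').insertInto(...)

Note that Spark 2.0 and later reject the second form with an AnalysisException ("insertInto() can't be used together with partitionBy()"), because the partition columns are already defined by the table. On those versions, call df.write.insertInto(...) without partitionBy.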
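
As a rough illustration of what partitioning by year and month means on disk, here is a small pure-Python sketch of the Hive-style directory naming (one col=value level per partition column). It does not call Spark; the helper name partition_path and the sample rows are hypothetical, and escaping of special characters in values is ignored.

```python
def partition_path(row, partition_cols):
    """Return the Hive-style relative directory for one row.

    Each partition column contributes one "col=value" path segment,
    in the order the columns were passed to partitionBy.
    """
    return "/".join(f"{col}={row[col]}" for col in partition_cols)


# Hypothetical sample data: three rows that fall into two partitions.
rows = [
    {"year": 2014, "month": 11, "amount": 10.0},
    {"year": 2014, "month": 12, "amount": 7.5},
    {"year": 2014, "month": 11, "amount": 3.2},
]

# Distinct partition directories these rows would be spread across.
paths = sorted({partition_path(r, ["year", "month"]) for r in rows})
print(paths)
```

Rows that share the same (year, month) pair end up under the same directory, which is why no partition value has to be hard-coded in the write call.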

Solution 2

I was able to write to a partitioned Hive table using:

df.write().mode(SaveMode.Append).partitionBy("colname").saveAsTable("Table")

I had to enable the following properties to make it work.

hiveContext.setConf("hive.exec.dynamic.partition", "true")
hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

Solution 3

I ran into the same problem and resolved it by keeping two things in mind:

  1. Once a table is partitioned, the partition column name becomes case sensitive.

  2. The partition column must be present in the DataFrame under exactly the same name (including case).

Code:

    import org.apache.spark.sql.SaveMode

    val dbName = "your database name"
    val finaltable = "your table name"

    // First check whether the table already exists.
    if (sparkSession.sql("show tables in " + dbName).filter("tableName='" + finaltable + "'").collect().length == 0) {
         // The table does not exist yet, so create it.
         println("Table Not Present \n  Creating table " + finaltable)
         sparkSession.sql("use " + dbName)
         // Allow fully dynamic partition inserts.
         sparkSession.sql("SET hive.exec.dynamic.partition = true")
         sparkSession.sql("SET hive.exec.dynamic.partition.mode = nonstrict")
         sparkSession.sql("SET hive.exec.max.dynamic.partitions.pernode = 400")
         sparkSession.sql("create table " + dbName + "." + finaltable + "(EMP_ID string, EMP_Name string, EMP_Address string, EMP_Salary bigint) PARTITIONED BY (EMP_DEP STRING)")
         // The table now exists; insert the DataFrame in append mode.
         // insertInto matches columns by position, so EMP_DEP must be the last DataFrame column.
         df.write.mode(SaveMode.Append).insertInto(dbName + "." + finaltable)
    }
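
The two points above can be checked before writing. The following is a minimal pure-Python sketch that works on plain lists of column names (for example df.columns), so it runs without Spark. The helper names check_partition_columns and order_for_insert are hypothetical. The reordering helper relies on two things: insertInto is documented to resolve columns by position rather than by name, and Hive places partition columns after the data columns in the table schema.

```python
def check_partition_columns(df_columns, partition_cols):
    """Compare DataFrame column names with the table's partition columns.

    Returns (missing, mismatched):
      missing    - partition columns absent from the DataFrame in any casing
      mismatched - (expected, found) pairs that differ only in letter case
    """
    by_lower = {c.lower(): c for c in df_columns}
    missing, mismatched = [], []
    for p in partition_cols:
        found = by_lower.get(p.lower())
        if found is None:
            missing.append(p)
        elif found != p:
            mismatched.append((p, found))
    return missing, mismatched


def order_for_insert(df_columns, partition_cols):
    """Return data columns first, then partition columns in table order."""
    part = set(partition_cols)
    return [c for c in df_columns if c not in part] + list(partition_cols)


# Hypothetical column list where the partition column has the wrong case.
cols = ["EMP_ID", "emp_dep", "EMP_Name", "EMP_Address", "EMP_Salary"]
report = check_partition_columns(cols, ["EMP_DEP"])
print(report)

# Column order to select before calling insertInto on the EMP table.
fixed = ["EMP_ID", "EMP_DEP", "EMP_Name", "EMP_Address", "EMP_Salary"]
ordered = order_for_insert(fixed, ["EMP_DEP"])
print(ordered)
```

With a real DataFrame, the result of order_for_insert could be passed to df.select(...) before the insertInto call.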
    

Solution 4

It can be configured on the SparkSession like this:

spark = SparkSession \
    .builder \
    ...
    .config("spark.hadoop.hive.exec.dynamic.partition", "true") \
    .config("spark.hadoop.hive.exec.dynamic.partition.mode", "nonstrict") \
    .enableHiveSupport() \
    .getOrCreate()

Alternatively, add the same settings to a .properties file.

The spark.hadoop. prefix is required when these settings go through the Spark config (at least in Spark 2.4). This is how Spark passes them on to the Hadoop configuration:

  /**
   * Appends spark.hadoop.* configurations from a [[SparkConf]] to a Hadoop
   * configuration without the spark.hadoop. prefix.
   */
  def appendSparkHadoopConfigs(conf: SparkConf, hadoopConf: Configuration): Unit = {
    SparkHadoopUtil.appendSparkHadoopConfigs(conf, hadoopConf)
  }
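
To make the prefix handling concrete, here is a plain-Python imitation of the behaviour that doc comment describes, using ordinary dicts in place of SparkConf and the Hadoop Configuration. It is a sketch of the idea, not Spark's actual implementation, and the sample spark.app.name entry is only there to show that unprefixed keys are skipped.

```python
PREFIX = "spark.hadoop."


def append_spark_hadoop_configs(spark_conf, hadoop_conf):
    """Copy spark.hadoop.* entries into hadoop_conf with the prefix stripped.

    Keys without the prefix are left out of hadoop_conf entirely.
    """
    for key, value in spark_conf.items():
        if key.startswith(PREFIX):
            hadoop_conf[key[len(PREFIX):]] = value
    return hadoop_conf


# Dict standing in for the Spark configuration built above.
spark_conf = {
    "spark.app.name": "demo",
    "spark.hadoop.hive.exec.dynamic.partition": "true",
    "spark.hadoop.hive.exec.dynamic.partition.mode": "nonstrict",
}

hadoop_conf = append_spark_hadoop_configs(spark_conf, {})
print(hadoop_conf)
```

This is why the Hive settings only take effect through the builder when they carry the spark.hadoop. prefix: without it, they are never copied across.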

Solution 5

This is what works for me. I set these settings and then put the data in partitioned tables.

from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
sqlContext.setConf("hive.exec.dynamic.partition", "true")
sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
Author: Chetandalal

Updated on June 01, 2021

Comments

  • Chetandalal
    Chetandalal almost 3 years

    I have a sample application that reads CSV files into a DataFrame. The DataFrame can be stored to a Hive table in Parquet format using the method df.saveAsTable(tablename,mode).

    The above code works fine, but I have so much data for each day that I want to dynamically partition the Hive table based on creationdate (a column in the table).

    Is there any way to dynamically partition the DataFrame and store it to the Hive warehouse? I want to refrain from hard-coding the insert statement using hivesqlcontext.sql(insert into table partition by(date)....).

    The question can be considered an extension of: How to save DataFrame directly to Hive?

    Any help is much appreciated.

  • Chetandalal
    Chetandalal almost 9 years
    I tried this partitionBy method. It only works at the RDD level; once a DataFrame is created, most of the methods are DBMS-styled (e.g. groupBy, orderBy), but they don't serve the purpose of writing to different partition folders in Hive.
  • Chetandalal
    Chetandalal almost 9 years
    OK, so I was able to work it out with version 1.4: df.write().mode(SaveMode.Append).partitionBy("date").saveAsTable("Tablename");. This, however, changes my date field to an integer value and removes the actual date. E.g. there are 9 unique dates in the column, but they are now stored as 1,2,3,... and the folder name is date=1,2,3,... instead of date=20141121. Let me know if there is a way to fix this.
  • Vrushank Doshi
    Vrushank Doshi almost 8 years
    Where should I set the above two parameters? I tried logging in to the Hive shell and running the above commands, but it failed. I am sure I am doing it wrong. Could you please tell me where I can set these properties?
  • MV23
    MV23 almost 8 years
    @VrushankDoshi You would set it in the Spark program, right after you create your hiveContext:

        val sparkConf = new SparkConf()
        val sc = new SparkContext(sparkConf)
        val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
        hiveContext.setConf("hive.exec.dynamic.partition","true")
        hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
  • Ram Ghadiyaram
    Ram Ghadiyaram about 7 years
    @subramaniam-ramasubramanian: please reply to the OP's question as an answer instead of editing an existing answer.
  • sri hari kali charan Tummala
    sri hari kali charan Tummala over 6 years
    df.write.mode(SaveMode.Append).insertInto(empDB + "." + finaltable): don't you need to mention partitionBy? For example: df.write.mode(SaveMode.Append).partitionBy("EMP_DEP").insertInto(empDB + "." + finaltable)
  • Nilesh Shinde
    Nilesh Shinde over 6 years
    No need; it's optional.
  • sri hari kali charan Tummala
    sri hari kali charan Tummala over 6 years
    My tables are existing tables in Hive.
  • enneppi
    enneppi about 6 years
    On my side this code overwrites but does not append any data. Why?
  • nir
    nir over 5 years
    Does this work for overwriting multiple dynamic partitions without losing other partitions in the base directory?
  • Tutu Kumari
    Tutu Kumari about 5 years
    It will give an error: with append it's necessary to use insertInto in RDD-based tables, and insertInto demands an already existing table in Hive.
  • Harshvardhan Solanki
    Harshvardhan Solanki almost 4 years
    @mdurant: I am getting the following exception: org.apache.hadoop.hive.ql.metadata.Table.ValidationFailureSemanticException: Partition spec {rivalname=, rivalName=_________} contains non-partition columns. My partition column is actually present in df as well.
  • mdurant
    mdurant almost 4 years
    This answer is five years old - would be happy to see it updated with whatever new syntax spark might have.