How to save DataFrame directly to Hive?

224,834

Solution 1

You can create an in-memory temporary table and store its data in a Hive table using sqlContext.

Let's say your DataFrame is myDf. You can create a temporary table with:

myDf.createOrReplaceTempView("mytempTable") 

Then you can use a simple Hive statement to create the table and copy the data from your temp table:

sqlContext.sql("create table mytable as select * from mytempTable");
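If you build that CREATE TABLE ... AS SELECT statement in code, a tiny helper keeps it in one place. This is only a sketch: CtasSql and build are hypothetical names (not part of Spark), the names are pasted in verbatim without any quoting, and the optional IF NOT EXISTS clause mirrors the query used in one of the comments below.

```java
import java.util.Objects;

// Hypothetical helper (not a Spark API) that assembles the
// CREATE TABLE ... AS SELECT statement from Solution 1.
final class CtasSql {
    private CtasSql() {}

    // Builds "CREATE TABLE [IF NOT EXISTS] <target> AS SELECT * FROM <tempView>".
    // Names are inserted verbatim, so only pass trusted identifiers.
    static String build(String targetTable, String tempView, boolean ifNotExists) {
        Objects.requireNonNull(targetTable, "targetTable");
        Objects.requireNonNull(tempView, "tempView");
        return "CREATE TABLE "
                + (ifNotExists ? "IF NOT EXISTS " : "")
                + targetTable
                + " AS SELECT * FROM "
                + tempView;
    }
}
```

You would then pass the result to sqlContext.sql(...), for example CtasSql.build("mytable", "mytempTable", false).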

Solution 2

Use DataFrameWriter.saveAsTable (df.write.saveAsTable(...)). See the Spark SQL and DataFrame Guide.

Solution 3

I don't see df.write.saveAsTable(...) marked as deprecated in the Spark 2.0 documentation. It has worked for us on Amazon EMR: we were able to read data from S3 into a DataFrame, process it, create a table from the result and read it with MicroStrategy. Vinay's answer has also worked, though.

Solution 4

You need to have (or create) a HiveContext:

import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.hive.HiveContext;

// sc is an existing JavaSparkContext; sc.sc() returns the underlying SparkContext
HiveContext sqlContext = new HiveContext(sc.sc());

Then save the DataFrame directly, or select only the columns you want, as a Hive table (df is the DataFrame):

df.write().mode("overwrite").saveAsTable("schemaName.tableName");

or

df.select(df.col("col1"), df.col("col2"), df.col("col3")).write().mode("overwrite").saveAsTable("schemaName.tableName");

or

df.write().mode(SaveMode.Overwrite).saveAsTable("dbName.tableName");

The available save modes are Append, Ignore, Overwrite and ErrorIfExists (the default).
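To make the difference between the modes concrete, here is a toy in-memory model of what each one does when the target table already exists (when it does not exist yet, all four simply create it). ToyCatalog and ToyMode are made-up names for illustration; nothing here touches Spark, and rows are just strings.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (not Spark code): the names mirror Spark's SaveMode values.
enum ToyMode { APPEND, OVERWRITE, IGNORE, ERROR_IF_EXISTS }

// Hypothetical in-memory "catalog": table name -> rows (plain strings here).
final class ToyCatalog {
    private final Map<String, List<String>> tables = new HashMap<>();

    void save(String table, List<String> rows, ToyMode mode) {
        List<String> existing = tables.get(table);
        if (existing == null) {
            // Every mode simply creates the table when it does not exist yet.
            tables.put(table, new ArrayList<>(rows));
            return;
        }
        switch (mode) {
            case APPEND:
                existing.addAll(rows); // keep the old rows and add the new ones
                break;
            case OVERWRITE:
                tables.put(table, new ArrayList<>(rows)); // replace the contents
                break;
            case IGNORE:
                break; // leave the existing table untouched
            case ERROR_IF_EXISTS:
                throw new IllegalStateException("Table " + table + " already exists");
        }
    }

    List<String> rows(String table) {
        return tables.get(table);
    }
}
```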

Here is the definition of HiveContext from the Spark documentation:

In addition to the basic SQLContext, you can also create a HiveContext, which provides a superset of the functionality provided by the basic SQLContext. Additional features include the ability to write queries using the more complete HiveQL parser, access to Hive UDFs, and the ability to read data from Hive tables. To use a HiveContext, you do not need to have an existing Hive setup, and all of the data sources available to a SQLContext are still available. HiveContext is only packaged separately to avoid including all of Hive’s dependencies in the default Spark build.


On Spark 1.6.2, using "dbName.tableName" gives this error:

org.apache.spark.sql.AnalysisException: Specifying database name or other qualifiers are not allowed for temporary tables. If the table name has dots (.) in it, please quote the table name with backticks (`).
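If you build the table name as a string, quoting it yourself is one option. A minimal sketch; Identifiers, quote and qualified are hypothetical helpers, not Spark APIs. Quoting each part separately (`dbName`.`tableName`) keeps the database qualifier, whereas quoting the whole string (`dbName.tableName`) asks Spark to treat it as a single table name that happens to contain a dot. Whether a qualified name is accepted at all still depends on your Spark version and on using a Hive-enabled context.

```java
// Hypothetical helper (not a Spark API) for backtick-quoting identifiers.
final class Identifiers {
    private Identifiers() {}

    // Wraps one identifier in backticks; an embedded backtick is doubled.
    static String quote(String name) {
        return "`" + name.replace("`", "``") + "`";
    }

    // Quotes database and table separately, giving `db`.`table`.
    static String qualified(String database, String table) {
        return quote(database) + "." + quote(table);
    }
}
```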

Solution 5

Sorry for posting late, but I see no accepted answer.

In my experience, df.write.saveAsTable can throw an AnalysisException, and with the default format the table it creates is not Hive-compatible.

Storing the DF with df.write.format("hive") should do the trick!

However, if that doesn't work, then based on the previous comments and answers, this is the best solution in my opinion (open to suggestions, though).

The best approach is to explicitly create the Hive table (including a PARTITIONED table):

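def createHiveTable: Unit = {
  // hive_table_name, fields, partition_column and StorageType are your own variables;
  // the s prefix makes Scala substitute them into the string
  spark.sql(s"CREATE TABLE $hive_table_name($fields) " +
    s"PARTITIONED BY ($partition_column String) STORED AS $StorageType")
}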
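The fields placeholder in createHiveTable has to list every column except the partition column, because Hive rejects a column that appears both in the column list and in PARTITIONED BY. A hedged sketch of assembling that DDL; PartitionedDdl is a hypothetical helper, the column map is assumed to be in DataFrame order, and the partition column is typed STRING as in the answer.

```java
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper (not a Spark or Hive API) that assembles
// CREATE TABLE ... PARTITIONED BY ... STORED AS ...
final class PartitionedDdl {
    private PartitionedDdl() {}

    // columns: column name -> Hive type, in DataFrame order (use a LinkedHashMap).
    static String build(String table, Map<String, String> columns,
                        String partitionColumn, String storedAs) {
        StringJoiner fields = new StringJoiner(", ");
        for (Map.Entry<String, String> col : columns.entrySet()) {
            // Skip the partition column: it belongs only in PARTITIONED BY.
            if (!col.getKey().equals(partitionColumn)) {
                fields.add(col.getKey() + " " + col.getValue());
            }
        }
        return "CREATE TABLE " + table + " (" + fields + ") "
                + "PARTITIONED BY (" + partitionColumn + " STRING) "
                + "STORED AS " + storedAs;
    }
}
```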

Save the DF as a temp view:

df.createOrReplaceTempView(tempTableName)

and insert it into the PARTITIONED Hive table:

spark.sql(s"insert into table default.$hive_table_name PARTITION($partition_column) select * from $tempTableName")
spark.sql(s"select * from default.$hive_table_name").show(1000, false)

Because select * maps columns by position, the LAST COLUMN of the DF must be the PARTITION COLUMN, so create the Hive table accordingly!
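Instead of relying on select *, you can list the columns explicitly with the partition column moved to the end. A sketch under that assumption; PartitionInsert and its methods are hypothetical helpers, not Spark APIs. Depending on your Hive configuration, a fully dynamic PARTITION(...) insert like this may also need SET hive.exec.dynamic.partition.mode=nonstrict to be run first.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not a Spark API): Hive maps SELECT columns to table
// columns by position, so the dynamic partition column has to come last.
final class PartitionInsert {
    private PartitionInsert() {}

    // Returns a copy of the columns with the partition column moved to the end.
    static List<String> partitionLast(List<String> columns, String partitionColumn) {
        if (!columns.contains(partitionColumn)) {
            throw new IllegalArgumentException("Unknown partition column: " + partitionColumn);
        }
        List<String> ordered = new ArrayList<>(columns);
        ordered.remove(partitionColumn); // remove(Object): drops the first match
        ordered.add(partitionColumn);
        return ordered;
    }

    // Builds INSERT INTO TABLE ... PARTITION(col) SELECT <explicit columns> FROM view.
    static String build(String table, String tempView,
                        List<String> columns, String partitionColumn) {
        String select = String.join(", ", partitionLast(columns, partitionColumn));
        return "INSERT INTO TABLE " + table
                + " PARTITION(" + partitionColumn + ")"
                + " SELECT " + select + " FROM " + tempView;
    }
}
```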

Please comment whether it works or not!


--UPDATE--

import org.apache.spark.sql.SaveMode

df.write
  .partitionBy(partition_column)
  .format("hive")
  .mode(SaveMode.Append)
  .saveAsTable(new_table_name_to_be_created_in_hive)  // The table should not exist yet, OR should already be a PARTITIONED table in Hive
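Note that partitionBy on the writer (or the PARTITION clause above) is what creates Hive partitions; repartitioning the DataFrame in memory does not. The toy model below sketches the resulting layout: one sub-directory named column=value per distinct partition value, with the partition column dropped from the stored rows. PartitionLayout and the /warehouse/events path are made up for illustration, and real writers also escape special characters and handle null values, which this sketch skips.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model (not Spark code) of the directory layout that partitionBy produces.
final class PartitionLayout {
    private PartitionLayout() {}

    // Groups rows (column name -> value) into "<tableDir>/<col>=<value>" buckets.
    // The partition column is removed from each row because the path encodes it.
    static Map<String, List<Map<String, String>>> group(
            String tableDir, List<Map<String, String>> rows, String partitionColumn) {
        Map<String, List<Map<String, String>>> byDir = new TreeMap<>();
        for (Map<String, String> row : rows) {
            String dir = tableDir + "/" + partitionColumn + "=" + row.get(partitionColumn);
            Map<String, String> rest = new TreeMap<>(row); // copy; input is untouched
            rest.remove(partitionColumn);
            byDir.computeIfAbsent(dir, k -> new ArrayList<>()).add(rest);
        }
        return byDir;
    }
}
```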
Author by

Gourav

Updated on July 08, 2022

Comments

  • Gourav
    Gourav almost 2 years

    Is it possible to save DataFrame in spark directly to Hive?

    I have tried converting the DataFrame to an RDD, saving it as a text file and then loading it into Hive, but I am wondering whether I can save the DataFrame to Hive directly.

  • lazywiz
    lazywiz about 8 years
    This is not a valid HQL statement: cannot recognize input near 'select' '*' 'from' in create table statement; line 1 pos 16
  • RChat
    RChat almost 8 years
    saveAsTable does not create Hive-compatible tables. The best solution I found is Vinay Kumar's.
  • dieHellste
    dieHellste almost 8 years
    Does the second command, 'df.select(df.col("col1"), df.col("col2"), df.col("col3")).write().mode("overwrite").saveAsTable("schemaName.tableName");', require that the selected columns you intend to overwrite already exist in the table? That is, you have an existing table and only overwrite the existing columns 1, 2 and 3 with the new data from your df in Spark? Have I understood that correctly?
  • ski_squaw
    ski_squaw over 7 years
    this got around the parquet read errors I was getting when using write.saveAsTable in spark 2.0
  • Daniel Darabos
    Daniel Darabos over 7 years
    @Jacek: I have added this note myself, because I think my answer is wrong. I would delete it, except that it is accepted. Do you think the note is wrong?
  • Jacek Laskowski
    Jacek Laskowski over 7 years
    Yes. The note was wrong and that's why I removed it. "Please correct me if I'm wrong" applies here :)
  • chhantyal
    chhantyal about 7 years
    No problem. Btw, I just found out you can't use a PARTITIONED BY clause in this statement.
  • Vinay Kumar
    Vinay Kumar almost 7 years
    Yes. However, we can use partition by on the DataFrame before creating the temp table. @chhantyal
  • Tagar
    Tagar almost 7 years
    @DanielDarabos, why "saveAsTable is deprecated and removed in Spark 2.0.0"? I see it is still quite supported and documented in Spark 2.1: spark.apache.org/docs/latest/…
  • Daniel Darabos
    Daniel Darabos almost 7 years
    I think it used to be df.saveAsTable. That is gone now, but there is df.write.saveAsTable. I don't have a Hive installation to test it against, but it does do something, so you're right. I have no clue. Okay, I'll remove the note!
  • Hemanth Annavarapu
    Hemanth Annavarapu almost 7 years
    Thanks for this answer. I've tried to do the same thing in my program as well: dataframe.registerTempTable("RiskRecon_tmp"); hiveContext.sql("CREATE TABLE IF NOT EXISTS RiskRecon_TOES as select * from RiskRecon_tmp"). But I get this error: java.lang.IllegalArgumentException: Wrong FS: file:/tmp/spark-a68a9fc7-50f3-43ae-ac06-8c07ba7253c2/scratch_hive_2017-07-12_07-12-57_948_8232393446428506434-1, expected: hdfs://nameservice1 at the line where I am passing the query. Do you have any idea regarding this? @VinayKumar
  • Vinay Kumar
    Vinay Kumar almost 7 years
  • user1870400
    user1870400 over 6 years
    will this df.write().saveAsTable(tableName) also write streaming data into the table?
  • WestCoastProjects
    WestCoastProjects over 6 years
    How were you able to mix and match the temporary table with the hive table? When doing show tables it only includes the hive tables for my spark 2.3.0 installation
  • Vinay Kumar
    Vinay Kumar over 6 years
    This temporary table is saved in your Hive context only; it does not belong to the Hive tables in any way.
  • serakfalcon
    serakfalcon over 6 years
    Somebody flagged this answer as low-quality due to length and content. To be honest it probably would have been better as a comment. I guess it's been up for two years and some people have found it helpful so might be good to leave things as is?
  • Alex
    Alex over 6 years
    I agree, comment would have been the better choice. Lesson learned :-)
  • user 923227
    user 923227 almost 6 years
    df.write().mode... needs to be changed to df.write.mode...
  • Brian
    Brian almost 6 years
    No, you can't save streaming data with saveAsTable; it's not even in the API.
  • enneppi
    enneppi over 5 years
    Hi @VinayKumar, why do you say "If you are using saveAsTable (its more like persisting your dataframe), you have to make sure that you have enough memory allocated to your spark application"? Could you explain this point?
  • Sade
    Sade over 5 years
    I get an error that says "Job aborted". I tried the following code: pyspark_df.write.mode("overwrite").saveAsTable("InjuryTab2")
  • Vinay Kumar
    Vinay Kumar about 5 years
    @enneppi it's irrelevant. I have updated the answer now.
  • mrsrinivas
    mrsrinivas about 4 years
    Detailed example found here: stackoverflow.com/a/56833395/1592191
  • Harshvardhan Solanki
    Harshvardhan Solanki about 4 years
    @VinayKumar: I tried partitioning the DF with partitionBy($column) before storing it as a temp table, but it did not create any partitions in Hive. Could you please comment on this? Thanks
  • onofricamila
    onofricamila about 4 years
    Hi! Why is this? From Spark 2.2: use Dataset instead of DataFrame.
  • Scope
    Scope about 3 years
    Hi @VinayKumar, how should I import sqlContext so that I can use it this way?
  • Rahul P
    Rahul P about 2 years
    This is great. Thank you!