How to partition and write DataFrame in Spark without deleting partitions with no new data?


Solution 1

The mode option Append has a catch!

df.write.partitionBy("y","m","d")
.mode(SaveMode.Append)
.parquet("/data/hive/warehouse/mydbname.db/" + tableName)

I've tested this and seen that it keeps the existing partition files. However, the problem this time is the following: if you run the same code twice (with the same data), it creates new parquet files instead of replacing the existing ones for the same data (Spark 1.6). So, instead of using Append, we can still solve this problem with Overwrite: rather than overwriting at the table level, we overwrite at the partition level.

df.write.mode(SaveMode.Overwrite)
.parquet("/data/hive/warehouse/mydbname.db/" + tableName + "/y=" + year + "/m=" + month + "/d=" + day)
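
If df spans several partitions, the same idea can be applied per partition. Below is a minimal sketch in Scala, assuming the y/m/d columns and base path from the code above; it filters the DataFrame once per partition, so it is only practical when the number of affected partitions is small:

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col

// Sketch: overwrite only the partitions that actually appear in df
val basePath = "/data/hive/warehouse/mydbname.db/" + tableName
df.select("y", "m", "d").distinct().collect().foreach { row =>
  val (y, m, d) = (row.get(0), row.get(1), row.get(2))
  df.filter(col("y") === y && col("m") === m && col("d") === d)
    .drop("y", "m", "d") // the partition values are already encoded in the path
    .write.mode(SaveMode.Overwrite)
    .parquet(s"$basePath/y=$y/m=$m/d=$d")
}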

See the following link for more information:

Overwrite specific partitions in spark dataframe write method

(I've updated my reply after suriyanto's comment. Thnx.)

Solution 2

This is an old topic, but I was having the same problem and found another solution: just set your partition overwrite mode to dynamic with:

spark.conf.set('spark.sql.sources.partitionOverwriteMode', 'dynamic')

So, my spark session is configured like this:

spark = SparkSession.builder.appName('AppName').getOrCreate()
spark.conf.set('spark.sql.sources.partitionOverwriteMode', 'dynamic')
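
With that setting in place, the write itself is just the ordinary partitioned overwrite from the question; Spark then replaces only the partitions present in the DataFrame and leaves the rest untouched (dynamic partition overwrite requires Spark 2.3+). A minimal sketch in Scala, reusing the column names and path from the question:

import org.apache.spark.sql.SaveMode

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
// Only the partitions present in dataFrame are overwritten; all others are left untouched
dataFrame.write
  .mode(SaveMode.Overwrite)
  .partitionBy("eventdate", "hour", "processtime")
  .parquet(path)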

Solution 3

I know this is very old. As I cannot see any solution posted, I will go ahead and post one. This approach assumes you have a Hive table over the directory you want to write to. One way to deal with this problem is to create a temp view from the DataFrame that should be added to the table, and then use a normal Hive-style insert overwrite table ... command:

dataFrame.createOrReplaceTempView("temp_view")
spark.sql("insert overwrite table table_name partition (eventdate, hour, processtime) select * from temp_view")

It preserves the old partitions while (over)writing only the partitions the DataFrame has data for.
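
One caveat with the dynamic partition insert above: the partition columns must come last in the select list, in the same order as in the partition clause, so select * only works if the view's columns already end with eventdate, hour, processtime. A sketch with an explicit column list, where value_col1 and value_col2 are placeholder data columns:

dataFrame.createOrReplaceTempView("temp_view")
// Depending on the Hive configuration, dynamic-partition-only inserts may also need:
// spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
spark.sql("""
  insert overwrite table table_name
  partition (eventdate, hour, processtime)
  select value_col1, value_col2, eventdate, hour, processtime
  from temp_view
""")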


Comments

  • Admin
    Admin almost 2 years

    I am trying to save a DataFrame to HDFS in Parquet format using DataFrameWriter, partitioned by three column values, like this:

    dataFrame.write.mode(SaveMode.Overwrite).partitionBy("eventdate", "hour", "processtime").parquet(path)
    

    As mentioned in this question, partitionBy will delete the full existing hierarchy of partitions at path and replace them with the partitions in dataFrame. Since new incremental data for a particular day will come in periodically, what I want is to replace only those partitions in the hierarchy that dataFrame has data for, leaving the others untouched.

    To do this it appears I need to save each partition individually using its full path, something like this:

    singlePartition.write.mode(SaveMode.Overwrite).parquet(path + "/eventdate=2017-01-01/hour=0/processtime=1234567890")
    

    However, I'm having trouble understanding the best way to organize the data into single-partition DataFrames so that I can write them out using their full paths. One idea was something like:

    dataFrame.repartition("eventdate", "hour", "processtime").foreachPartition ...
    

    But foreachPartition operates on an Iterator[Row], which is not ideal for writing out to Parquet format.

    I also considered using a select...distinct eventdate, hour, processtime to obtain the list of partitions, and then filtering the original data frame by each of those partitions and saving the results to their full partitioned path. But the distinct query plus a filter for each partition doesn't seem very efficient since it would be a lot of filter/write operations.

    I'm hoping there's a cleaner way to preserve existing partitions for which dataFrame has no data.

    Thanks for reading.

    Spark version: 2.1

  • suriyanto
    suriyanto over 6 years
    Did you test whether writing the same data twice replaces the old partition? From my test, it actually creates a new parquet file inside the partition directory, causing the data to double. I'm on Spark 2.2.
  • Pruthvi Raj
    Pruthvi Raj over 5 years
    I have the same problem and I don't want the data to be duplicated. Did you overcome duplicating the data?
  • sethcall
    sethcall about 5 years
    This as is did not quite work for me, but got me very close (on spark 2.2). If you want to make sure existing partitions are not overwritten, you have to specify the value of the partition statically in the SQL statement, as well as add in IF NOT EXISTS, like so: spark.sql("insert overwrite table table_name partition (col1='1', col2='2', ) IF NOT EXISTS select * from temp_view") By the way, I did see this other thread: stackoverflow.com/a/49691528/834644 specific to 2.3. Although I saw another commenter say it did not work.
  • Madhava Carrillo
    Madhava Carrillo about 5 years
    With 2.3 overwriting specific partitions definitely works, I have been using it for a while. More information on the feature: issues.apache.org/jira/browse/SPARK-20236
  • D3V
    D3V about 5 years
    @sethcall Proposed solution worked very well with 2.1 but haven't checked with 2.2.
  • Markus
    Markus almost 5 years
    Take a look on this SO-answer stating that this behaviour is expected from Append: stackoverflow.com/a/51020951/3757672
  • Louis Yang
    Louis Yang almost 4 years
    Just FYI, setting partitionOverwriteMode to 'dynamic' somehow makes the entire writing process extremely slow (3x longer) on our cluster. We are using Spark 2.4.0. Not sure if this is fixed in a newer version yet.
  • swdev
    swdev almost 4 years
    This should be marked as the real solution. Maybe it is slower but it does what the OP asks.
  • citynorman
    citynorman over 2 years
    FYI: spark.conf.set('spark.sql.sources.partitionOverwriteMode', 'static') restores the original mode
  • citynorman
    citynorman over 2 years
    Works and did not see a performance degradation on Databricks 9.1 LTS (includes Apache Spark 3.1.2, Scala 2.12)