How to partition and write DataFrame in Spark without deleting partitions with no new data?
Solution 1
The mode option Append
has a catch!
df.write.partitionBy("y","m","d")
.mode(SaveMode.Append)
.parquet("/data/hive/warehouse/mydbname.db/" + tableName)
I've tested and saw that this will keep the existing partition files. However, the problem this time is the following: If you run the same code twice (with the same data), then it will create new parquet files instead of replacing the existing ones for the same data (Spark 1.6). So, instead of using Append
, we can still solve this problem with Overwrite
. Instead of overwriting at the table level, we should overwrite at the partition level.
df.write.mode(SaveMode.Overwrite)
.parquet("/data/hive/warehouse/mydbname.db/" + tableName + "/y=" + year + "/m=" + month + "/d=" + day)
See the following link for more information:
Overwrite specific partitions in spark dataframe write method
(I've updated my reply after suriyanto's comment. Thnx.)
Solution 2
This is an old topic, but I was having the same problem and found another solution, just set your partition overwrite mode to dynamic by using:
spark.conf.set('spark.sql.sources.partitionOverwriteMode', 'dynamic')
So, my spark session is configured like this:
spark = SparkSession.builder.appName('AppName').getOrCreate()
spark.conf.set('spark.sql.sources.partitionOverwriteMode', 'dynamic')
Solution 3
I know this is very old. As I can not see any solution posted, I will go ahead and post one. This approach assumes you have a hive table over the directory you want to write to.
One way to deal with this problem is to create a temp view from dataFrame
which should be added to the table and then use normal hive-like insert overwrite table ...
command:
dataFrame.createOrReplaceTempView("temp_view")
spark.sql("insert overwrite table table_name partition ('eventdate', 'hour', 'processtime')select * from temp_view")
It preserves old partitions while (over)writing to only new partitions.
Admin
Updated on July 09, 2022Comments
-
Admin almost 2 years
I am trying to save a
DataFrame
to HDFS in Parquet format usingDataFrameWriter
, partitioned by three column values, like this:dataFrame.write.mode(SaveMode.Overwrite).partitionBy("eventdate", "hour", "processtime").parquet(path)
As mentioned in this question,
partitionBy
will delete the full existing hierarchy of partitions atpath
and replaced them with the partitions indataFrame
. Since new incremental data for a particular day will come in periodically, what I want is to replace only those partitions in the hierarchy thatdataFrame
has data for, leaving the others untouched.To do this it appears I need to save each partition individually using its full path, something like this:
singlePartition.write.mode(SaveMode.Overwrite).parquet(path + "/eventdate=2017-01-01/hour=0/processtime=1234567890")
However I'm having trouble understanding the best way to organize the data into single-partition
DataFrame
s so that I can write them out using their full path. One idea was something like:dataFrame.repartition("eventdate", "hour", "processtime").foreachPartition ...
But
foreachPartition
operates on anIterator[Row]
which is not ideal for writing out to Parquet format.I also considered using a
select...distinct eventdate, hour, processtime
to obtain the list of partitions, and then filtering the original data frame by each of those partitions and saving the results to their full partitioned path. But the distinct query plus a filter for each partition doesn't seem very efficient since it would be a lot of filter/write operations.I'm hoping there's a cleaner way to preserve existing partitions for which
dataFrame
has no data?Thanks for reading.
Spark version: 2.1
-
suriyanto over 6 yearsDid you test if when you write the same data twice that it replaces the old partition? From my test, it actually create a new parquet file inside the partition directory causing the data to double. I'm on Spark 2.2.
-
Pruthvi Raj over 5 yearsi have same problem and i dont want data to be duplicated. did you overcome duplicating the data?
-
sethcall about 5 yearsThis as is did not quite work for me, but got me very close (on spark 2.2). If you want to make sure existing partitions are not overwritten, you have to specify the value of the partition statically in the SQL statement, as well as add in IF NOT EXISTS, like so:
spark.sql("insert overwrite table table_name partition (col1='1', col2='2', ) IF NOT EXISTS select * from temp_view")
By the way, I did see this other thread: stackoverflow.com/a/49691528/834644 specific to 2.3. Although I saw another commenter say it did not work. -
Madhava Carrillo about 5 yearsWith 2.3 overwriting specific partitions definitely works, I have been using it for a while. More information on the feature: issues.apache.org/jira/browse/SPARK-20236
-
D3V about 5 years@sethcall Proposed solution worked very well with 2.1 but haven't checked with 2.2.
-
Markus almost 5 yearsTake a look on this SO-answer stating that this behaviour is expected from
Append
: stackoverflow.com/a/51020951/3757672 -
Louis Yang almost 4 yearsJust FYI, setting partitionOverwriteMode to 'dynamic' somehow make the entire writing process extremely slow (3x longer) on our cluster. We are using spark 2.4.0. Not sure if this is fixed in the new version yet.
-
swdev almost 4 yearsThis should be marked as the real solution. Maybe it is slower but it does what the OP asks.
-
citynorman over 2 yearsfyi
spark.conf.set('spark.sql.sources.partitionOverwriteMode', 'static')
for the original mode -
citynorman over 2 yearsWorks and did not see a performance degradation on Databricks 9.1 LTS (includes Apache Spark 3.1.2, Scala 2.12)