How to overwrite a parquet file from where DataFrame is being read in Spark
Solution 1
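One solution for this error is to cache the DataFrame, run an action on it (for example df.show()), and then save the parquet file in "overwrite" mode. Because cache() is lazy, only the partitions that the action actually computes end up cached; df.show() computes just enough to display its first rows, so an action that scans everything, such as df.count(), is the safer choice.
In Python: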
save_mode = "overwrite"
df = spark.read.parquet("path_to_parquet")
# ... apply your transformations to df; the result is new_df
new_df.cache()
new_df.show()
new_df.write.format("parquet")\
.mode(save_mode)\
.save("path_to_parquet")
Solution 2
When the data is served from a cache, the overwrite seems to work fine.
import org.apache.spark.sql.SaveMode // needed for SaveMode.Overwrite below
val df = spark.read.format("parquet").load("temp").cache()
cache is a lazy operation and doesn't trigger any computation, so we have to add a dummy action.
println(df.count()) // count over parquet files should be very fast
Now it should work:
df.repartition(1).write.mode(SaveMode.Overwrite).parquet("temp")
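Both solutions follow the same three steps: cache, force an action, then overwrite. A minimal Python sketch of those steps as a helper is shown here; the name overwrite_same_path is hypothetical, and it assumes df is a DataFrame that was read from path. It uses count() as the action so that every partition is computed, and it releases the cache afterwards. It does not remove the risk raised in the comments further down: if cached partitions are lost before the write finishes, Spark will try to recompute them from files that no longer exist.

```python
def overwrite_same_path(df, path):
    """Cache df, materialize it, then overwrite the parquet path it was read from.

    Hypothetical helper; df is expected to be a Spark DataFrame.
    """
    cached = df.cache()          # lazy: only marks the DataFrame for caching
    row_count = cached.count()   # action: computes every partition, filling the cache
    try:
        # The write is now served from the cached partitions where they are available.
        cached.write.mode("overwrite").parquet(path)
    finally:
        cached.unpersist()       # free the cached blocks once the write is done
    return row_count
```

Usage would look like overwrite_same_path(new_df, "path_to_parquet").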
cph_sto
Updated on June 14, 2022
Comments
-
cph_sto almost 2 years
This is a microcosm of the problem I am facing, where I am getting an error. Let me try to reproduce it here.
I am saving a DataFrame as a parquet file, but when I reload the DataFrame from the parquet file and save it once again as parquet, I get an error.
from pyspark.sql.functions import col

valuesCol = [('Male','2019-09-06'),('Female','2019-09-06'),('Male','2019-09-07')]
df = spark.createDataFrame(valuesCol,['sex','date'])
# Save as parquet
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp')
# Load it back
df = spark.read.format('parquet').load('.../temp')
df = df.where(col('sex')=='Male')
# Save it back - This produces ERROR
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp')
Error message -
executor 22): java.io.FileNotFoundException: Requested file maprfs:///mapr/.../temp/part-00000-f67d5a62-36f2-4dd2-855a-846f422e623f-c000.snappy.parquet does not exist. It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
Another SO question addresses this issue. The proposed solution was to refresh the table like the code below, but that did not help. The issue is with the refreshing of the metadata. I don't know how to refresh it.
df.createOrReplaceTempView('table_view')
spark.catalog.refreshTable('table_view')
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp')
Workaround for this problem: A non-elegant way to solve this issue is to save the DataFrame as a parquet file with a different name, then delete the original parquet file and finally rename this parquet file to the old name.
# Workaround
import os
import shutil

# Load it back
df = spark.read.format('parquet').load('.../temp')
# Save it back as temp1, as opposed to original temp
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp1')
# Delete the original parquet file
shutil.rmtree('.../temp')
# Renaming the parquet folder.
os.rename('.../temp1','.../temp')
But the problem is that some DataFrames are quite big, so this may not be the best way to deal with it. I am also not sure whether renaming will cause problems with the metadata.
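The delete-then-rename step of this workaround can be sketched as a small helper that keeps the old folder until the new one is in place. This is only a sketch under assumptions: replace_dir_via_staging, write_fn and the "_staging" and "_backup" suffixes are hypothetical names, and it assumes the folder sits on a filesystem where os.rename and shutil.rmtree work, as the workaround itself does.

```python
import os
import shutil


def replace_dir_via_staging(target, write_fn):
    """Write new output next to target, then swap it into place.

    write_fn is a hypothetical callback that writes the new data to the
    path it is given, for example:
        lambda p: df.write.format('parquet').mode('overwrite').save(p)
    """
    staging = target + "_staging"  # assumed naming, not from the original post
    backup = target + "_backup"    # assumed naming, not from the original post

    # Clear leftovers from an earlier failed run.
    for leftover in (staging, backup):
        if os.path.exists(leftover):
            shutil.rmtree(leftover)

    # The original folder is untouched while the new data is written.
    write_fn(staging)

    had_original = os.path.exists(target)
    if had_original:
        os.rename(target, backup)
    try:
        os.rename(staging, target)
    except OSError:
        if had_original:
            os.rename(backup, target)  # put the old data back if the swap fails
        raise
    if had_original:
        shutil.rmtree(backup)
```

Compared with deleting first, the old data is removed only after the new folder has taken its name, so a failed write leaves the original intact. The cost is the same as in the workaround: the full dataset is written once to a second location.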
-
cph_sto over 4 years
I ran the code with .cache(). Sometimes it works, and other times it fails with the error: Caused by: java.io.IOException: Error: Stale file handle. But I think it has nothing to do with your code, as I have seen this error before while saving DFs. Let me test it for some time before I accept this as an answer. Thank you.
-
cph_sto over 4 years
Can you offer an explanation as to what difference cache() makes?
-
Gelerion over 4 years
Well, I am not 100% sure it will work on a big cluster; I have tested it only in my local environment. I can imagine a situation where the job runs on spot nodes and all the nodes with cached data are taken back by the cloud provider. In this situation, Spark will try to recompute the lost data and eventually fail with the same exception.
-
cph_sto over 4 years
Oh I see. So, would you say that the workaround that I have proposed remains more robust? Do you think that refreshing the metadata as I posted in my question will help?
-
Gelerion over 4 years
cache stores data on a local disk per executor; then, when we write the data, it is taken from the disk and not from the remote file.
-
cph_sto over 4 years
So that's why .cache() works: df is not pointing to the original parquet file but to a location on the disk, correct?
-
Gelerion over 4 years
Yes, exactly. I had no success with refreshing metadata.
-
Gelerion over 4 years
It might be a viable solution if you use an on-prem or EMR cluster with static nodes; you could also increase the replication factor.
-
cph_sto over 4 years
Thank you so much. It helped me get a perspective :)
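The explanation given in the comments (the cached DataFrame is written from the cached copy rather than from the original files) can be illustrated with a plain-Python analogy. This is only a sketch and not Spark code: lazy_rows and overwrite are hypothetical names, a generator stands in for Spark's lazy read, and it assumes that overwrite mode clears the target before the read runs, which is what the FileNotFoundException in the question suggests.

```python
import os


def lazy_rows(path):
    """Lazy read: the file is opened only when the first row is requested."""
    with open(path) as source:
        for line in source:
            yield line.rstrip("\n")


def overwrite(path, rows):
    """Stand-in for an overwrite-mode write: clear the target, then write rows."""
    os.remove(path)            # the old file is gone before any row is read
    materialized = list(rows)  # a lazy source is read only at this point
    with open(path, "w") as target:
        for row in materialized:
            target.write(row + "\n")


# Lazy source reading from the path being overwritten: fails, like the question.
#   overwrite(p, (r for r in lazy_rows(p) if r == "Male"))   # FileNotFoundError
#
# Source materialized first, like cache() followed by an action: succeeds.
#   cached = list(lazy_rows(p))
#   overwrite(p, [r for r in cached if r == "Male"])
```

In the first call the generator has not opened the file yet when the target is removed, so the read fails. In the second call the rows already live in memory, so removing the file does not matter. The analogy also shows the caveat from the comments: if the materialized copy is lost, there is nothing left to read from.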