How to overwrite a parquet file from which a DataFrame is being read in Spark


Solution 1

One solution for this error is to cache the DataFrame, trigger an action on it (for example, df.show()), and then save the parquet file in "overwrite" mode.

In Python:

save_mode = "overwrite"
df = spark.read.parquet("path_to_parquet")

# ... apply your transformations to df here, producing new_df

new_df.cache()
new_df.show()

new_df.write.format("parquet")\
                .mode(save_mode)\
                .save("path_to_parquet")
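
If this pattern comes up repeatedly, it can be wrapped in a small helper. The sketch below is illustrative only: overwrite_parquet_in_place is a hypothetical name, not a Spark API, and the final unpersist() just releases the cached copy once the write has finished.

# Minimal sketch of the cache -> materialize -> overwrite pattern
def overwrite_parquet_in_place(df, path):
    df.cache()      # keep the rows with the executors
    df.count()      # force materialization before the source files are replaced
    df.write.format("parquet").mode("overwrite").save(path)
    df.unpersist()  # release the cached copy after the write completes

# Usage (illustrative):
# new_df = spark.read.parquet("path_to_parquet")   # ...plus your transformations
# overwrite_parquet_in_place(new_df, "path_to_parquet")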

Solution 2

When the data is read back out of the cache, it seems to work fine.

val df = spark.read.format("parquet").load("temp").cache()

cache is a lazy operation and doesn't trigger any computation by itself, so we have to add a dummy action.

println(df.count()) //count over parquet files should be very fast  

Now it should work:

df.repartition(1).write.mode(SaveMode.Overwrite).parquet("temp")
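
For reference, the same approach in PySpark (as used in the question) looks roughly like this; a sketch only, reusing the "temp" path from the Scala snippet above:

df = spark.read.format("parquet").load("temp").cache()

# cache is lazy, so trigger a dummy action to materialize it
print(df.count())

# the overwrite now reads from the cached copy rather than from the files being replaced
df.repartition(1).write.mode("overwrite").parquet("temp")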

Comments

  • cph_sto almost 2 years

    This is a microcosm of the problem I am facing, where I am getting an error. Let me try to reproduce it here.

    I am saving a DataFrame as parquet, but when I reload the DataFrame from the parquet file and save it once again as parquet, I get an error.

    from pyspark.sql.functions import col
    valuesCol = [('Male','2019-09-06'),('Female','2019-09-06'),('Male','2019-09-07')]
    df = spark.createDataFrame(valuesCol,['sex','date'])
    # Save as parquet
    df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp')
    
    # Load it back
    df = spark.read.format('parquet').load('.../temp')
    df = df.where(col('sex')=='Male')
    # Save it back - This produces ERROR   
    df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp')
    

    Error message -

    executor 22): java.io.FileNotFoundException: Requested file maprfs:///mapr/.../temp/part-00000-f67d5a62-36f2-4dd2-855a-846f422e623f-c000.snappy.parquet does not exist. It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.

    Another SO question addresses this issue. The proposed solution was to refresh the table as in the code below, but that did not help. The issue is with refreshing the metadata, and I don't know how to refresh it.

    df.createOrReplaceTempView('table_view')
    spark.catalog.refreshTable('table_view')
    df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp')
    

    Workaround for this problem: A non-elegant way to solve this issue is to save the DataFrame as a parquet file under a different name, then delete the original parquet file and, finally, rename the new parquet file to the old name.

    # Workaround
    import os
    import shutil
    
    # Load it back
    df = spark.read.format('parquet').load('.../temp')
    
    # Save it back as temp1, as opposed to original temp      
    df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp1')
    
    # Delete the original parquet file
    shutil.rmtree('.../temp')
    
    # Renaming the parquet folder.
    os.rename('.../temp1','.../temp')
    

    But the problem is that some DataFrames are quite big, so this may not be the best way to deal with it. Not to mention that renaming might cause some problem with the metadata; I am not sure of that.

  • cph_sto over 4 years
    I ran the code with .cache(). Sometimes it works and other times it fails with the error Caused by: java.io.IOException: Error: Stale file handle. But I think it has nothing to do with your code, as I have seen this error before while saving DFs. Let me test it for some time before I accept this as an answer. Thank you.
  • cph_sto over 4 years
    Can you offer an explanation as to what difference cache() makes?
  • Gelerion over 4 years
    Well, I am not 100% sure it will work on a big cluster; I have only tested it in my local environment. I could imagine a situation where the job runs on spot nodes and all the nodes with cached data are taken away by the cloud provider. In that situation, Spark will try to recompute the lost data and eventually fail with the same exception.
  • cph_sto over 4 years
    Oh I see. So, would you say that the workaround that I have proposed remains more robust? Do you think that refreshing the metadata as I posted in my question will help?
  • Gelerion over 4 years
    cache stores the data on local disk per executor; then, when we write the data, it is taken from that disk and not from the remote file.
  • cph_sto over 4 years
    So that's why .cache() works: df is no longer pointing to the original parquet file but to a location on the local disk, correct?
  • Gelerion over 4 years
    Yes, exactly. I had no success with refreshing the metadata.
  • Gelerion over 4 years
    It might be a viable solution if you use an on-prem or EMR cluster with static nodes; you could also increase the replication factor (a sketch follows after this thread).
  • cph_sto over 4 years
    Thank you so much. It helped me get a perspective :)
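
A footnote to the thread above: the behaviour discussed in the comments, cached data living with the executors and raising the replication factor to survive lost nodes, can be made explicit by replacing cache() with persist() and a storage level. A minimal sketch in PySpark; choosing MEMORY_AND_DISK_2 (two replicas of each cached partition) is an assumption, one way of reading the "increase the replication factor" suggestion:

from pyspark import StorageLevel

df = spark.read.format("parquet").load("temp")

# Two replicas of each cached partition, so losing a single executor does not
# force Spark to recompute from the source files that are about to be overwritten.
df.persist(StorageLevel.MEMORY_AND_DISK_2)
df.count()  # materialize the cache before touching the source

df.repartition(1).write.mode("overwrite").parquet("temp")
df.unpersist()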