Databricks - failing to write from a DataFrame to a Delta location

Solution 1

In general, it is a good idea to avoid using rm on Delta tables. Delta's transaction log can prevent eventual-consistency issues in most cases; however, when you delete and re-create a table in a very short time, different versions of the transaction log can flicker in and out of existence.

Instead, I'd recommend using the transactional primitives provided by Delta. For example, to overwrite the data in a table you can:

df.write.format("delta").mode("overwrite").save("/delta/events")

If you have a table that has already been corrupted, you can fix it using FSCK.
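For example, on Databricks you could run something along these lines (a sketch, using the path from the question below; DRY RUN only previews which missing-file entries would be removed from the transaction log):

// Preview the entries FSCK would remove, then run the actual repair
spark.sql("FSCK REPAIR TABLE delta.`dbfs:/mnt/main/sales` DRY RUN").show()
spark.sql("FSCK REPAIR TABLE delta.`dbfs:/mnt/main/sales`")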

Solution 2

You could do it in the following way:

// Read the old table data
val old_data_DF = spark.read.format("delta")
  .load("dbfs:/mnt/main/sales")

// Create a new DF with the renamed column
val new_data_DF = old_data_DF
  .withColumnRenamed("column_a", "metric1")
  .select("*")

// Write the new DF back to the same location
new_data_DF.write
  .format("delta")
  .mode("overwrite")                  // overwrites all existing data files
  .option("overwriteSchema", "true")  // this is the key line
  .partitionBy("sale_date_partition")
  .save("dbfs:/mnt/main/sales")

The overwriteSchema option makes Delta write new physical files with the latest schema, i.e. the schema updated during the transformation above.
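
To confirm the rename took effect, a quick sanity check (a minimal sketch, reading back from the same path) could be:

// Read the table back and verify the new schema
val check_DF = spark.read.format("delta").load("dbfs:/mnt/main/sales")
check_DF.printSchema()  // should now list "metric1" instead of "column_a"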

Author: samba

Updated on June 04, 2022

Comments

  • samba, almost 2 years ago

    I wanted to change a column name of a Databricks Delta table.

    So I did the following:

    // Read old table data
    val old_data_DF = spark.read.format("delta")
      .load("dbfs:/mnt/main/sales")

    // Created a new DF with a renamed column
    val new_data_DF = old_data_DF
      .withColumnRenamed("column_a", "metric1")
      .select("*")

    // Dropped and recreated the Delta files location
    dbutils.fs.rm("dbfs:/mnt/main/sales", true)
    dbutils.fs.mkdirs("dbfs:/mnt/main/sales")

    // Trying to write the new DF to the location
    new_data_DF.write
      .format("delta")
      .partitionBy("sale_date_partition")
      .save("dbfs:/mnt/main/sales")
    

    Here I'm getting an error at the last step, when writing to Delta:

    java.io.FileNotFoundException: dbfs:/mnt/main/sales/sale_date_partition=2019-04-29/part-00000-769.c000.snappy.parquet
    A file referenced in the transaction log cannot be found. This occurs when data has been manually deleted from the file system rather than using the table `DELETE` statement
    

    Obviously the data was deleted, and most likely I've missed something in the logic above. Now the only place that contains the data is new_data_DF, and writing to a different location like dbfs:/mnt/main/sales_tmp also fails.

    What should I do to write data from new_data_DF to a Delta location?