How to drop malformed rows while reading csv with schema Spark?

Solution 1

If you are reading a CSV file and want to drop the rows that do not match the schema, you can do this by setting the option mode to DROPMALFORMED.

Input data

a,1.0
b,2.2
c,xyz
d,4.5
e,asfsdfsdf
f,3.1

Schema

import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("key", StringType, false),
  StructField("value", DoubleType, false)
))

Read the CSV file with the schema and the DROPMALFORMED option:

val df = spark.read.schema(schema)
  .option("mode", "DROPMALFORMED")
  .csv("/path/to/csv/file")

Output:

+---+-----+
|key|value|
+---+-----+
|a  |1.0  |
|b  |2.2  |
|d  |4.5  |
|f  |3.1  |
+---+-----+

You can get more details on the available options in the Spark CSV data source documentation.

Hope this helps!

Solution 2

.option("mode", "DROPMALFORMED") should do the work.

mode (default PERMISSIVE): allows a mode for dealing with corrupt records during parsing.

  • PERMISSIVE : sets other fields to null when it meets a corrupted record, and puts the malformed string into a new field configured by columnNameOfCorruptRecord. When a schema is set by the user, it sets null for extra fields (see the sketch after this list).

  • DROPMALFORMED : ignores the whole corrupted records.

  • FAILFAST : throws an exception when it meets corrupted records.
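
For example, instead of dropping malformed rows you can keep them for inspection with PERMISSIVE mode. A minimal sketch, assuming the column name _corrupt_record and a placeholder file path (the extra field must be a nullable string, and the data fields are declared nullable here since malformed rows come back as nulls):

import org.apache.spark.sql.types._

// Extra nullable string field to receive the raw text of each malformed line
val schemaWithCorrupt = StructType(Seq(
  StructField("key", StringType, true),
  StructField("value", DoubleType, true),
  StructField("_corrupt_record", StringType, true)
))

val permissiveDf = spark.read.schema(schemaWithCorrupt)
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .csv("/path/to/csv/file")

With the sample input above, rows c and e would come back with value = null and the raw line in _corrupt_record, while the other rows parse normally.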


Comments

  • HouZhe
    HouZhe almost 2 years

    I am using a Spark Dataset to load a CSV file, and I prefer to specify the schema explicitly. But I find there are a few rows that do not comply with my schema: a column should be double, but some rows have non-numeric values. Is it possible to easily filter out all rows that do not comply with my schema from the Dataset?

    val schema = StructType(StructField("col", DataTypes.DoubleType) :: Nil)
    val ds = spark.read.format("csv").option("delimiter", "\t").schema(schema).load("f.csv")
    

    f.csv:

    a
    1.0
    

    I prefer "a" can be filtered from my DataSet easily. Thanks!

  • koiralo
    koiralo about 6 years
    Isn't this the same answer as the other one?
  • HouZhe
    HouZhe about 6 years
    Thanks! Both of your answers are correct, so I set the first one as the accepted answer. Thank you both for your valuable answers!
  • Abhi
    Abhi over 5 years
    How do I recover the corrupt records if I want them for audit purposes? For example, I want to create a new DataFrame with the corrupt records.
  • kensai
    kensai about 5 years
    Exactly, malformed records should be reported back to the source system/vendor; it's a super basic function. Do we have such a feature in Spark?
  • kensai
    kensai about 5 years
    It would be better if the result of the CSV loader were actually two separate DataFrame objects: one parsed, one corrupted.
  • Ricardo Mutti
    Ricardo Mutti almost 4 years
    One caveat is that this silently drops data from your input.
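
Following up on the audit question in the comments: Spark's CSV loader does not return two separate DataFrames, but you can approximate that by loading in PERMISSIVE mode with a corrupt-record column and splitting the result yourself. A sketch under those assumptions (the _corrupt_record name and file path are placeholders; the cache() is there because Spark disallows queries that reference only the internal corrupt-record column of a raw CSV load):

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._

val auditSchema = StructType(Seq(
  StructField("key", StringType, true),
  StructField("value", DoubleType, true),
  StructField("_corrupt_record", StringType, true)
))

val raw = spark.read.schema(auditSchema)
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .csv("/path/to/csv/file")
  .cache() // materialize so the corrupt-record column can be queried on its own

// Clean rows: no corrupt-record payload
val parsed = raw.filter(col("_corrupt_record").isNull).drop("_corrupt_record")

// Malformed rows: keep the raw lines for reporting back to the source system
val corrupted = raw.filter(col("_corrupt_record").isNotNull).select("_corrupt_record")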