How to drop malformed rows while reading a CSV with a schema in Spark?
Solution 1
If you are reading a CSV file and want to drop the rows that do not match the schema, you can do this by setting the mode option to DROPMALFORMED.
Input data:
a,1.0
b,2.2
c,xyz
d,4.5
e,asfsdfsdf
f,3.1
Schema:

import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("key", StringType, false),
  StructField("value", DoubleType, false)
))
Reading the CSV file with the schema and the mode option set to DROPMALFORMED:
val df = spark.read
  .schema(schema)
  .option("mode", "DROPMALFORMED")  // silently drop rows that do not fit the schema
  .csv("/path/to/csv/file")

df.show()
Output:
+---+-----+
|key|value|
+---+-----+
|a  |1.0  |
|b  |2.2  |
|d  |4.5  |
|f  |3.1  |
+---+-----+
The c and e rows, whose value column is not a valid double, have been dropped.
You can get more details in the spark-csv documentation.
Hope this helps!
Solution 2
.option("mode", "DROPMALFORMED")
should do the job. From the documentation:
mode (default PERMISSIVE): allows a mode for dealing with corrupt records during parsing.

- PERMISSIVE: sets other fields to null when it meets a corrupted record, and puts the malformed string into a new field configured by columnNameOfCorruptRecord. When a schema is set by the user, it sets null for extra fields.
- DROPMALFORMED: ignores the whole corrupted records.
- FAILFAST: throws an exception when it meets corrupted records.
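If you want to keep the malformed rows around for inspection instead of dropping them, here is a minimal sketch of the PERMISSIVE route on the same toy data. The _corrupt_record column name (Spark's default), the nullable fields, and the path are assumptions for illustration:

import org.apache.spark.sql.types._

// The schema must itself contain the corrupt-record column (as a nullable
// StringType), otherwise Spark has nowhere to put the raw malformed line.
val schemaWithCorrupt = StructType(Seq(
  StructField("key", StringType, true),
  StructField("value", DoubleType, true),
  StructField("_corrupt_record", StringType, true)  // assumed/default column name
))

val permissiveDf = spark.read
  .schema(schemaWithCorrupt)
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .csv("/path/to/csv/file")  // placeholder path

With the input above, the c and e rows are kept, with their raw text preserved in _corrupt_record for later filtering.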
HouZhe

Updated on June 14, 2022

Comments
- HouZhe, almost 2 years ago: While using a Spark Dataset to load a CSV file, I prefer to designate the schema explicitly, but I find there are a few rows not compliant with my schema: a column should be double, but some rows hold non-numeric values. Is it possible to easily filter all rows that are not compliant with my schema out of the Dataset?

val schema = StructType(StructField("col", DataTypes.DoubleType) :: Nil)
val ds = spark.read.format("csv").option("delimiter", "\t").schema(schema).load("f.csv")
f.csv:
a 1.0
I'd prefer that the "a" row can be filtered from my Dataset easily. Thanks!
- koiralo, about 6 years ago: Isn't this the same answer as the other one?
- HouZhe, about 6 years ago: Thanks! Both of your answers are correct, so I set the first answer as the accepted one. But thank you both for your valuable answers!
- Abhi, over 5 years ago: How do I recover corrupt records if I want them for audit purposes? For example, I want to create a new df with the corrupt records.
- kensai, about 5 years ago: Exactly, malformed records should be reported back to the source system/vendor; it's a super basic function. Do we have such a feature in Spark?
- kensai, about 5 years ago: It would be better if the result of the CSV loader were actually 2 separate DataFrame objects, one parsed, the second corrupted. (A sketch of such a split follows after these comments.)
- Ricardo Mutti, almost 4 years ago: One caveat is that you silently drop data from your input.