Spark SQL - loading csv/psv files with some malformed records


In your case, it may not be Spark's parsing itself that fails. The default mode is PERMISSIVE, so Spark parses each line on a best-effort basis into a malformed record, and that record then causes problems further downstream in your processing logic.

You should be able to simply add the option:

.option("mode", "DROPMALFORMED")

like this:

sqlContext.read
        .format("com.databricks.spark.csv")
        .option("header", format("header"))
        .option("delimiter", format("delimeter"))
        .option("quote", format("quote"))
        .option("escape", format("escape"))
        .option("charset", "UTF-8")
        // Column types are unnecessary for our current use cases.
        //.option("inferschema", "true")
        .option("mode", "DROPMALFORMED")
        .load(glob)

and it will skip lines that have the wrong number of delimiters or that don't match the schema, rather than letting them cause errors later on in the code.
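For reference, here is a minimal, self-contained sketch of the same idea. It assumes Spark 2.0 or later, where the CSV reader is built in and `SparkSession` replaces `sqlContext`. The helper `readPsv`, the sample data, and the temp file are made up for illustration and are not from the question. The `mode` option accepts PERMISSIVE (the default), DROPMALFORMED, and FAILFAST.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, SparkSession}

object ParseModeSketch {
  // Hypothetical helper: reads pipe-separated files with the given parse mode.
  def readPsv(spark: SparkSession, path: String, mode: String): DataFrame =
    spark.read
      .option("header", "true")
      .option("delimiter", "|")
      .option("mode", mode) // PERMISSIVE (default), DROPMALFORMED, or FAILFAST
      .csv(path)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("parse-mode-sketch")
      .getOrCreate()

    // Made-up sample data: the third line has one delimiter too many.
    val file = Files.createTempFile("sample", ".psv")
    Files.write(file, "id|name\n1|alice\n2|bob|extra\n3|carol\n".getBytes("UTF-8"))

    // DROPMALFORMED should skip the malformed line and keep the rest.
    readPsv(spark, file.toString, "DROPMALFORMED").show()

    // FAILFAST would instead throw as soon as the malformed line is parsed:
    // readPsv(spark, file.toString, "FAILFAST").show()

    spark.stop()
  }
}
```

FAILFAST is the opposite choice: it is useful when a bad row should stop the job rather than disappear.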

Author: Edmon

Data, R&D

Updated on June 05, 2022

Comments

  • Edmon
    Edmon almost 2 years

    We are loading hierarchies of directories of files with Spark and converting them to Parquet. There are tens of gigabytes in hundreds of pipe-separated files. Some are pretty big themselves.

    Roughly every 100th file has a row or two with an extra delimiter, which makes the whole process (or the file) abort.

    We are loading using:

    sqlContext.read
            .format("com.databricks.spark.csv")
            .option("header", format("header"))
            .option("delimiter", format("delimeter"))
            .option("quote", format("quote"))
            .option("escape", format("escape"))
            .option("charset", "UTF-8")
            // Column types are unnecessary for our current use cases.
            //.option("inferschema", "true")
            .load(glob)
    

    Is there any extension or event-handling mechanism in Spark that we could attach to the logic that reads rows, so that when a malformed row is encountered, it just skips the row instead of failing the whole process?

    (We are planning to do more pre-processing, but this would be the most immediate and critical fix.)

  • ski_squaw
    ski_squaw almost 6 years
    One caveat is that this silently drops rows from your input data.
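
If silently losing rows is a concern, one possible alternative is to keep the default PERMISSIVE mode and capture the raw text of each bad line in a separate column, so the malformed rows can be counted or written elsewhere. The sketch below assumes Spark 2.3 or later with the built-in CSV reader (this is not a feature of the `com.databricks.spark.csv` package used in the question). The schema, sample data, and object name are hypothetical.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object CorruptRecordSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("corrupt-record-sketch")
      .getOrCreate()

    // Made-up sample data: the third line has one delimiter too many.
    val file = Files.createTempFile("sample", ".psv")
    Files.write(file, "id|name\n1|alice\n2|bob|extra\n3|carol\n".getBytes("UTF-8"))

    // Hypothetical schema, plus an extra string column that receives
    // the raw text of any line that fails to parse.
    val schema = StructType(Seq(
      StructField("id", StringType),
      StructField("name", StringType),
      StructField("_corrupt_record", StringType)))

    val parsed = spark.read
      .schema(schema)
      .option("header", "true")
      .option("delimiter", "|")
      .option("mode", "PERMISSIVE")
      .option("columnNameOfCorruptRecord", "_corrupt_record")
      .csv(file.toString)
      .cache() // Spark 2.3+ rejects queries on the raw file that reference only the corrupt-record column

    // Split the data into malformed lines and cleanly parsed rows.
    val bad = parsed.filter(col("_corrupt_record").isNotNull)
    val good = parsed.filter(col("_corrupt_record").isNull).drop("_corrupt_record")

    println(s"malformed rows: ${bad.count()}")
    bad.select("_corrupt_record").show(false)
    good.show()

    spark.stop()
  }
}
```

The `bad` DataFrame could then be logged or saved for later inspection, while `good` continues through the Parquet conversion.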