Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the referenced columns only include the internal corrupt record column


Solution 1

The problem is with the JSON file. The file "D:/playground/input.json" looks like this, as you described:

{
  "a": {
  "b": 1
  }
}

This is not valid input for Spark's default JSON reader. Spark processes JSON data line by line and expects each line to be a complete JSON document, so parsing this file fails.

You should keep the complete JSON on a single line in compact form by removing all whitespace and newlines.

Like

{"a":{"b":1}}

If you want multiple JSON objects in a single file, keep them like this:

{"a":{"b":1}}
{"a":{"b":2}}
{"a":{"b":3}} ...

For more information, see the Spark documentation on the JSON data source (JSON Lines format).

Solution 2

You may try either of these two ways.

Option-1: JSON in a single line, as answered above by @Avishek Bhattacharya.

Option-2: Add the multiline option when reading the JSON, as shown in the code below. You can then select the nested attribute directly.

// multiline lets a single JSON document span several lines
val df = spark.read.option("multiline", "true").json("C:\\data\\nested-data.json")
// select the nested attribute with dot notation
df.select("a.b").show()

Here is the output for Option-2.

20/07/29 23:14:35 INFO DAGScheduler: Job 1 finished: show at NestedJsonReader.scala:23, took 0.181579 s
+---+
|  b|
+---+
|  1|
+---+
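
To confirm how Spark inferred the nested structure, you can also print the schema of the same DataFrame (a small optional addition, not part of the original example):

// "a" is inferred as a struct containing the nested field "b"
df.printSchema()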

Comments

  • Alon, almost 2 years ago

    I have a json file:

    {
      "a": {
        "b": 1
      }
    }
    

    I am trying to read it:

    val path = "D:/playground/input.json"
    val df = spark.read.json(path)
    df.show()
    

    But I am getting an error:

    Exception in thread "main" org.apache.spark.sql.AnalysisException: Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the referenced columns only include the internal corrupt record column (named _corrupt_record by default). For example: spark.read.schema(schema).json(file).filter($"_corrupt_record".isNotNull).count() and spark.read.schema(schema).json(file).select("_corrupt_record").show(). Instead, you can cache or save the parsed results and then send the same query. For example, val df = spark.read.schema(schema).json(file).cache() and then df.filter($"_corrupt_record".isNotNull).count().;

    So I tried to cache it as they suggest:

    val path = "D:/playground/input.json"
    val df = spark.read.json(path).cache()
    df.show()
    

    But I keep getting the same error.