Pyspark: Parse a column of json strings


Solution 1

Converting a dataframe with JSON strings to a structured dataframe is actually quite simple in Spark if you convert the dataframe to an RDD of strings first (see: http://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets)

For example:

>>> new_df = sql_context.read.json(df.rdd.map(lambda r: r.json))
>>> new_df.printSchema()
root
 |-- body: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- sub_json: struct (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- sub_sub_json: struct (nullable = true)
 |    |    |    |-- col1: long (nullable = true)
 |    |    |    |-- col2: string (nullable = true)
 |-- header: struct (nullable = true)
 |    |-- foo: string (nullable = true)
 |    |-- id: long (nullable = true)
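
Once parsed, nested fields can be selected with dotted paths. A quick usage sketch based on the schema above and the sample data from the question; the output should look something like this:

>>> new_df.select('body.name', 'header.foo').show()
+------+------+
|  name|   foo|
+------+------+
|foobar|   bar|
|barfoo|   baz|
|bazbar|foobaz|
+------+------+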

Solution 2

For Spark 2.1+, you can use from_json, which allows you to preserve the other non-JSON columns within the dataframe, as follows:

from pyspark.sql.functions import from_json, col
json_schema = spark.read.json(df.rdd.map(lambda row: row.json)).schema
df.withColumn('json', from_json(col('json'), json_schema))

This lets Spark derive the schema of the JSON string column. The df.json column is then no longer a StringType but a correctly decoded JSON structure, i.e., a nested StructType, and all the other columns of df are preserved as-is.

You can access the json content as follows:

df.select(col('json.header').alias('header'))
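
If letting Spark derive the schema is too slow (the inference needs an extra pass over the data), you can hand from_json an explicit schema instead. A minimal sketch that decodes only the header field; the field names come from the sample data in the question, everything else is an assumption:

from pyspark.sql.types import StructType, StructField, StringType, LongType

# Explicit schema for just the parts we need; from_json ignores
# JSON fields that the schema doesn't mention.
header_schema = StructType([
    StructField('header', StructType([
        StructField('id', LongType(), True),
        StructField('foo', StringType(), True),
    ]), True),
])

df.withColumn('json', from_json(col('json'), header_schema))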

Solution 3

Existing answers do not work if your JSON is anything but perfectly/traditionally formatted. For example, the RDD-based schema inference expects JSON in curly-braces {} and will provide an incorrect schema (resulting in null values) if, for example, your data looks like:

[
  {
    "a": 1.0,
    "b": 1
  },
  {
    "a": 0.0,
    "b": 2
  }
]

I wrote a function to work around this issue by sanitizing JSON such that it lives in another JSON object:

import pyspark.sql.functions as psf

def parseJSONCols(df, *cols, sanitize=True):
    """Auto infer the schema of a json column and parse into a struct.

    rdd-based schema inference works if you have well-formatted JSON,
    like ``{"key": "value", ...}``, but breaks if your 'JSON' is just a
    string (``"data"``) or is an array (``[1, 2, 3]``). In those cases you
    can fix everything by wrapping the data in another JSON object
    (``{"key": [1, 2, 3]}``). The ``sanitize`` option (default True)
    automatically performs the wrapping and unwrapping.

    The schema inference is based on this
    `SO Post <https://stackoverflow.com/a/45880574/>`_.

    Parameters
    ----------
    df : pyspark dataframe
        Dataframe containing the JSON cols.
    *cols : string(s)
        Names of the columns containing JSON.
    sanitize : boolean
        Flag indicating whether you'd like to sanitize your records
        by wrapping and unwrapping them in another JSON object layer.

    Returns
    -------
    pyspark dataframe
        A dataframe with the decoded columns.
    """
    res = df
    for i in cols:

        # sanitize if requested.
        if sanitize:
            res = (
                res.withColumn(
                    i,
                    psf.concat(psf.lit('{"data": '), i, psf.lit('}'))
                )
            )
        # infer schema and apply it
        schema = spark.read.json(res.rdd.map(lambda x: x[i])).schema
        res = res.withColumn(i, psf.from_json(psf.col(i), schema))

        # unpack the wrapped object if needed
        if sanitize:
            res = res.withColumn(i, psf.col(i).data)
    return res

Note: psf = pyspark.sql.functions (the import is shown at the top of the snippet).
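
A hypothetical usage sketch, assuming an active spark session and the array-shaped data from above; the inferred schema comes out roughly as shown in the comments:

df = spark.createDataFrame(
    [('[{"a": 1.0,"b": 1},{"a": 0.0,"b": 2}]',)], ['json'])

parsed = parseJSONCols(df, 'json')
parsed.printSchema()
# root
#  |-- json: array (nullable = true)
#  |    |-- element: struct (containsNull = true)
#  |    |    |-- a: double (nullable = true)
#  |    |    |-- b: long (nullable = true)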

Solution 4

Here's a concise (Spark SQL) version of @nolan-conaway's parseJSONCols function.

SELECT 
explode(
    from_json(
        concat('{"data":', 
               '[{"a": 1.0,"b": 1},{"a": 0.0,"b": 2}]', 
               '}'), 
        'data array<struct<a:DOUBLE, b:INT>>'
    ).data) as data;

PS. I've added the explode function as well :P

You'll need to know some Hive SQL types to write the schema string.
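
The same statement can be run from PySpark through spark.sql; a sketch assuming an active spark session:

spark.sql("""
    SELECT explode(
        from_json(
            concat('{"data":',
                   '[{"a": 1.0,"b": 1},{"a": 0.0,"b": 2}]',
                   '}'),
            'data array<struct<a:DOUBLE, b:INT>>'
        ).data) AS data
""").show()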

Solution 5

If you don't know the schema of each JSON (and each one can be different), you can use:

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
 
# ... here you get your DF

# Assuming the first column of your DF is the JSON to parse
my_df = spark.read.json(my_df.rdd.map(lambda x: x[0]))

Note that it won't keep any other columns present in your dataset. From: https://github.com/apache/spark/pull/22775
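
As a side note, this also works when the rows have different shapes: spark.read.json merges the inferred schemas and fills missing fields with nulls. A small sketch with hypothetical data:

my_df = spark.createDataFrame([('{"a": 1}',), ('{"b": "x"}',)], ['json'])

parsed = spark.read.json(my_df.rdd.map(lambda x: x[0]))
parsed.printSchema()
# root
#  |-- a: long (nullable = true)
#  |-- b: string (nullable = true)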


Comments

  • Steve over 1 year

    I have a pyspark dataframe consisting of one column, called json, where each row is a unicode string of json. I'd like to parse each row and return a new dataframe where each row is the parsed json.

    # Sample Data Frame
    jstr1 = u'{"header":{"id":12345,"foo":"bar"},"body":{"id":111000,"name":"foobar","sub_json":{"id":54321,"sub_sub_json":{"col1":20,"col2":"something"}}}}'
    jstr2 = u'{"header":{"id":12346,"foo":"baz"},"body":{"id":111002,"name":"barfoo","sub_json":{"id":23456,"sub_sub_json":{"col1":30,"col2":"something else"}}}}'
    jstr3 = u'{"header":{"id":43256,"foo":"foobaz"},"body":{"id":20192,"name":"bazbar","sub_json":{"id":39283,"sub_sub_json":{"col1":50,"col2":"another thing"}}}}'
    df = sql_context.createDataFrame([Row(json=jstr1),Row(json=jstr2),Row(json=jstr3)])
    

    I've tried mapping over each row with json.loads:

    (df
      .select('json')
      .rdd
      .map(lambda x: json.loads(x))
      .toDF()
    ).show()
    

    But this returns a TypeError: expected string or buffer

    I suspect that part of the problem is that when converting from a dataframe to an rdd, the schema information is lost, so I've also tried manually entering in the schema info:

    schema = StructType([StructField('json', StringType(), True)])
    rdd = (df
      .select('json')
      .rdd
      .map(lambda x: json.loads(x))
    )
    new_df = sql_context.createDataFrame(rdd, schema)
    new_df.show()
    

    But I get the same TypeError.

    Looking at this answer, it looks like flattening out the rows with flatMap might be useful here, but I'm not having success with that either:

    schema = StructType([StructField('json', StringType(), True)])
    rdd = (df
      .select('json')
      .rdd
      .flatMap(lambda x: x)
      .flatMap(lambda x: json.loads(x))
      .map(lambda x: x.get('body'))
    )
    new_df = sql_context.createDataFrame(rdd, schema)
    new_df.show()
    

    I get this error: AttributeError: 'unicode' object has no attribute 'get'.

  • Steve over 7 years
    This is great - Thanks! Is there a way to convert the structtypes to maptypes? Later in my code, I'm parsing out each maptype by exploding the columns.
  • Steve over 7 years
    Ah I think I've figured it out: I can avoid using maptypes by doing something like this: body = new_df.select('body').rdd.map(lambda r: r.body).toDF()
  • Mariusz over 7 years
    Actually it's much simpler: just type new_df.select('body') and you will have a dataframe with body objects only.
  • passionate over 6 years
    When I try it with a streaming data frame (structured streaming), I get an error that Queries with streaming sources must be executed with writeStream.start();;\nkafka. Can you please help me with how to use the JSON data from Kafka streaming?
  • Martin Tapp over 6 years
    Just use a regular dataframe/rdd to extract the json schema from a batch/sample of data. Then, use the extracted schema in your streaming app.
  • Buthetleon about 5 years
    "For example, the RDD-based schema inference expects JSON in curly-braces": where did you read this? Awesome find!
  • Nolan Conaway about 5 years
    "Where did you read this?" I can't say I read it anywhere; I simply found that pyspark did not parse my JSON unless this was true.
  • Charles Chow almost 5 years
    Hi, can you tell me what col is in your code? Is it the 'json' column object?
  • Martin Tapp almost 5 years
    It's a Spark function which you can import; see spark.apache.org/docs/2.4.0/api/python/…
  • Ophir Yoktan almost 5 years
    Cool! Is there a way to join the new data frame with the original (which has other fields besides the json string)?
  • Mariusz almost 5 years
    @OphirYoktan Unfortunately not. For this I recommend from_json, described in Martin's answer here.
  • Eric Zheng about 4 years
    Would it be inefficient to let Spark derive the schema of the JSON?
  • Martin Tapp about 4 years
    I don't understand your question, as Spark is deriving the schema here...
  • Vzzarr almost 2 years
    Just a 'gotcha' that gave me some headache: in the part row.json, the 'json' refers to the column named 'json', so if your column is 'my_json' then it's going to be row.my_json (unhappy choice of column names).
  • panc almost 2 years
    How does Spark determine the data type of each field when decoding the json structure? Is it safe to leave it to Spark to infer the data types?