Using PySpark to read JSON items from an array?


I see you retrieved JSON documents from Azure Cosmos DB and converted them to a PySpark DataFrame, but the nested JSON document or array could not be transformed into a JSON object in a DataFrame column as you expected, because there is no JSON type defined in the pyspark.sql.types module.


While trying to solve this, I found the article PySpark: Convert JSON String Column to Array of Object (StructType) in Data Frame, which is a suitable solution for your case, essentially the same as what you want.

The article shows how to use ArrayType, StructType, StructField and other base PySpark data types to convert a JSON string in a column into a composite data type that is easier to process in PySpark, by defining the column schema and a UDF.

Here is a summary of the sample code. Hope it helps.

source = [{"attr_1": 1, "attr_2": "[{\"a\":1,\"b\":1},{\"a\":2,\"b\":2}]"}, {"attr_1": 2, "attr_2": "[{\"a\":3,\"b\":3},{\"a\":4,\"b\":4}]"}]
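The JSON source is read into a data frame through sqlContext. A minimal sketch of that loading step (assuming a SparkContext sc and a SQLContext sqlContext already exist, e.g. in a Databricks notebook; the exact loading code is not shown in the original article):

import json

# Serialize each record to a JSON string and let Spark infer the schema
df = sqlContext.read.json(sc.parallelize([json.dumps(r) for r in source]))
df.show()
df.printSchema()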

The output is:

+------+--------------------+
|attr_1|              attr_2|
+------+--------------------+
|     1|[{"a":1,"b":1},{"...|
|     2|[{"a":3,"b":3},{"...|
+------+--------------------+

root
 |-- attr_1: long (nullable = true)
 |-- attr_2: string (nullable = true)

Then convert the attr_2 column by defining the column schema and a UDF.

# Function to convert a JSON array string to a list of tuples
import json

def parse_json(array_str):
    json_obj = json.loads(array_str)
    # Return a list rather than a generator so the UDF result can be serialized
    return [(item["a"], item["b"]) for item in json_obj]

# Define the schema
from pyspark.sql.types import ArrayType, IntegerType, StructType, StructField

json_schema = ArrayType(StructType([
    StructField('a', IntegerType(), nullable=False),
    StructField('b', IntegerType(), nullable=False)
]))

# Define udf
from pyspark.sql.functions import udf

udf_parse_json = udf(parse_json, json_schema)

# Generate a new data frame with the expected schema
df_new = df.select(df.attr_1, udf_parse_json(df.attr_2).alias("attr_2"))
df_new.show()
df_new.printSchema()

The output is as follows:

+------+--------------+
|attr_1|        attr_2|
+------+--------------+
|     1|[[1,1], [2,2]]|
|     2|[[3,3], [4,4]]|
+------+--------------+

root
 |-- attr_1: long (nullable = true)
 |-- attr_2: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: integer (nullable = false)
 |    |    |-- b: integer (nullable = false)
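To apply the same idea to your ProductRanges column: on Spark 2.1+ you can also skip the UDF and use the built-in from_json function, then explode the resulting array to count the "name" entries and iterate over the min, max and value items. A sketch (assuming your DataFrame is named df and ProductRanges holds the JSON array string read from Cosmos DB):

from pyspark.sql.functions import col, explode, from_json
from pyspark.sql.types import (ArrayType, LongType, StringType,
                               StructField, StructType)

# Schema for one element of the ProductRanges array
range_schema = ArrayType(StructType([
    StructField("name", StringType()),
    StructField("min", LongType()),
    StructField("max", LongType()),
    StructField("value", StringType())
]))

# Parse the JSON string column and explode to one row per range
ranges = (df
          .withColumn("range", explode(from_json(col("ProductRanges"), range_schema)))
          .select("range.name", "range.min", "range.max", "range.value"))

ranges.show()
print(ranges.count())  # number of "name" entries, however many ranges there are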

Comments

  • Jon (almost 2 years ago)

    I'm having some issues reading items from Cosmos DB in Databricks: it seems to read the JSON as a string value, and I'm having trouble getting the data out of it into columns.

    I have a column called ProductRanges with the following values in a row:

    [   {
            "name": "Red",
            "min": 0,
            "max": 99,
            "value": "Order More"
        },
        {
            "name": "Amber",
            "min": 100,
            "max": 499,
            "value": "Stock OK"
        },
        {
            "name": "Green",
            "min": 500,
            "max": 1000000,
            "value": "Overstocked"
        }
    ]
    

    In Cosmos DB the JSON document is valid, but when importing the data the datatype in the dataframe is a string, not a JSON object/struct as I would expect.

    I would like to be able to count the number of times "name" comes up and iterate through them to get the min, max and value items, as the number of ranges we can have can be more than 3. I've been through a few posts on Stack Overflow and other places but I'm stuck on the formatting. I've tried to use explode and to read the schema based on the column values, but it says 'invalid document'; I think it may be due to PySpark needing {} at the start and the end, but even concatenating that in the SQL query from Cosmos DB still ends up as a string datatype.

    Any pointers would be appreciated