Flatten Nested Spark Dataframe


Solution 1

This issue might be a bit old, but for anyone out there still looking for a solution: you can flatten complex struct types inline by selecting `struct_col.*`.

First, let's create the nested DataFrame:

from pyspark.sql import HiveContext
hc = HiveContext(sc)
# note: despite its name, "nested_array" below is a struct, not an array
nested_df = hc.read.json(sc.parallelize(["""
{
  "field1": 1, 
  "field2": 2, 
  "nested_array":{
     "nested_field1": 3,
     "nested_field2": 4
  }
}
"""]))

Now to flatten it:

flat_df = nested_df.select("field1", "field2", "nested_array.*")

You'll find useful examples here: https://docs.databricks.com/delta/data-transformation/complex-types.html

If you have too many nested struct columns to list by hand, you can build the select list from the DataFrame's dtypes:

flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct']
nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct']
flat_df = nested_df.select(*flat_cols, *[c + ".*" for c in nested_cols])
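To see what that select list ends up looking like without a running Spark session, here is a plain-Python sketch of the same dtype filtering (the `(name, type-string)` pairs mirror what `df.dtypes` returns for the example above):

```python
# (name, type-string) pairs as df.dtypes would return them for nested_df above
dtypes = [("field1", "bigint"), ("field2", "bigint"),
          ("nested_array", "struct<nested_field1:bigint,nested_field2:bigint>")]

# same filtering as the snippet above, minus Spark
flat_cols = [name for name, typ in dtypes if typ[:6] != "struct"]
nested_cols = [name for name, typ in dtypes if typ[:6] == "struct"]
select_exprs = flat_cols + [c + ".*" for c in nested_cols]
# select_exprs == ["field1", "field2", "nested_array.*"]
```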

Solution 2

This flattens a nested DataFrame that has both struct and array types. It typically helps when reading data in through JSON. Improved on this answer: https://stackoverflow.com/a/56533459/7131019

from pyspark.sql.types import *
from pyspark.sql import functions as f

def flatten_structs(nested_df):
    # walk the struct columns with an explicit stack, collecting every
    # leaf column as a dotted select path aliased with underscores
    stack = [((), nested_df)]
    columns = []

    while len(stack) > 0:

        parents, df = stack.pop()

        # non-struct columns at this level become selectable leaves
        flat_cols = [
            f.col(".".join(parents + (c[0],))).alias("_".join(parents + (c[0],)))
            for c in df.dtypes
            if c[1][:6] != "struct"
        ]

        nested_cols = [
            c[0]
            for c in df.dtypes
            if c[1][:6] == "struct"
        ]

        columns.extend(flat_cols)

        # push each struct's fields onto the stack for the next pass
        for nested_col in nested_cols:
            projected_df = df.select(nested_col + ".*")
            stack.append((parents + (nested_col,), projected_df))

    return nested_df.select(columns)
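The naming scheme that stack walk produces can be checked with plain dicts standing in for struct columns — a sketch of the traversal only, not the answer's Spark code:

```python
# nested dicts stand in for struct columns; strings stand in for leaf types
schema = {"field1": "bigint",
          "nested": {"a": "bigint", "deep": {"b": "string"}}}

stack = [((), schema)]
columns = []  # (dotted select path, underscore alias) pairs
while stack:
    parents, node = stack.pop()
    for name, typ in node.items():
        if isinstance(typ, dict):          # struct: descend
            stack.append((parents + (name,), typ))
        else:                              # leaf: record path and alias
            columns.append((".".join(parents + (name,)),
                            "_".join(parents + (name,))))
# e.g. ("nested.deep.b", "nested_deep_b") appears in columns
```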

def flatten_array_struct_df(df):
    # repeat until no array columns remain: explode every array,
    # then flatten any structs the explode exposed
    array_cols = [
            c[0]
            for c in df.dtypes
            if c[1][:5] == "array"
        ]

    while len(array_cols) > 0:

        for array_col in array_cols:

            # note: explode drops rows whose array is empty or null;
            # use f.explode_outer to keep them
            df = df.withColumn(array_col, f.explode(f.col(array_col)))

        df = flatten_structs(df)

        array_cols = [
            c[0]
            for c in df.dtypes
            if c[1][:5] == "array"
        ]
    return df

flat_df = flatten_array_struct_df(df)
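The effect of the explode step can be illustrated on plain rows — a minimal sketch using lists of dicts, not the answer's Spark code:

```python
# one output row per element of the exploded column, like f.explode
def explode_rows(rows, col):
    out = []
    for row in rows:
        for item in row[col]:
            new_row = dict(row)
            new_row[col] = item
            out.append(new_row)
    return out

rows = [{"id": 1, "tags": [{"t": "a"}, {"t": "b"}]}]
rows = explode_rows(rows, "tags")
# rows == [{"id": 1, "tags": {"t": "a"}}, {"id": 1, "tags": {"t": "b"}}]
```

Like `f.explode`, this drops rows whose array is empty, which is why `f.explode_outer` is sometimes the better choice.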


Solution 3

I've developed a recursive approach to flatten any nested DataFrame.

The implementation is on the AWS Data Wrangler code base on GitHub.

P.S. Spark support was deprecated in the package, but the code base is still useful.

Solution 4

Here's my final approach:

1) Map the rows in the DataFrame to an RDD of dicts. Find suitable Python code online for flattening dicts.

flat_rdd = nested_df.rdd.map(lambda x: flatten(x))

where

def flatten(x):
  x_dict = x.asDict()
  ...some flattening code...
  return x_dict

2) Convert the RDD of dicts back to a DataFrame:

flat_df = sqlContext.createDataFrame(flat_rdd)
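The answer leaves the flattening code to the reader; a minimal recursive dict flattener might look like the following (a hypothetical helper, assuming dot-separated output keys — with nested `Row` objects you would typically call `x.asDict(recursive=True)` first):

```python
def flatten_dict(d, prefix=""):
    # recursively flatten nested dicts into dot-separated keys
    out = {}
    for key, value in d.items():
        full_key = prefix + "." + key if prefix else key
        if isinstance(value, dict):
            out.update(flatten_dict(value, full_key))
        else:
            out[full_key] = value
    return out

flatten_dict({"field1": 1, "nested_array": {"nested_field1": 3}})
# → {"field1": 1, "nested_array.nested_field1": 3}
```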
Author: John

Updated on January 01, 2021

Comments

  • John (over 3 years)

    Is there a way to flatten an arbitrarily nested Spark Dataframe? Most of the work I'm seeing is written for specific schema, and I'd like to be able to generically flatten a Dataframe with different nested types (e.g. StructType, ArrayType, MapType, etc).

    Say I have a schema like:

    StructType(List(StructField(field1,...), StructField(field2,...), StructField(nested_array,ArrayType(StructType(List(StructField(nested_field1,...), StructField(nested_field2,...)))),...)))
    

    Looking to adapt this into a flat table with a structure like:

    field1
    field2
    nested_array.nested_field1
    nested_array.nested_field2
    

    FYI, looking for suggestions for Pyspark, but other flavors of Spark are also appreciated.

  • Saikat (almost 4 years)

    Hi @MaFF, your solution is really helpful. I have a query: suppose in the example you provided, nested_array is array<struct<"nested_field1":string,"nested_field2":string>> — then how can I get nested_field1 and nested_field2 into separate columns? I can have multiple structs with the same key fields and different values. Couldn't solve it. It would be really helpful if you could provide some insight.
  • Mohammad Rijwan (over 3 years)
    Page not found!!
  • Sip (almost 2 years)
    Is there a possibility to do this recursively, where structs are even more nested? Any tips?