Dropping a nested column from Spark DataFrame


Solution 1

It is just a programming exercise, but you can try something like this:

import org.apache.spark.sql.{DataFrame, Column}
import org.apache.spark.sql.types.{StructType, StructField}
import org.apache.spark.sql.{functions => f}
import scala.util.Try

case class DFWithDropFrom(df: DataFrame) {
  def getSourceField(source: String): Try[StructField] = {
    Try(df.schema.fields.filter(_.name == source).head)
  }

  def getType(sourceField: StructField): Try[StructType] = {
    Try(sourceField.dataType.asInstanceOf[StructType])
  }

  def genOutputCol(names: Array[String], source: String): Column = {
    f.struct(names.map(x => f.col(source).getItem(x).alias(x)): _*)
  }

  def dropFrom(source: String, toDrop: Array[String]): DataFrame = {
    getSourceField(source)
      .flatMap(getType)
      .map(_.fieldNames.diff(toDrop))
      .map(genOutputCol(_, source))
      .map(df.withColumn(source, _))
      .getOrElse(df)
  }
}

Example usage:

scala> case class features(feat1: String, feat2: String, feat3: String)
defined class features

scala> case class record(label: String, features: features)
defined class record

scala> val df = sc.parallelize(Seq(record("a_label",  features("f1", "f2", "f3")))).toDF
df: org.apache.spark.sql.DataFrame = [label: string, features: struct<feat1:string,feat2:string,feat3:string>]

scala> DFWithDropFrom(df).dropFrom("features", Array("feat1")).show
+-------+--------+
|  label|features|
+-------+--------+
|a_label| [f2,f3]|
+-------+--------+


scala> DFWithDropFrom(df).dropFrom("foobar", Array("feat1")).show
+-------+----------+
|  label|  features|
+-------+----------+
|a_label|[f1,f2,f3]|
+-------+----------+


scala> DFWithDropFrom(df).dropFrom("features", Array("foobar")).show
+-------+----------+
|  label|  features|
+-------+----------+
|a_label|[f1,f2,f3]|
+-------+----------+

Add an implicit conversion and you're good to go.
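
For example, a minimal sketch of that implicit conversion (the object and method names here are illustrative, not part of the original answer):

object DFWithDropFrom {
  import scala.language.implicitConversions

  // Implicitly wrap any DataFrame so dropFrom can be called on it directly
  implicit def toDFWithDropFrom(df: DataFrame): DFWithDropFrom =
    DFWithDropFrom(df)
}

// With the implicit in scope:
// import DFWithDropFrom._
// df.dropFrom("features", Array("feat1"))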

Solution 2

This version allows you to remove nested columns at any level:

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructType, DataType}

/**
  * Various Spark utilities and extensions of DataFrame
  */
object DataFrameUtils {

  private def dropSubColumn(col: Column, colType: DataType, fullColName: String, dropColName: String): Option[Column] = {
    if (fullColName.equals(dropColName)) {
      None
    } else {
      colType match {
        case colType: StructType =>
          if (dropColName.startsWith(s"${fullColName}.")) {
            Some(struct(
              colType.fields
                .flatMap(f =>
                  dropSubColumn(col.getField(f.name), f.dataType, s"${fullColName}.${f.name}", dropColName) match {
                    case Some(x) => Some(x.alias(f.name))
                    case None => None
                  })
                : _*))
          } else {
            Some(col)
          }
        case other => Some(col)
      }
    }
  }

  protected def dropColumn(df: DataFrame, colName: String): DataFrame = {
    df.schema.fields
      .flatMap(f => {
        if (colName.startsWith(s"${f.name}.")) {
          dropSubColumn(col(f.name), f.dataType, f.name, colName) match {
            case Some(x) => Some((f.name, x))
            case None => None
          }
        } else {
          None
        }
      })
      .foldLeft(df.drop(colName)) {
        case (df, (colName, column)) => df.withColumn(colName, column)
      }
  }

  /**
    * Extended version of DataFrame that allows to operate on nested fields
    */
  implicit class ExtendedDataFrame(df: DataFrame) extends Serializable {
    /**
      * Drops nested field from DataFrame
      *
      * @param colName Dot-separated nested field name
      */
    def dropNestedColumn(colName: String): DataFrame = {
      DataFrameUtils.dropColumn(df, colName)
    }
  }
}

Usage:

import DataFrameUtils._
df.dropNestedColumn("a.b.c.d")
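
For a concrete, self-contained run in spark-shell, here is a minimal sketch against the question's schema (the case class names are mine):

import spark.implicits._
import DataFrameUtils._

case class Features(feat1: String, feat2: String, feat3: String)
case class Record(label: String, features: Features)

val df = Seq(Record("a_label", Features("f1", "f2", "f3"))).toDF

// drops features.feat1; feat2 and feat3 are kept
df.dropNestedColumn("features.feat1").printSchema

As noted in the comments below, the rebuilt features struct comes back with nullable = false.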

Solution 3

Expanding on spektom's answer, with support for array types:

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{ArrayType, DataType, StructType}

object DataFrameUtils {

  private def dropSubColumn(col: Column, colType: DataType, fullColName: String, dropColName: String): Option[Column] = {
    if (fullColName.equals(dropColName)) {
      None
    } else if (dropColName.startsWith(s"$fullColName.")) {
      colType match {
        case colType: StructType =>
          Some(struct(
            colType.fields
              .flatMap(f =>
                dropSubColumn(col.getField(f.name), f.dataType, s"$fullColName.${f.name}", dropColName) match {
                  case Some(x) => Some(x.alias(f.name))
                  case None => None
                })
              : _*))
        case colType: ArrayType =>
          colType.elementType match {
            case innerType: StructType =>
              Some(struct(innerType.fields
                .flatMap(f =>
                  dropSubColumn(col.getField(f.name), f.dataType, s"$fullColName.${f.name}", dropColName) match {
                    case Some(x) => Some(x.alias(f.name))
                    case None => None
                  })
                : _*))
            case _ =>
              // the array's elements are not structs; nothing nested to drop
              Some(col)
          }

        case other => Some(col)
      }
    } else {
      Some(col)
    }
  }

  protected def dropColumn(df: DataFrame, colName: String): DataFrame = {
    df.schema.fields
      .flatMap(f => {
        if (colName.startsWith(s"${f.name}.")) {
          dropSubColumn(col(f.name), f.dataType, f.name, colName) match {
            case Some(x) => Some((f.name, x))
            case None => None
          }
        } else {
          None
        }
      })
      .foldLeft(df.drop(colName)) {
        case (df, (colName, column)) => df.withColumn(colName, column)
      }
  }

  /**
    * Extended version of DataFrame that allows to operate on nested fields
    */
  implicit class ExtendedDataFrame(df: DataFrame) extends Serializable {
    /**
      * Drops nested field from DataFrame
      *
      * @param colName Dot-separated nested field name
      */
    def dropNestedColumn(colName: String): DataFrame = {
      DataFrameUtils.dropColumn(df, colName)
    }
  }

}

Solution 4

I will expand upon mmendez.semantic's answer here, accounting for the issues described in the sub-thread.

  def dropSubColumn(col: Column, colType: DataType, fullColName: String, dropColName: String): Option[Column] = {
    if (fullColName.equals(dropColName)) {
      None
    } else if (dropColName.startsWith(s"$fullColName.")) {
      colType match {
        case colType: StructType =>
          Some(struct(
            colType.fields
                .flatMap(f =>
                  dropSubColumn(col.getField(f.name), f.dataType, s"$fullColName.${f.name}", dropColName) match {
                    case Some(x) => Some(x.alias(f.name))
                    case None => None
                  })
                : _*))
        case colType: ArrayType =>
          colType.elementType match {
            case innerType: StructType =>
              // we are potentially dropping a column from within a struct, that is itself inside an array
              // Spark has some very strange behavior in this case, which they insist is not a bug
              // see https://issues.apache.org/jira/browse/SPARK-31779 and associated comments
              // and also the thread here: https://stackoverflow.com/a/39943812/375670
              // this is a workaround for that behavior

              // first, get all struct fields
              val innerFields = innerType.fields
              // next, create a new type for all the struct fields EXCEPT the column that is to be dropped
              // we will need this later
              val preserveNamesStruct = ArrayType(StructType(
                innerFields.filterNot(f => s"$fullColName.${f.name}".equals(dropColName))
              ))
              // next, apply dropSubColumn recursively to build up the new values after dropping the column
              val filteredInnerFields = innerFields.flatMap(f =>
                dropSubColumn(col.getField(f.name), f.dataType, s"$fullColName.${f.name}", dropColName) match {
                    case Some(x) => Some(x.alias(f.name))
                    case None => None
                }
              )
              // finally, use arrays_zip to unwrap the arrays that were introduced by building up the new, filtered
              // struct in this way (see comments in SPARK-31779), and then cast to the StructType we created earlier
              // to get the original names back
              Some(arrays_zip(filteredInnerFields:_*).cast(preserveNamesStruct))
            case _ =>
              // the array's elements are not structs; nothing nested to drop
              Some(col)
          }

        case _ => Some(col)
      }
    } else {
      Some(col)
    }
  }

  def dropColumn(df: DataFrame, colName: String): DataFrame = {
    df.schema.fields.flatMap(f => {
      if (colName.startsWith(s"${f.name}.")) {
        dropSubColumn(col(f.name), f.dataType, f.name, colName) match {
          case Some(x) => Some((f.name, x))
          case None => None
        }
      } else {
        None
      }
    }).foldLeft(df.drop(colName)) {
      case (df, (colName, column)) => df.withColumn(colName, column)
    }
  }

Usage in spark-shell:

// if defining the functions above in your spark-shell session, you first need imports
import org.apache.spark.sql._
import org.apache.spark.sql.types._

// now you can paste the function definitions

// create a deeply nested and complex JSON structure    
val jsonData = """{
      "foo": "bar",
      "top": {
        "child1": 5,
        "child2": [
          {
            "child2First": "one",
            "child2Second": 2,
            "child2Third": -19.51
          }
        ],
        "child3": ["foo", "bar", "baz"],
        "child4": [
          {
            "child2First": "two",
            "child2Second": 3,
            "child2Third": 16.78
          }
        ]
      }
    }"""

// read it into a DataFrame
val df = spark.read.option("multiline", "true").json(Seq(jsonData).toDS())

// remove a sub-column
val modifiedDf = dropColumn(df, "top.child2.child2First")

modifiedDf.printSchema
root
 |-- foo: string (nullable = true)
 |-- top: struct (nullable = false)
 |    |-- child1: long (nullable = true)
 |    |-- child2: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- child2Second: long (nullable = true)
 |    |    |    |-- child2Third: double (nullable = true)
 |    |-- child3: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- child4: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- child2First: string (nullable = true)
 |    |    |    |-- child2Second: long (nullable = true)
 |    |    |    |-- child2Third: double (nullable = true)


modifiedDf.show(truncate=false)
+---+------------------------------------------------------+
|foo|top                                                   |
+---+------------------------------------------------------+
|bar|[5, [[2, -19.51]], [foo, bar, baz], [[two, 3, 16.78]]]|
+---+------------------------------------------------------+

Solution 5

Another (PySpark) way would be to drop the features.feat1 column by creating features again:

from pyspark.sql.functions import col, arrays_zip

display(df
        .withColumn("features", arrays_zip("features.feat2", "features.feat3"))
        .withColumn("features", col("features").cast(schema))
)

Where schema is the new schema (excluding features.feat1).

from pyspark.sql.types import StructType, StructField, StringType

schema = StructType(
    [
      StructField('feat2', StringType(), True), 
      StructField('feat3', StringType(), True), 
    ]
  )
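
For comparison, the same rebuild-the-struct idea is a one-liner in Scala when the parent is a plain struct, as in the question's schema (a hedged sketch, not part of the original answer):

import org.apache.spark.sql.functions.struct

// Rebuild features from only the fields to keep; feat1 is simply omitted
val result = df.withColumn(
  "features",
  struct(df("features.feat2").alias("feat2"), df("features.feat3").alias("feat3"))
)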

Comments

  • Nikhil J Joshi
    Nikhil J Joshi over 2 years

    I have a DataFrame with the schema

    root
     |-- label: string (nullable = true)
     |-- features: struct (nullable = true)
     |    |-- feat1: string (nullable = true)
     |    |-- feat2: string (nullable = true)
     |    |-- feat3: string (nullable = true)
    

    While, I am able to filter the data frame using

      val data = rawData
         .filter( !(rawData("features.feat1") <=> "100") )
    

    I am unable to drop the columns using

      val data = rawData
           .drop("features.feat1")
    

    Is it something that I am doing wrong here? I also tried (unsuccessfully) doing drop(rawData("features.feat1")), though it does not make much sense to do so.

    Thanks in advance,

    Nikhil

  • alexP_Keaton
    alexP_Keaton over 7 years
    Thanks much! Any chance you've updated this to drop a field from a struct in an array under an array? Been hacking for over a day now, close but can't get it. i.e. parent:array<element:struct<child:array<element:struct<keep:int, drop:int>>>>
  • V. Samma
    V. Samma about 7 years
    @alexP_Keaton Hey, did you get a solution for dropping a column inside an array?
  • Michel Lemay
    Michel Lemay almost 7 years
    I would like to add that this method does not preserve 'nullable' property of the modified parent struct. In this example, features will become struct (nullable = false)
  • Michel Lemay
    Michel Lemay almost 7 years
    One way I found to fix this issue is to pass f.nullable to dropSubColumn and use this construct when(col.isNotNull, newCol) on the result of struct(... :_*). (A sketch of this fix appears after this comment thread.)
  • Panagiotis Drakatos
    Panagiotis Drakatos over 6 years
    Add a simple example of how to call it and I will upvote you.
  • Lior Chaga
    Lior Chaga over 6 years
    Added usage example per @xXxpRoGrAmmErxXx request
  • Jeff Evans
    Jeff Evans over 4 years
    The call to struct within case colType: ArrayType needs to be wrapped in array, or else you "lose" the array wrapping a parent of the column removed. In addition, the remaining items in the lowest struct (i.e. the one with a member that gets removed) get converted to array for some reason, which I'm still debugging.
  • Jeff Evans
    Jeff Evans over 4 years
    You actually don't need the df.drop(colName) bit here. It doesn't work anyway (or else we could simply drop nested columns using the API directly). Also, the withColumn will replace an existing column name with a new definition if given.
  • Doru Chiulan
    Doru Chiulan over 4 years
    @JeffEvans did you find the reason why that happens (field getting converted to array) ? I debugged all day long and I can't understand what Spark's doing there. Might be a bug?
  • Jeff Evans
    Jeff Evans over 4 years
    Not sure. It's possible the code we're working off of isn't correct, but I can't understand the innards of the Spark library code well enough yet to say for certain.
  • prakharjain
    prakharjain about 4 years
    what is sf in _drop_nested_field?
  • M.Vanderlee
    M.Vanderlee about 4 years
    @prakharjain import pyspark.sql.functions as sf I've updated the example.
  • smishra
    smishra about 4 years
    Although I have upvoted this answer because it works when all nested rows have a uniform schema, otherwise it does not work: it just returns the original DataFrame.
  • smishra
    smishra about 4 years
    As it appears, the problem was with the getOrElse part of the statement: if any exception is thrown it never gets printed, the 'else' part takes over, and the original DataFrame is returned. For example, in my case, case insensitivity was the issue: there were two columns with the same name but in different cases.
  • Mahnaz
    Mahnaz almost 4 years
    As @JeffEvans mentioned, there is a bug here that converts the remaining items into arrays. To avoid it, use case Some(x) => Some(x.getItem(0).alias(f.name)) instead of case Some(x) => Some(x.alias(f.name)) in the ArrayType case.
  • Jeff Evans
    Jeff Evans almost 4 years
    Doing .getItem(0) before the alias (inside the ArrayType -> StructType case) does indeed "work". However, this really shouldn't be needed. I believe there is a bug in Spark itself, so I have opened a Jira for that: issues.apache.org/jira/browse/SPARK-31779
  • Jeff Evans
    Jeff Evans almost 4 years
    It seems that the Spark maintainers do not agree this is a bug. Nonetheless, given their comments on the Jira I opened, I can confirm a workaround. Will paste as a separate answer.
  • Sampat Kumar
    Sampat Kumar almost 3 years
    I see we are using arrays_zip in dropSubColumn, but as of Spark 2.3 this will not work. Is there an alternative way to do this? I tried to create a UDF as below but had no luck using it: val zipped = udf((s: Seq[String], t: Seq[String]) => s zip t)
  • Jeff Evans
    Jeff Evans almost 3 years
    As I recall, this was working in 2.4. But I no longer have access to the underlying project code so I'm not sure. Should be easy to use spark-shell to test.
  • Varun Taliyan
    Varun Taliyan over 2 years
    Does anyone have a similar solution for adding an optional (nullable) column in a deeply nested struct field.
  • mixermt
    mixermt about 2 years
    This does not work when there is a need to drop a field in a struct wrapped by an array.
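
A minimal sketch of the nullable-preserving fix Michel Lemay describes above, applied to Solution 2 (the extra nullable parameter is my addition, not part of the original answers):

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DataType, StructType}

// Thread the parent field's nullability through the recursion and wrap the
// rebuilt struct in when(col.isNotNull, ...), so a null parent stays null
// and the rebuilt struct keeps nullable = true.
def dropSubColumn(col: Column, colType: DataType, fullColName: String,
                  dropColName: String, nullable: Boolean): Option[Column] = {
  if (fullColName.equals(dropColName)) {
    None
  } else {
    colType match {
      case st: StructType if dropColName.startsWith(s"$fullColName.") =>
        val rebuilt = struct(
          st.fields.flatMap(f =>
            dropSubColumn(col.getField(f.name), f.dataType,
              s"$fullColName.${f.name}", dropColName, f.nullable)
              .map(_.alias(f.name))): _*)
        Some(if (nullable) when(col.isNotNull, rebuilt) else rebuilt)
      case _ => Some(col)
    }
  }
}

At the top level, dropColumn would then pass f.nullable from df.schema.fields when it first calls dropSubColumn.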