Dropping a nested column from Spark DataFrame
Solution 1
It is just a programming exercise but you can try something like this:
import org.apache.spark.sql.{DataFrame, Column}
import org.apache.spark.sql.types.{StructType, StructField}
import org.apache.spark.sql.{functions => f}
import scala.util.Try
/**
 * Wrapper that adds `dropFrom`, which removes fields from a top-level struct column.
 *
 * Use it directly (`DFWithDropFrom(df).dropFrom(...)`) or through an implicit conversion.
 */
case class DFWithDropFrom(df: DataFrame) {

  /**
   * Looks up the top-level field named `source` (exact, case-sensitive name comparison).
   *
   * @param source top-level column name
   * @return the first field with that name, or `Failure(NoSuchElementException)` if there is none
   */
  def getSourceField(source: String): Try[StructField] =
    df.schema.fields.find(_.name == source) match {
      case Some(field) => scala.util.Success(field)
      case None =>
        scala.util.Failure(new NoSuchElementException(s"No top-level column named '$source'"))
    }

  /**
   * Returns the field's data type as a `StructType`.
   *
   * @param sourceField field whose type is inspected
   * @return the struct type, or `Failure(ClassCastException)` when the field is not a struct
   *         (the same exception type the former `asInstanceOf` cast produced)
   */
  def getType(sourceField: StructField): Try[StructType] =
    sourceField.dataType match {
      case structType: StructType => scala.util.Success(structType)
      case other =>
        scala.util.Failure(new ClassCastException(
          s"Column '${sourceField.name}' has type ${other.simpleString}, not a struct"))
    }

  /**
   * Builds a struct holding only the fields `names` of the struct column `source`.
   *
   * @param names    fields to keep, in output order
   * @param source   top-level struct column to read from
   * @param nullable when true (the default), rows where `source` is null stay null instead of
   *                 becoming a struct of nulls, and the result stays nullable; pass false for a
   *                 non-nullable source column
   */
  def genOutputCol(names: Array[String], source: String, nullable: Boolean = true): Column = {
    val parent = f.col(source)
    val rebuilt = f.struct(names.map(name => parent.getField(name).alias(name)): _*)
    if (nullable) f.when(parent.isNotNull, rebuilt) else rebuilt
  }

  /**
   * Returns `df` with the fields `toDrop` removed from the top-level struct column `source`.
   *
   * When `source` does not exist, is not a struct, or contains none of `toDrop`, `df` is returned
   * unchanged. Errors raised while rebuilding the column (e.g. analysis errors) are propagated
   * rather than silently turned into "return the original DataFrame".
   */
  def dropFrom(source: String, toDrop: Array[String]): DataFrame = {
    // Only the lookup is best-effort; the rebuild below is deliberately outside any Try.
    val target = for {
      field <- getSourceField(source)
      structType <- getType(field)
    } yield (field, structType)

    target.toOption match {
      case Some((field, structType)) =>
        val kept = structType.fieldNames.diff(toDrop)
        if (kept.length == structType.fieldNames.length) {
          df // nothing to drop: avoid rebuilding (and re-typing) the column
        } else {
          df.withColumn(source, genOutputCol(kept, source, field.nullable))
        }
      case None => df
    }
  }
}
Example usage:
scala> case class features(feat1: String, feat2: String, feat3: String)
defined class features
scala> case class record(label: String, features: features)
defined class record
scala> val df = sc.parallelize(Seq(record("a_label", features("f1", "f2", "f3")))).toDF
df: org.apache.spark.sql.DataFrame = [label: string, features: struct<feat1:string,feat2:string,feat3:string>]
scala> DFWithDropFrom(df).dropFrom("features", Array("feat1")).show
+-------+--------+
| label|features|
+-------+--------+
|a_label| [f2,f3]|
+-------+--------+
scala> DFWithDropFrom(df).dropFrom("foobar", Array("feat1")).show
+-------+----------+
| label| features|
+-------+----------+
|a_label|[f1,f2,f3]|
+-------+----------+
scala> DFWithDropFrom(df).dropFrom("features", Array("foobar")).show
+-------+----------+
| label| features|
+-------+----------+
|a_label|[f1,f2,f3]|
+-------+----------+
Add an implicit conversion and you're good to go.
Solution 2
This version allows you to remove nested columns at any level:
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructType, DataType}
/**
 * Various Spark utilities and extensions of DataFrame.
 */
object DataFrameUtils {

  /**
   * Rebuilds `col` without the nested field addressed by `dropColName`.
   *
   * @param col         the column currently being visited
   * @param colType     the data type of `col`
   * @param fullColName dot-separated path of `col` from the top level of the DataFrame
   * @param dropColName dot-separated path of the field to remove
   * @param nullable    whether `col` may be null; when true, a rebuilt struct is wrapped in
   *                    `when(col.isNotNull, ...)` so null parents stay null (and the field stays
   *                    nullable) instead of turning into a struct of nulls
   * @return `None` if `col` is the field to drop; otherwise the column to keep, rebuilt when it is
   *         a struct lying on the path to `dropColName`
   */
  private def dropSubColumn(col: Column, colType: DataType, fullColName: String, dropColName: String,
                            nullable: Boolean = true): Option[Column] = {
    if (fullColName.equals(dropColName)) {
      None
    } else {
      colType match {
        case structType: StructType if dropColName.startsWith(s"$fullColName.") =>
          val keptFields = structType.fields.flatMap { field =>
            dropSubColumn(col.getField(field.name), field.dataType, s"$fullColName.${field.name}",
              dropColName, field.nullable).map(_.alias(field.name))
          }
          val rebuilt = struct(keptFields: _*)
          Some(if (nullable) when(col.isNotNull, rebuilt) else rebuilt)
        case _ =>
          // Not a struct, or not on the path to the dropped field: keep it unchanged.
          Some(col)
      }
    }
  }

  /**
   * Drops the (possibly nested) column `colName` from `df`.
   *
   * @param df      input DataFrame
   * @param colName dot-separated path of the column to drop
   */
  protected def dropColumn(df: DataFrame, colName: String): DataFrame = {
    val rebuiltColumns = df.schema.fields
      .filter(field => colName.startsWith(s"${field.name}."))
      .flatMap(field =>
        dropSubColumn(col(field.name), field.dataType, field.name, colName, field.nullable)
          .map(rebuilt => (field.name, rebuilt)))
    // `drop` removes `colName` when it names a top-level column; `withColumn` then replaces each
    // affected top-level struct with its rebuilt version.
    rebuiltColumns.foldLeft(df.drop(colName)) {
      case (acc, (name, column)) => acc.withColumn(name, column)
    }
  }

  /**
   * Extended version of DataFrame that allows to operate on nested fields
   */
  implicit class ExtendedDataFrame(df: DataFrame) extends Serializable {
    /**
     * Drops nested field from DataFrame
     *
     * @param colName Dot-separated nested field name
     */
    def dropNestedColumn(colName: String): DataFrame = {
      DataFrameUtils.dropColumn(df, colName)
    }
  }
}
Usage:
import DataFrameUtils._
df.dropNestedColumn("a.b.c.d")
Solution 3
Expanding on spektom's answer, with support for array types:
object DataFrameUtils {
  // Imported locally so this object compiles without relying on imports declared elsewhere.
  import org.apache.spark.sql.{Column, DataFrame}
  import org.apache.spark.sql.functions.{col, struct, transform, when}
  import org.apache.spark.sql.types.{ArrayType, DataType, StructType}

  /**
   * Rebuilds `col` without the nested field addressed by `dropColName`, descending through
   * structs and through arrays (including arrays of arrays) of structs.
   *
   * Array elements share the array's path: "a.b.c" addresses field `c` of every element of the
   * array `a.b`. Elements are rewritten with `functions.transform` (Spark 3.0+), which keeps the
   * array wrapper intact. Rebuilding with `struct(arrayCol.getField(...))` instead produces a
   * struct of arrays (see SPARK-31779).
   *
   * @param col         the column (or array element) currently being visited
   * @param colType     the data type of `col`
   * @param fullColName dot-separated path of `col` from the top level of the DataFrame
   * @param dropColName dot-separated path of the field to remove
   * @param nullable    whether `col` may be null; when true, a rebuilt struct is wrapped in
   *                    `when(col.isNotNull, ...)` so null values stay null instead of becoming a
   *                    struct of nulls
   * @return `None` if `col` is the field to drop; otherwise the column to keep, rebuilt when it
   *         lies on the path to `dropColName`
   */
  private def dropSubColumn(col: Column, colType: DataType, fullColName: String, dropColName: String,
                            nullable: Boolean = true): Option[Column] = {
    if (fullColName.equals(dropColName)) {
      None
    } else if (dropColName.startsWith(s"$fullColName.")) {
      colType match {
        case structType: StructType =>
          val keptFields = structType.fields.flatMap { field =>
            dropSubColumn(col.getField(field.name), field.dataType, s"$fullColName.${field.name}",
              dropColName, field.nullable).map(_.alias(field.name))
          }
          val rebuilt = struct(keptFields: _*)
          Some(if (nullable) when(col.isNotNull, rebuilt) else rebuilt)
        case ArrayType(elementType @ (_: StructType | _: ArrayType), containsNull) =>
          // Same path for the elements; fullColName != dropColName was already checked above.
          Some(transform(col, (element: Column) =>
            dropSubColumn(element, elementType, fullColName, dropColName, containsNull)
              .getOrElse(element)))
        case _ =>
          // Scalars, maps and arrays of non-struct elements have no sub-fields to drop. Arrays of
          // non-struct elements used to fall through a non-exhaustive match (scala.MatchError).
          Some(col)
      }
    } else {
      Some(col)
    }
  }

  /**
   * Drops the (possibly nested) column `colName` from `df`.
   *
   * @param df      input DataFrame
   * @param colName dot-separated path of the column to drop
   */
  protected def dropColumn(df: DataFrame, colName: String): DataFrame = {
    val rebuiltColumns = df.schema.fields
      .filter(field => colName.startsWith(s"${field.name}."))
      .flatMap(field =>
        dropSubColumn(col(field.name), field.dataType, field.name, colName, field.nullable)
          .map(rebuilt => (field.name, rebuilt)))
    // `drop` removes `colName` when it names a top-level column; `withColumn` then replaces each
    // affected top-level column with its rebuilt version.
    rebuiltColumns.foldLeft(df.drop(colName)) {
      case (acc, (name, column)) => acc.withColumn(name, column)
    }
  }

  /**
   * Extended version of DataFrame that allows to operate on nested fields
   */
  implicit class ExtendedDataFrame(df: DataFrame) extends Serializable {
    /**
     * Drops nested field from DataFrame
     *
     * @param colName Dot-separated nested field name
     */
    def dropNestedColumn(colName: String): DataFrame = {
      DataFrameUtils.dropColumn(df, colName)
    }
  }
}
Solution 4
I will expand upon mmendez.semantic's answer here, accounting for the issues described in the sub-thread.
/**
 * Rebuilds `col` without the nested field addressed by `dropColName`.
 *
 * The array-of-struct case relies on `arrays_zip`, which is available from Spark 2.4.
 *
 * @param col         the column currently being visited
 * @param colType     the data type of `col`
 * @param fullColName dot-separated path of `col` from the top level of the DataFrame
 * @param dropColName dot-separated path of the field to remove
 * @param nullable    whether `col` may be null; when true, a rebuilt struct is wrapped in
 *                    `when(col.isNotNull, ...)` so null parents stay null instead of becoming a
 *                    struct of nulls
 * @return `None` if `col` is the field to drop; otherwise the column to keep, rebuilt when it
 *         lies on the path to `dropColName`
 */
def dropSubColumn(col: Column, colType: DataType, fullColName: String, dropColName: String,
                  nullable: Boolean = true): Option[Column] = {
  if (fullColName.equals(dropColName)) {
    None
  } else if (dropColName.startsWith(s"$fullColName.")) {
    colType match {
      case structType: StructType =>
        val rebuilt = struct(
          structType.fields.flatMap(f =>
            dropSubColumn(col.getField(f.name), f.dataType, s"$fullColName.${f.name}", dropColName, f.nullable)
              .map(_.alias(f.name))
          ): _*)
        Some(if (nullable) when(col.isNotNull, rebuilt) else rebuilt)
      case ArrayType(innerType: StructType, _) =>
        // we are potentially dropping a column from within a struct, that is itself inside an array
        // Spark has some very strange behavior in this case, which they insist is not a bug
        // see https://issues.apache.org/jira/browse/SPARK-31779 and associated comments
        // and also the thread here: https://stackoverflow.com/a/39943812/375670
        // this is a workaround for that behavior
        // first, get all struct fields
        val innerFields = innerType.fields
        // next, create a new type for all the struct fields EXCEPT the column that is to be dropped
        // we will need this later
        val preserveNamesStruct = ArrayType(StructType(
          innerFields.filterNot(f => s"$fullColName.${f.name}".equals(dropColName))
        ))
        // next, apply dropSubColumn recursively to build up the new values after dropping the column
        // NOTE(review): `col.getField(f.name)` on an array of structs is an array of the field's
        // values, but the recursion is given `f.dataType`; this looks like it only handles dropping
        // a direct field of the array elements, not a field nested deeper inside them -- verify.
        val filteredInnerFields = innerFields.flatMap(f =>
          dropSubColumn(col.getField(f.name), f.dataType, s"$fullColName.${f.name}", dropColName)
            .map(_.alias(f.name))
        )
        // finally, use arrays_zip to unwrap the arrays that were introduced by building up the new, filtered
        // struct in this way (see comments in SPARK-31779), and then cast to the StructType we created earlier
        // to get the original names back
        Some(arrays_zip(filteredInnerFields: _*).cast(preserveNamesStruct))
      case _ =>
        // Scalars and arrays of non-struct elements (e.g. array<string>) have no sub-fields to
        // drop. Arrays of non-struct elements previously fell through a non-exhaustive inner
        // match and threw scala.MatchError.
        Some(col)
    }
  } else {
    Some(col)
  }
}
/**
 * Drops the (possibly nested) column `colName` from `df`.
 *
 * Each top-level column lying on the path to `colName` is rebuilt by `dropSubColumn` and written
 * back with `withColumn`; `drop` covers the case where `colName` is itself a top-level column.
 *
 * @param df      input DataFrame
 * @param colName dot-separated path of the column to drop
 */
def dropColumn(df: DataFrame, colName: String): DataFrame = {
  val replacements = for {
    field <- df.schema.fields
    if colName.startsWith(s"${field.name}.")
    rebuilt <- dropSubColumn(col(field.name), field.dataType, field.name, colName)
  } yield (field.name, rebuilt)

  replacements.foldLeft(df.drop(colName)) { (acc, replacement) =>
    val (name, column) = replacement
    acc.withColumn(name, column)
  }
}
Usage in spark-shell:
// if defining the functions above in your spark-shell session, you first need imports
// NOTE(review): `struct`, `when` and `arrays_zip` come from `org.apache.spark.sql.functions._`,
// which spark-shell is believed to import automatically; outside the shell import it explicitly.
import org.apache.spark.sql._
import org.apache.spark.sql.types._
// now you can paste the function definitions
// create a deeply nested and complex JSON structure
val jsonData = """{
"foo": "bar",
"top": {
"child1": 5,
"child2": [
{
"child2First": "one",
"child2Second": 2,
"child2Third": -19.51
}
],
"child3": ["foo", "bar", "baz"],
"child4": [
{
"child2First": "two",
"child2Second": 3,
"child2Third": 16.78
}
]
}
}"""
// read it into a DataFrame
// (`toDS()` needs the session implicits, i.e. `spark.implicits._`)
val df = spark.read.option("multiline", "true").json(Seq(jsonData).toDS())
// remove a sub-column
// "top.child2.child2First" removes child2First from every element of the top.child2 array;
// top.child4 has an identically named field, which is kept (see the schema printed below)
val modifiedDf = dropColumn(df, "top.child2.child2First")
modifiedDf.printSchema
root
|-- foo: string (nullable = true)
|-- top: struct (nullable = false)
| |-- child1: long (nullable = true)
| |-- child2: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- child2Second: long (nullable = true)
| | | |-- child2Third: double (nullable = true)
| |-- child3: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- child4: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- child2First: string (nullable = true)
| | | |-- child2Second: long (nullable = true)
| | | |-- child2Third: double (nullable = true)
modifiedDf.show(truncate=false)
+---+------------------------------------------------------+
|foo|top |
+---+------------------------------------------------------+
|bar|[5, [[2, -19.51]], [foo, bar, baz], [[two, 3, 16.78]]]|
+---+------------------------------------------------------+
Solution 5
Another (PySpark) way would be to drop the `features.feat1` column by re-creating `features` without it:
from pyspark.sql.functions import col, arrays_zip
# Re-create "features" from only the fields to keep, then cast it to `schema` (defined below)
# so the rebuilt fields get their original names back.
# NOTE(review): arrays_zip only accepts array columns, so this presumably assumes `features` is an
# array of structs rather than the plain struct from the question; in that case the cast target
# likely needs to be ArrayType(schema) instead of `schema` -- verify.
display(df
.withColumn("features", arrays_zip("features.feat2", "features.feat3"))
.withColumn("features", col("features").cast(schema))
)
where `schema` is the new schema (excluding `features.feat1`):
from pyspark.sql.types import StructType, StructField, StringType
# Target type for the rebuilt "features" column: the original fields minus feat1.
# The third StructField argument is the field's `nullable` flag.
schema = StructType(
[
StructField('feat2', StringType(), True),
StructField('feat3', StringType(), True),
]
)
Nikhil J Joshi
Updated on January 27, 2022

Comments
-
Nikhil J Joshi over 2 years
I have a `DataFrame` with the schema

```
root
 |-- label: string (nullable = true)
 |-- features: struct (nullable = true)
 |    |-- feat1: string (nullable = true)
 |    |-- feat2: string (nullable = true)
 |    |-- feat3: string (nullable = true)
```

While I am able to filter the data frame using

`val data = rawData.filter( !(rawData("features.feat1") <=> "100") )`

I am unable to drop the columns using

`val data = rawData.drop("features.feat1")`

Is it something that I am doing wrong here? I also tried (unsuccessfully) doing `drop(rawData("features.feat1"))`, though it does not make much sense to do so.

Thanks in advance,
Nikhil
-
alexP_Keaton over 7 years: Thanks much! Any chance you've updated this to drop a field from a struct in an array under an array? I've been hacking at it for over a day now; I'm close but can't get it, i.e. parent:array<element:struct<child:array<element:struct<keep:int, drop:int>>>>
-
V. Samma about 7 years: @alexP_Keaton Hey, did you get a solution for dropping a column inside an array?
-
Michel Lemay almost 7 years: I would like to add that this method does not preserve the 'nullable' property of the modified parent struct. In this example, `features` will become `struct (nullable = false)`.
-
Michel Lemay almost 7 years: One way I found to fix this issue is to pass `f.nullable` to `dropSubColumn` and apply the construct `when(col.isNotNull, newCol)` to the result of `struct(... :_*)`.
-
Panagiotis Drakatos over 6 years: Add a simple example of how to call it and I will upvote you.
-
Lior Chaga over 6 years: Added a usage example per @xXxpRoGrAmmErxXx's request.
-
Jeff Evans over 4 years: The call to `struct` within `case colType: ArrayType` needs to be wrapped in `array`, or else you "lose" the array that wraps a parent of the removed column. In addition, the remaining items in the lowest struct (i.e. the one with a member that gets removed) get converted to `array` for some reason, which I'm still debugging.
-
Jeff Evans over 4 years: You actually don't need the `df.drop(colName)` bit here. It doesn't work anyway (or else we could simply drop nested columns using the API directly). Also, `withColumn` will replace an existing column with a new definition if given the same name.
-
Doru Chiulan over 4 years: @JeffEvans did you find the reason why that happens (the field getting converted to an array)? I debugged all day long and I can't understand what Spark is doing there. Might it be a bug?
-
Jeff Evans over 4 years: Not sure. It's possible the code we're working off of isn't correct, but I can't understand the innards of the Spark library code well enough yet to say for certain.
-
prakharjain about 4 years: What is `sf` in `_drop_nested_field`?
-
M.Vanderlee about 4 years: @prakharjain It is `import pyspark.sql.functions as sf`. I've updated the example.
-
smishra about 4 years: I have upvoted this answer, but it only works when all nested rows have a uniform schema; otherwise it does not work and just returns the original DataFrame.
-
smishra about 4 years: It appears the problem was the `getOrElse` part of the statement. If any exception is thrown, it is not printed; the 'Else' part takes over and the original DataFrame is returned. For example, in my case case-insensitivity was the issue: there were two columns with the same name but in different cases.
-
Mahnaz almost 4 years: As @JeffEvans mentioned, there is a bug here that converts the remaining items into arrays. To avoid it, in the ArrayType case you should use
`case Some(x) => Some(x.getItem(0).alias(f.name))`
instead of `case Some(x) => Some(x.alias(f.name))`.
-
Jeff Evans almost 4 years: Doing `.getItem(0)` before the `alias` (inside the `ArrayType` -> `StructType` case) does indeed "work". However, this really shouldn't be needed. I believe there is a bug in Spark itself, so I have opened a Jira for it: issues.apache.org/jira/browse/SPARK-31779
-
Jeff Evans almost 4 years: It seems that the Spark maintainers do not agree this is a bug. Nonetheless, given their comments on the Jira I opened, I can confirm a workaround. I will paste it as a separate answer.
-
Sampat Kumar almost 3 years: I see we are using `arrays_zip` in `dropSubColumn`, but that does not work on Spark 2.3. Is there an alternative? I tried to create a UDF as below but had no luck using it: `val zipped = udf((s: Seq[String], t: Seq[String]) => s zip t)`
-
Jeff Evans almost 3 years: As I recall, this was working in 2.4. But I no longer have access to the underlying project code, so I'm not sure. It should be easy to test with spark-shell.
-
Varun Taliyan over 2 years: Does anyone have a similar solution for adding an optional (nullable) column to a deeply nested struct field?
-
mixermt about 2 years: This does not work when you need to drop a field in a struct wrapped by an array.