Spark dataframe explode function
First, I will note that I cannot explain why your explode()
turns into Row(employee: Seq[Row]),
as I don't know the schema of your DataFrame; I have to assume it has to do with the structure of your data.
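That said, one likely cause, assuming your employees column is an array of structs: exploding such a column hands the lambda a Row whose single field is a Seq[Row], one inner Row per struct element. A sketch with made-up field values (the names "john", "doe", "engineer" are hypothetical, not from your data):

```scala
import org.apache.spark.sql.Row

// Hypothetical: if "employees" is an array<struct<...>> column, the value
// explode hands to the lambda is a Row wrapping a Seq[Row].
val outer: Row = Row(Seq(Row("john", "doe", "engineer")))

// Which is why the pattern Row(employee: Seq[Row]) matches, and why each
// field of the inner Rows still needs an asInstanceOf cast.
val firstNames = outer match {
  case Row(employee: Seq[Row]) => employee.map(e => e(0).asInstanceOf[String])
}
```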
Not knowing your original data, I have created a small data set to work from:
scala> val df = sc.parallelize( Array( (1, "dsfds dsf dasf dsf dsf d"), (2, "2344 2353 24 23432 234"))).toDF("id", "text")
df: org.apache.spark.sql.DataFrame = [id: int, text: string]
If I now map over it, you can see that it returns rows containing data of type Any.
scala> df.map {case row: Row => (row(0), row(1)) }
res21: org.apache.spark.rdd.RDD[(Any, Any)] = MapPartitionsRDD[17] at map at <console>:33
You have basically lost type information, which is why you need to explicitly specify the type when you want to use the data in the row:
scala> df.map {case row: Row => (row(0).asInstanceOf[Int], row(1).asInstanceOf[String]) }
res22: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[18] at map at <console>:33
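The same loss of static types can be reproduced in plain Scala, since a Row behaves much like an untyped sequence (an illustration only, no Spark required):

```scala
// Illustration: a Row is essentially a wrapper over Seq[Any] — indexing
// returns Any, so the compile-time type must be recovered with a cast.
val rowLike: Seq[Any] = Seq(1, "dsfds dsf dasf dsf dsf d")

val id = rowLike(0).asInstanceOf[Int]
val words = rowLike(1).asInstanceOf[String].split(" ")
// id: Int = 1, words: Array[String] = Array(dsfds, dsf, dasf, dsf, dsf, d)
```

Note that Row also provides typed getters such as row.getInt(0) and row.getString(1), which perform the cast for you.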
So, in order to explode it, I have to do the following
scala> :paste
// Entering paste mode (ctrl-D to finish)
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.col

df.explode(col("id"), col("text")) { case row: Row =>
  val id = row(0).asInstanceOf[Int]
  val words = row(1).asInstanceOf[String].split(" ")
  words.map(word => (id, word))
}
// Exiting paste mode, now interpreting.
import org.apache.spark.sql.Row
res30: org.apache.spark.sql.DataFrame = [id: int, text: string, _1: int, _2: string]
scala> res30 show
+---+--------------------+---+-----+
| id| text| _1| _2|
+---+--------------------+---+-----+
| 1|dsfds dsf dasf ds...| 1|dsfds|
| 1|dsfds dsf dasf ds...| 1| dsf|
| 1|dsfds dsf dasf ds...| 1| dasf|
| 1|dsfds dsf dasf ds...| 1| dsf|
| 1|dsfds dsf dasf ds...| 1| dsf|
| 1|dsfds dsf dasf ds...| 1| d|
| 2|2344 2353 24 2343...| 2| 2344|
| 2|2344 2353 24 2343...| 2| 2353|
| 2|2344 2353 24 2343...| 2| 24|
| 2|2344 2353 24 2343...| 2|23432|
| 2|2344 2353 24 2343...| 2| 234|
+---+--------------------+---+-----+
If you want named columns, you can define a case class to hold your exploded data:
scala> :paste
// Entering paste mode (ctrl-D to finish)
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.col

case class ExplodedData(word: String)

df.explode(col("id"), col("text")) { case row: Row =>
  val words = row(1).asInstanceOf[String].split(" ")
  words.map(word => ExplodedData(word))
}
// Exiting paste mode, now interpreting.
import org.apache.spark.sql.Row
defined class ExplodedData
res35: org.apache.spark.sql.DataFrame = [id: int, text: string, word: string]
scala> res35.select("id","word").show
+---+-----+
| id| word|
+---+-----+
| 1|dsfds|
| 1| dsf|
| 1| dasf|
| 1| dsf|
| 1| dsf|
| 1| d|
| 2| 2344|
| 2| 2353|
| 2| 24|
| 2|23432|
| 2| 234|
+---+-----+
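As an aside, DataFrame.explode was deprecated in Spark 2.0. In later versions the same result is typically produced with the explode and split functions, which keep column types so no asInstanceOf is needed (a sketch, assuming Spark 2.x or later and the df defined above):

```scala
import org.apache.spark.sql.functions.{col, explode, split}

// split(text, " ") yields an array<string> column; explode emits one output
// row per array element. Catalyst tracks the types, so no casts are needed.
val exploded = df.select(col("id"), explode(split(col("text"), " ")).as("word"))
exploded.show()
```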
Hope this brings some clarity.
Ramesh
Updated on June 28, 2022

Comments

Ramesh, almost 2 years ago:
Can anyone please explain why case Row and Seq[Row] are used after the explode of a dataframe field which holds a collection of elements? And can you please explain the reason why asInstanceOf is required to get the values from the exploded field? Here is the syntax:
val explodedDepartmentWithEmployeesDF =
  departmentWithEmployeesDF.explode(departmentWithEmployeesDF("employees")) {
    case Row(employee: Seq[Row]) =>
      employee.map(employee =>
        Employee(
          employee(0).asInstanceOf[String],
          employee(1).asInstanceOf[String],
          employee(2).asInstanceOf[String]))
  }