Spark dataframe explode function


First, I will note that I cannot explain why your explode() turns into Row(employee: Seq[Row]), as I don't know the schema of your DataFrame. I have to assume it has to do with the structure of your data.

Not knowing your original data, I have created a small data set to work from:

scala> val df = sc.parallelize( Array( (1, "dsfds dsf dasf dsf dsf d"), (2, "2344 2353 24 23432 234"))).toDF("id", "text")
df: org.apache.spark.sql.DataFrame = [id: int, text: string]

If I now map over it, you can see that it returns rows containing data of type Any:

scala> df.map {case row: Row => (row(0), row(1)) }
res21: org.apache.spark.rdd.RDD[(Any, Any)] = MapPartitionsRDD[17] at map at <console>:33

You have basically lost the type information, which is why you need to specify the type explicitly when you want to use the data in the row:

scala> df.map {case row: Row => (row(0).asInstanceOf[Int], row(1).asInstanceOf[String]) }
res22: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[18] at map at <console>:33
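To illustrate why the cast is needed, here is a minimal plain-Scala sketch (no Spark required) that mimics what a Row gives you: an untyped Seq[Any], so the compiler only knows each element as Any until you cast it.

```scala
// A hypothetical stand-in for Spark's Row: just an untyped sequence of values.
val row: Seq[Any] = Seq(1, "dsfds dsf dasf dsf dsf d")

// row(0) and row(1) have static type Any, so a cast is needed
// before the values can be used as an Int and a String.
val id: Int = row(0).asInstanceOf[Int]
val text: String = row(1).asInstanceOf[String]
```

Note that Spark's Row also offers typed getters such as row.getInt(0) and row.getAs[String](1), which perform the cast for you.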

So, in order to explode it, I have to do the following:

scala> :paste
// Entering paste mode (ctrl-D to finish)

import org.apache.spark.sql.Row
df.explode(col("id"), col("text")) {case row: Row =>
    val id = row(0).asInstanceOf[Int]
    val words = row(1).asInstanceOf[String].split(" ")
    words.map(word => (id, word))
}

// Exiting paste mode, now interpreting.

import org.apache.spark.sql.Row
res30: org.apache.spark.sql.DataFrame = [id: int, text: string, _1: int, _2: string]

scala> res30 show
+---+--------------------+---+-----+
| id|                text| _1|   _2|
+---+--------------------+---+-----+
|  1|dsfds dsf dasf ds...|  1|dsfds|
|  1|dsfds dsf dasf ds...|  1|  dsf|
|  1|dsfds dsf dasf ds...|  1| dasf|
|  1|dsfds dsf dasf ds...|  1|  dsf|
|  1|dsfds dsf dasf ds...|  1|  dsf|
|  1|dsfds dsf dasf ds...|  1|    d|
|  2|2344 2353 24 2343...|  2| 2344|
|  2|2344 2353 24 2343...|  2| 2353|
|  2|2344 2353 24 2343...|  2|   24|
|  2|2344 2353 24 2343...|  2|23432|
|  2|2344 2353 24 2343...|  2|  234|
+---+--------------------+---+-----+
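The expansion shown in the table above is essentially a flatMap: each input row produces one output tuple per word. The same logic can be sketched in plain Scala (using the same sample data, no Spark needed):

```scala
val data = Seq((1, "dsfds dsf dasf dsf dsf d"), (2, "2344 2353 24 23432 234"))

// One (id, word) tuple per word, flattened across all rows --
// the same expansion explode performs on the DataFrame.
val exploded = data.flatMap { case (id, text) =>
  text.split(" ").map(word => (id, word))
}
// exploded contains (1,"dsfds"), (1,"dsf"), ... 11 tuples in total
```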

If you want named columns, you can define a case class to hold your exploded data:

scala> :paste
// Entering paste mode (ctrl-D to finish)

import org.apache.spark.sql.Row
case class ExplodedData(word: String)
df.explode(col("id"), col("text")) {case row: Row =>
    val words = row(1).asInstanceOf[String].split(" ")
    words.map(word => ExplodedData(word))
}

// Exiting paste mode, now interpreting.

import org.apache.spark.sql.Row
defined class ExplodedData
res35: org.apache.spark.sql.DataFrame = [id: int, text: string, word: string]

scala> res35.select("id","word").show
+---+-----+
| id| word|
+---+-----+
|  1|dsfds|
|  1|  dsf|
|  1| dasf|
|  1|  dsf|
|  1|  dsf|
|  1|    d|
|  2| 2344|
|  2| 2353|
|  2|   24|
|  2|23432|
|  2|  234|
+---+-----+
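The same named-column idea, sketched in plain Scala with a case class (the class name ExplodedWord is hypothetical, chosen to mirror the ExplodedData example above):

```scala
// Hypothetical record type for one exploded word, analogous to ExplodedData.
case class ExplodedWord(id: Int, word: String)

val data = Seq((1, "dsfds dsf dasf dsf dsf d"), (2, "2344 2353 24 23432 234"))

// Each word becomes a typed record instead of an untyped tuple.
val explodedWords = data.flatMap { case (id, text) =>
  text.split(" ").map(word => ExplodedWord(id, word))
}
```

As an aside: in Spark 2.x and later this Dataset.explode method is deprecated; the usual replacement is the built-in functions, e.g. df.select(col("id"), explode(split(col("text"), " ")).as("word")), with explode and split imported from org.apache.spark.sql.functions.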

Hope this brings some clarity.

Author: Ramesh

Updated on June 28, 2022

Comments

  • Ramesh, almost 2 years ago:

    Can anyone please explain why case Row and Seq[Row] are used after the explode of a DataFrame field that holds a collection of elements? And can you also explain why asInstanceOf is required to get the values from the exploded field?

    Here is the syntax:

    val explodedDepartmentWithEmployeesDF =
      departmentWithEmployeesDF.explode(departmentWithEmployeesDF("employees")) {
        case Row(employee: Seq[Row]) =>
          employee.map(employee =>
            Employee(employee(0).asInstanceOf[String],
                     employee(1).asInstanceOf[String],
                     employee(2).asInstanceOf[String]))
      }