How to read a nested collection in Spark

18,512

Solution 1

There is no magic in the case of nested collection. Spark will handle the same way a RDD[(String, String)] and a RDD[(String, Seq[String])].

Reading such nested collection from Parquet files can be tricky, though.

Let's take an example from the spark-shell (1.3.1):

scala> import sqlContext.implicits._
import sqlContext.implicits._

scala> case class Inner(a: String, b: String)
defined class Inner

scala> case class Outer(key: String, inners: Seq[Inner])
defined class Outer

Write the parquet file:

scala> val outers = sc.parallelize(List(Outer("k1", List(Inner("a", "b")))))
outers: org.apache.spark.rdd.RDD[Outer] = ParallelCollectionRDD[0] at parallelize at <console>:25

scala> outers.toDF.saveAsParquetFile("outers.parquet")

Read the parquet file:

scala> import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.catalyst.expressions.Row

scala> val dataFrame = sqlContext.parquetFile("outers.parquet")
dataFrame: org.apache.spark.sql.DataFrame = [key: string, inners: array<struct<a:string,b:string>>]   

scala> val outers = dataFrame.map { row =>
     |   val key = row.getString(0)
     |   val inners = row.getAs[Seq[Row]](1).map(r => Inner(r.getString(0), r.getString(1)))
     |   Outer(key, inners)
     | }
outers: org.apache.spark.rdd.RDD[Outer] = MapPartitionsRDD[8] at map at DataFrame.scala:848

The important part is row.getAs[Seq[Row]](1). The internal representation of a nested sequence of struct is ArrayBuffer[Row], you could use any super-type of it instead of Seq[Row]. The 1 is the column index in the outer row. I used the method getAs here but there are alternatives in the latest versions of Spark. See the source code of the Row trait.

Now that you have a RDD[Outer], you can apply any wanted transformation or action.

// Filter the outers
outers.filter(_.inners.nonEmpty)

// Filter the inners
outers.map(outer => outer.copy(inners = outer.inners.filter(_.a == "a")))

Note that we used the spark-SQL library only to read the parquet file. You could for example select only the wanted columns directly on the DataFrame, before mapping it to a RDD.

dataFrame.select('col1, 'col2).map { row => ... }

Solution 2

I'll give a Python-based answer since that's what I'm using. I think Scala has something similar.

The explode function was added in Spark 1.4.0 to handle nested arrays in DataFrames, according to the Python API docs.

Create a test dataframe:

from pyspark.sql import Row

df = sqlContext.createDataFrame([Row(a=1, intlist=[1,2,3]), Row(a=2, intlist=[4,5,6])])
df.show()

## +-+--------------------+
## |a|             intlist|
## +-+--------------------+
## |1|ArrayBuffer(1, 2, 3)|
## |2|ArrayBuffer(4, 5, 6)|
## +-+--------------------+

Use explode to flatten the list column:

from pyspark.sql.functions import explode

df.select(df.a, explode(df.intlist)).show()

## +-+---+
## |a|_c0|
## +-+---+
## |1|  1|
## |1|  2|
## |1|  3|
## |2|  4|
## |2|  5|
## |2|  6|
## +-+---+

Solution 3

Another approach would be using pattern matching like this:

val rdd: RDD[(String, List[(String, String)]] = dataFrame.map(_.toSeq.toList match { 
  case List(key: String, inners: Seq[Row]) => key -> inners.map(_.toSeq.toList match {
    case List(a:String, b: String) => (a, b)
  }).toList
})

You can pattern match directly on Row but it is likely to fail for a few reasons.

Solution 4

Above answers are all great answers and tackle this question from different sides; Spark SQL is also quite useful way to access nested data.

Here's example how to use explode() in SQL directly to query nested collection.

SELECT hholdid, tsp.person_seq_no 
FROM (  SELECT hholdid, explode(tsp_ids) as tsp 
        FROM disc_mrt.unified_fact uf
     )

tsp_ids is a nested of structs, which has many attributes, including person_seq_no which I'm selecting in the outer query above.

Above was tested in Spark 2.0. I did a small test and it doesn't work in Spark 1.6. This question was asked when Spark 2 wasn't around, so this answer adds nicely to the list of available options to deal with nested structures.

Have a look also on following JIRAs for Hive-compatible way to query nested data using LATERAL VIEW OUTER syntax, since Spark 2.2 also supports OUTER explode (e.g. when a nested collection is empty, but you still want to have attributes from a parent record):

Noticable not resolved JIRA on explode() for SQL access:

Share:
18,512
Tagar
Author by

Tagar

Chief Distributed Data Distiller

Updated on June 05, 2022

Comments

  • Tagar
    Tagar about 2 years

    I have a parquet table with one of the columns being

    , array<struct<col1,col2,..colN>>

    Can run queries against this table in Hive using LATERAL VIEW syntax.

    How to read this table into an RDD, and more importantly how to filter, map etc this nested collection in Spark?

    Could not find any references to this in Spark documentation. Thanks in advance for any information!

    ps. I felt might be helpful to give some stats on the table. Number of columns in main table ~600. Number of rows ~200m. Number of "columns" in nested collection ~10. Avg number of records in nested collection ~35.

  • Tagar
    Tagar about 9 years
    Thank you Lomig for detailed response. I've marked it as a correct response. Although we are not yet at Spark 1.3, planning to upgrade this month. Is it possible to do without data frame API in Spark 1.2? Could you please let me know how getAs[Seq[Row]](1) works? Index [1] is position of the column that contains nested array, is this right?
  • Lomig Mégard
    Lomig Mégard about 9 years
    See my edit. For Spark 1.2, you can use the exact same code for the transformation from Row to your case class. Please refer to the official documentation for the syntax to read a parquet file in older versions, it is very close.
  • Tagar
    Tagar about 9 years
    Got it. Thanks a lot. github.com/apache/spark/blob/master/sql/catalyst/src/main/sc‌​ala/… GetSeq[Row](1) would do as well?
  • Lomig Mégard
    Lomig Mégard about 9 years
    You're welcome. Yes, getSeq[Row] will be an alternative. I'm not sure this method was available in Spark 1.2, though. I let you check.
  • Tagar
    Tagar about 9 years
    I saw a post today at [email protected] list that Spark SQL supports LATERAL VIEW syntax directly. Will try both ways once we're on Spark 1.3; (waiting for CDH 5.4.1 to get released before we can upgrade)
  • Tagar
    Tagar almost 9 years
    Thanks dnlbrky. It looks simpler to read than Scala. I will definitely try your python example.. We probably wouldn't have Spark 1.4 though until sometime end of this year once Cloudera releases CDH 5.5 :-) Hope to have Spark 1.5 by that time.
  • sumitya
    sumitya over 4 years
    explode is costly operation, can you think of any other way?