How to slice and sum elements of array column?

Solution 1

Spark 2.4.0

As of Spark 2.4, Spark SQL supports higher-order functions for manipulating complex data structures, including arrays.

The "modern" solution would be as follows:

scala> input.show(false)
+-------+-------+-------------------------+
|dept_id|dept_nm|emp_details              |
+-------+-------+-------------------------+
|10     |Finance|[100, 200, 300, 400, 500]|
|20     |IT     |[10, 20, 50, 100]        |
+-------+-------+-------------------------+

input.createOrReplaceTempView("mytable")

val sqlText = "select dept_id, dept_nm, aggregate(emp_details, 0, (acc, value) -> acc + value) as sum from mytable"
scala> sql(sqlText).show
+-------+-------+----+
|dept_id|dept_nm| sum|
+-------+-------+----+
|     10|Finance|1500|
|     20|     IT| 180|
+-------+-------+----+
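
The same higher-order function also covers the range part of the question. The following is a minimal sketch, assuming Spark 2.4 or later and the sample data shown above; the local session setup, the app name, and the `sliced_sum` alias are illustrative choices rather than part of the original answer. The SQL `slice(array, start, length)` function is 1-based, so `slice(emp_details, 1, 3)` keeps the first three elements.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session created only for this illustration.
val spark = SparkSession.builder().master("local[*]").appName("slice-sum-sql").getOrCreate()
import spark.implicits._

val input = Seq(
  (10, "Finance", Seq(100, 200, 300, 400, 500)),
  (20, "IT", Seq(10, 20, 50, 100))
).toDF("dept_id", "dept_nm", "emp_details")
input.createOrReplaceTempView("mytable")

// slice(...) keeps elements 1 to 3 (1-based); aggregate(...) folds them into a sum.
val slicedSum = spark.sql(
  """select dept_id, dept_nm,
    |       aggregate(slice(emp_details, 1, 3), 0, (acc, value) -> acc + value) as sliced_sum
    |from mytable""".stripMargin)

slicedSum.show()
```

For the sample data this should give 600 for Finance and 80 for IT, which is the result the question asks for.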

For further reading on higher-order functions, see the following articles and video:

  1. Introducing New Built-in and Higher-Order Functions for Complex Data Types in Apache Spark 2.4
  2. Working with Nested Data Using Higher Order Functions in SQL on Databricks
  3. An Introduction to Higher Order Functions in Spark SQL with Herman van Hovell (Databricks)

Spark 2.3.2 and earlier

DISCLAIMER: I would not recommend this approach (even though it got the most upvotes) because of the deserialization that Spark SQL performs to execute Dataset.map. The query forces Spark to deserialize the data and load it onto the JVM (from memory regions that Spark manages outside the JVM). That inevitably leads to more frequent GCs and hence worse performance.

One option is to use the typed Dataset API, where the combination of Spark SQL and Scala can show its power.

scala> val inventory = Seq(
     |   (10, "Finance", Seq(100, 200, 300, 400, 500)),
     |   (20, "IT", Seq(10, 20, 50, 100))).toDF("dept_id", "dept_nm", "emp_details")
inventory: org.apache.spark.sql.DataFrame = [dept_id: int, dept_nm: string ... 1 more field]

// I'm too lazy today for a case class
scala> inventory.as[(Long, String, Seq[Int])].
  map { case (deptId, deptName, details) => (deptId, deptName, details.sum) }.
  toDF("dept_id", "dept_nm", "sum").
  show
+-------+-------+----+
|dept_id|dept_nm| sum|
+-------+-------+----+
|     10|Finance|1500|
|     20|     IT| 180|
+-------+-------+----+

I'm leaving the slice part as an exercise as it's equally simple.
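
For readers who want to see that exercise worked out, here is one possible sketch under the same assumptions as above. The session setup and the `slicedSums` name are hypothetical additions. Inside `map` the array is an ordinary Scala `Seq`, so the 0-based `Seq.slice(from, until)` applies, and the same performance caveat from the disclaimer holds.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session created only for this illustration.
val spark = SparkSession.builder().master("local[*]").appName("slice-sum-dataset").getOrCreate()
import spark.implicits._

val inventory = Seq(
  (10, "Finance", Seq(100, 200, 300, 400, 500)),
  (20, "IT", Seq(10, 20, 50, 100))
).toDF("dept_id", "dept_nm", "emp_details")

// Seq.slice(0, 3) keeps indices 0, 1 and 2; the end index is exclusive.
val slicedSums = inventory.as[(Int, String, Seq[Int])].
  map { case (deptId, deptName, details) => (deptId, deptName, details.slice(0, 3).sum) }.
  toDF("dept_id", "dept_nm", "sum")

slicedSums.show()
```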

Solution 2

Since Spark 2.4 you can slice with the slice function:

import org.apache.spark.sql.functions.slice

val df = Seq(
  (10, "Finance", Seq(100, 200, 300, 400, 500)),
  (20, "IT", Seq(10, 20, 50, 100))
).toDF("dept_id", "dept_nm", "emp_details")

val dfSliced = df.withColumn(
   "emp_details_sliced",
   slice($"emp_details", 1, 3)
)

dfSliced.show(false)
+-------+-------+-------------------------+------------------+
|dept_id|dept_nm|emp_details              |emp_details_sliced|
+-------+-------+-------------------------+------------------+
|10     |Finance|[100, 200, 300, 400, 500]|[100, 200, 300]   |
|20     |IT     |[10, 20, 50, 100]        |[10, 20, 50]      |
+-------+-------+-------------------------+------------------+

and sum arrays with aggregate:

dfSliced.selectExpr(
  "*", 
  "aggregate(emp_details, 0, (x, y) -> x + y) as details_sum",  
  "aggregate(emp_details_sliced, 0, (x, y) -> x + y) as details_sliced_sum"
).show
+-------+-------+--------------------+------------------+-----------+------------------+
|dept_id|dept_nm|         emp_details|emp_details_sliced|details_sum|details_sliced_sum|
+-------+-------+--------------------+------------------+-----------+------------------+
|     10|Finance|[100, 200, 300, 4...|   [100, 200, 300]|       1500|               600|
|     20|     IT|   [10, 20, 50, 100]|      [10, 20, 50]|        180|                80|
+-------+-------+--------------------+------------------+-----------+------------------+
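
One detail that is easy to trip over: the built-in `slice(column, start, length)` is 1-based and takes a length, whereas Scala's own `Seq.slice(from, until)` is 0-based with an exclusive end. A short sketch of the difference, assuming Spark 2.4 or later; the column aliases and the session setup are illustrative only.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, slice}

// Assumption: a local session created only for this illustration.
val spark = SparkSession.builder().master("local[*]").appName("slice-indexing").getOrCreate()
import spark.implicits._

val df = Seq(
  (10, "Finance", Seq(100, 200, 300, 400, 500)),
  (20, "IT", Seq(10, 20, 50, 100))
).toDF("dept_id", "dept_nm", "emp_details")

val indexed = df.select(
  col("dept_id"),
  // Start at the 2nd element (1-based) and take 2 elements.
  slice(col("emp_details"), 2, 2).as("from_second"),
  // A negative start counts from the end of the array.
  slice(col("emp_details"), -2, 2).as("last_two")
)
indexed.show(false)

// Plain Scala for comparison: indices 0, 1 and 2, end index exclusive.
val firstThree = Seq(100, 200, 300, 400, 500).slice(0, 3)
```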

Solution 3

A possible approach is to use explode() on your array column and then aggregate the output by a unique key. For example:

import sqlContext.implicits._
import org.apache.spark.sql.functions._

(mytable
  .withColumn("emp_sum",
    explode($"emp_details"))
  .groupBy("dept_nm")
  .agg(sum("emp_sum")).show)
+-------+------------+
|dept_nm|sum(emp_sum)|
+-------+------------+
|Finance|        1500|
|     IT|         180|
+-------+------------+

To select only specific values in your array, we can work with the answer from the linked question and apply it with a slight modification:

val slice = udf((array : Seq[Int], from : Int, to : Int) => array.slice(from,to))

(mytable
  .withColumn("slice", 
    slice($"emp_details", 
      lit(0), 
      lit(3)))
  .withColumn("emp_sum",
    explode($"slice"))
  .groupBy("dept_nm")
  .agg(sum("emp_sum")).show)
+-------+------------+
|dept_nm|sum(emp_sum)|
+-------+------------+
|Finance|         600|
|     IT|          80|
+-------+------------+

Data:

val data = Seq((10, "Finance", Array(100,200,300,400,500)),
               (20, "IT", Array(10,20,50,100)))
val mytable = sc.parallelize(data).toDF("dept_id", "dept_nm","emp_details")

Solution 4

Here is an alternative to mtoto's answer without using a groupBy (I really don't know which one is fastest: UDF, mtoto solution or mine, comments welcome)

In general, you would take a performance hit when using a UDF. There is an answer which you might want to read, and this resource is a good read on UDFs.

Now for your problem, you can avoid the use of a UDF. What I would use is a Column expression generated with Scala logic.

data:

val df = Seq((10, "Finance", Array(100,200,300,400,500)),
                  (20, "IT", Array(10,  20, 50,100)))
          .toDF("dept_id", "dept_nm","emp_details")

You need some trickery to traverse an ArrayType column; playing with the solution will reveal various problems (see the edit at the bottom for the slice part). Here is my proposal, though you might find a better one. First, take the maximum array length:

val maxLength = df.select(size('emp_details).as("l")).groupBy().max("l").first.getInt(0)

Then you use it, testing when you have a shorter array

val sumArray = (0 until maxLength)
      .map(i => when(size('emp_details) > i,'emp_details(i)).otherwise(lit(0)))
      .reduce(_ + _)
      .as("sumArray")

val res = df
  .select('dept_id,'dept_nm,'emp_details,sumArray)

result:

+-------+-------+--------------------+--------+
|dept_id|dept_nm|         emp_details|sumArray|
+-------+-------+--------------------+--------+
|     10|Finance|[100, 200, 300, 4...|    1500|
|     20|     IT|   [10, 20, 50, 100]|     180|
+-------+-------+--------------------+--------+

I advise you to look at sumArray to understand what it is doing.

Edit: Of course I only read half of the question again... But if you want to change the items to sum over, this solution makes it straightforward (i.e. you don't need a slice function): just replace (0 until maxLength) with the range of indices you need:

def sumArray(from: Int, max: Int) = (from until max)
      .map(i => when(size('emp_details) > i,'emp_details(i)).otherwise(lit(0)))
      .reduce(_ + _)
      .as("sumArray")
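
A possible variation on the range version above, offered as a sketch rather than the author's code: `reduce` throws on an empty range (for example when `from` equals `max`), so folding from `lit(0)` is a little more forgiving. The `sumArrayRange` name, the `col(...)` style, and the session setup are assumptions made for this example.

```scala
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions.{col, lit, size, when}

// Assumption: a local session created only for this illustration.
val spark = SparkSession.builder().master("local[*]").appName("sum-array-range").getOrCreate()
import spark.implicits._

val df = Seq(
  (10, "Finance", Array(100, 200, 300, 400, 500)),
  (20, "IT", Array(10, 20, 50, 100))
).toDF("dept_id", "dept_nm", "emp_details")

// Hypothetical helper: sums the elements at indices from (inclusive) to end (exclusive).
// Indices past the end of a shorter array contribute 0.
def sumArrayRange(from: Int, end: Int): Column =
  (from until end)
    .map(i => when(size(col("emp_details")) > i, col("emp_details")(i)).otherwise(lit(0)))
    .foldLeft(lit(0))(_ + _) // an empty range yields 0 instead of throwing
    .as("sumArray")

val res = df.select(col("dept_id"), sumArrayRange(0, 3))
res.show()
```

With the sample data, `sumArrayRange(0, 3)` should give 600 and 80, the same values as the slice-based solutions.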

Author by

serious_black

Updated on September 23, 2022

Comments

  • serious_black
    serious_black over 1 year

    I would like to sum (or perform other aggregate functions too) on the array column using SparkSQL.

    I have a table as

    +-------+-------+---------------------------------+
    |dept_id|dept_nm|                      emp_details|
    +-------+-------+---------------------------------+
    |     10|Finance|        [100, 200, 300, 400, 500]|
    |     20|     IT|                [10, 20, 50, 100]|
    +-------+-------+---------------------------------+
    

    I would like to sum the values of this emp_details column.

    Expected query:

    sqlContext.sql("select sum(emp_details) from mytable").show
    

    Expected result

    1500
    180
    

    I should also be able to sum over a range of elements, like:

    sqlContext.sql("select sum(slice(emp_details,0,3)) from mytable").show
    

    result

    600
    80
    

    When I apply sum to the array column, the query fails, as expected, with an error saying that sum expects a numeric argument, not an array.

    I think we need to create a UDF for this, but how?

    Will I face any performance hits with UDFs? And is there any other solution apart from the UDF one?

  • Jon Deaton
    Jon Deaton over 5 years
    Does this use a UDF under the hood?
  • Jacek Laskowski
    Jacek Laskowski over 5 years
    @JonDeaton What does "this" refer to? I could help if I knew the answer.
  • Jon Deaton
    Jon Deaton over 5 years
    The line map { case (deptId, deptName, details) => (deptId, deptName, details.sum) }
  • Jacek Laskowski
    Jacek Laskowski over 5 years
    Nope. It's a Scala function that Spark SQL uses literally, i.e. without any optimizations (if there were any).
  • Jacek Laskowski
    Jacek Laskowski over 5 years
    Doh! It's only now when I noticed your answer. I leave my edit intact and upvote your answer then. We're even, ain't we?
  • Jacek Laskowski
    Jacek Laskowski over 5 years
    Why would you want to do this with RDD API since it uses a Dataset?
  • stack0114106
    stack0114106 over 5 years
    the rdd way seems to be straightforward for this case. You have access to sum() and slice() functions, once you convert it to array. Also rdd is the first abstraction and it will be always compatible.
  • stack0114106
    stack0114106 over 5 years
    Matei Zaharia, in Spark: The Definitive Guide, encourages Spark developers to explore all 3 abstractions: RDD, DataFrame and SQL. They all convert to the same execution plan. You may not agree with me, but for this case, falling back to RDD seems to be straightforward.
  • Jacek Laskowski
    Jacek Laskowski over 5 years
    I'll then have to talk to Matei to release another edition :)
  • zero323
    zero323 over 5 years
    @JacekLaskowski No worries, though to be even, I would like to get a response to our recent pivot discussion :)
  • zero323
    zero323 over 5 years
    I think the point that @JacekLaskowski made here is: why use an expensive conversion to RDD if Dataset and RDD functionality fully overlap, and you can do the same thing with virtually identical code (schema vs. encoder is a superficial difference)?
  • stack0114106
    stack0114106 over 5 years
    RDD is not an expensive conversion.. the syntax may be intimidating in the beginning, I'm slowly getting used to it..
  • zero323
    zero323 over 5 years
    @stack0114106 Conversion to RDD at minimum creates an analysis barrier (not actual optimizer barrier, but outcome is equivalent), which is bad by itself. Additionally it requires significant allocation to initialize the Row objects. Finally initialization requires inefficient Encoder[Row]. All of that is expensive. Sometimes prohibitively expensive. Using functional / strongly typed (out of lack of a better description) Dataset API suffers from some, but not all these issues. And SQL / DataFrame, from none.
  • zero323
    zero323 over 5 years
    There are use cases for legacy RDD or DStream APIs (@JacekLaskowski probably wouldn't agree with me here :) ), but plain map is simply not one of them. On a side note - the interface that ought to be used for ArrayType is Seq.
  • WestCoastProjects
    WestCoastProjects over 5 years
    This is a good approach. I had forgotten about it, despite having used it in the past.