Spark Scala : Getting Cumulative Sum (Running Total) Using Analytical Functions

Solution 1

This happens because of an incorrect column type. Because sal is a string

DS1.printSchema
root
 |-- dept_no: string (nullable = true)
 |-- emp_name: string (nullable = true)
 |-- sal: string (nullable = true)
 |-- date: string (nullable = true)

it is sorted lexicographically:

DS1.orderBy("sal").show
+-------+--------+----+----------+
|dept_no|emp_name| sal|      date|
+-------+--------+----+----------+
|     30|  Martin|1250|2017-11-21|
|     30|    Ward|1250|2018-02-05|
|     10|  MILLER|1300|2017-11-03|
|     10|   Clark|2450| 2017-12-9|
|     10|    King|5000|2018-01-28|
|     30|   James| 950|2017-10-18|
+-------+--------+----+----------+ 
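The effect is easy to reproduce in plain Scala, independent of Spark: comparing the salaries as strings puts "950" last (because '9' > '5' as a character), while comparing them as integers puts it first. A small illustrative sketch:

```scala
val sals = List("1300", "2450", "5000", "950", "1250")

// Lexicographic (string) order: "950" sorts after "5000",
// since strings are compared character by character.
val asStrings = sals.sorted
// List(1250, 1300, 2450, 5000, 950)

// Numeric order after converting to Int: 950 comes first.
val asInts = sals.map(_.toInt).sorted
// List(950, 1250, 1300, 2450, 5000)
```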

To get the desired result you have to cast the column to a numeric type (and there is no need for a frame definition):

DS1.withColumn("Dept_CumSal", sum("sal").over(
  Window
    .partitionBy("dept_no")
    .orderBy(col("sal").cast("integer"), col("emp_name"), col("date").asc))).show
+-------+--------+----+----------+-----------+                                  
|dept_no|emp_name| sal|      date|Dept_CumSal|
+-------+--------+----+----------+-----------+
|     30|   James| 950|2017-10-18|      950.0|
|     30|  Martin|1250|2017-11-21|     2200.0|
|     30|    Ward|1250|2018-02-05|     3450.0|
|     10|  MILLER|1300|2017-11-03|     1300.0|
|     10|   Clark|2450| 2017-12-9|     3750.0|
|     10|    King|5000|2018-01-28|     8750.0|
+-------+--------+----+----------+-----------+
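As a sanity check, the per-department running totals above match a plain scanLeft over the numerically sorted salaries (illustrative only; the values below are the dept_no=30 rows from the table):

```scala
// Salaries for dept_no = 30, in ascending numeric order.
val dept30 = List(950, 1250, 1250)

// scanLeft accumulates a running total; drop the initial 0 seed.
val running = dept30.scanLeft(0)(_ + _).tail
// List(950, 2200, 3450) — the Dept_CumSal values for dept 30
```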

Solution 2

Notice that the ordering inside your window, (col("sal"), col("emp_name"), col("date").asc), is not the same as the ordering in your show, ("dept_no", "date"). Why do you need "sal" and "emp_name" in the window at all? Why not just order by "date"?
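If the window is to be ordered by date alone, note that the sample data mixes zero-padded and non-padded days ("2017-11-03" vs "2017-12-9"), so comparing the raw strings is fragile. A plain-Scala sketch of parsing before sorting (the "yyyy-M-d" pattern is an assumption about the intended format):

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

// "yyyy-M-d" accepts both "2017-11-03" and "2017-12-9".
val fmt = DateTimeFormatter.ofPattern("yyyy-M-d")

val dates  = List("2017-12-9", "2017-10-18", "2018-01-28")
val sorted = dates.sortBy(d => LocalDate.parse(d, fmt).toEpochDay)
// List(2017-10-18, 2017-12-9, 2018-01-28)
```

In Spark the analogous move would presumably be ordering the window by something like `to_date(col("date"), "yyyy-M-d")` rather than by the raw string.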

Author: RaAm

Updated on July 25, 2022

Comments

  • RaAm, almost 2 years ago

    I am implementing a cumulative sum in Spark using a window function, but the input order of the records is not maintained when the window partition function is applied.

    Input data:

    import spark.implicits._  // needed for .toDF, assuming a SparkSession named spark

    val base = List(List("10", "MILLER", "1300", "2017-11-03"), List("10", "Clark", "2450", "2017-12-9"), List("10", "King", "5000", "2018-01-28"),
      List("30", "James", "950", "2017-10-18"), List("30", "Martin", "1250", "2017-11-21"), List("30", "Ward", "1250", "2018-02-05"))
      .map(row => (row(0), row(1), row(2), row(3)))
    
    val DS1 = base.toDF("dept_no", "emp_name", "sal", "date")
    DS1.show()
    
    +-------+--------+----+----------+
    |dept_no|emp_name| sal|      date|
    +-------+--------+----+----------+
    |     10|  MILLER|1300|2017-11-03|
    |     10|   Clark|2450| 2017-12-9|
    |     10|    King|5000|2018-01-28|
    |     30|   James| 950|2017-10-18|
    |     30|  Martin|1250|2017-11-21|
    |     30|    Ward|1250|2018-02-05|
    +-------+--------+----+----------+
    

    Expected Output:

    +-------+--------+----+----------+-----------+
    |dept_no|emp_name| sal|      date|Dept_CumSal|
    +-------+--------+----+----------+-----------+
    |     10|  MILLER|1300|2017-11-03|     1300.0|
    |     10|   Clark|2450| 2017-12-9|     3750.0|
    |     10|    King|5000|2018-01-28|     8750.0|
    |     30|   James| 950|2017-10-18|      950.0|
    |     30|  Martin|1250|2017-11-21|     2200.0|
    |     30|    Ward|1250|2018-02-05|     3450.0|
    +-------+--------+----+----------+-----------+
    

    I have tried the below logic

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, sum}

    val baseDepCumSal = DS1.withColumn("Dept_CumSal", sum("sal").over(
      Window.partitionBy("dept_no")
        .orderBy(col("sal"), col("emp_name"), col("date").asc)
        .rowsBetween(Long.MinValue, 0)))
    
    baseDepCumSal.orderBy("dept_no", "date").show
    
    +-------+--------+----+----------+-----------+
    |dept_no|emp_name| sal|      date|Dept_CumSal|
    +-------+--------+----+----------+-----------+
    |     10|  MILLER|1300|2017-11-03|     1300.0|
    |     10|   Clark|2450| 2017-12-9|     3750.0|
    |     10|    King|5000|2018-01-28|     8750.0|
    |     30|   James| 950|2017-10-18|     3450.0|
    |     30|  Martin|1250|2017-11-21|     1250.0|
    |     30|    Ward|1250|2018-02-05|     2500.0|
    +-------+--------+----+----------+-----------+
    

    For dept_no=10 the records are computed in the expected order, whereas for dept_no=30 they are not computed in the input order.