Getting last value of group in Spark


I don't know SparkR, so I'll answer in PySpark. You can achieve this using window functions.

First, let's define the "groupings of newcust". Every row where newcust equals 1 should start a new group, so a cumulative sum of newcust within each custId, ordered by date, gives each group its own id:

from pyspark.sql import Window
import pyspark.sql.functions as psf

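# Running total of newcust per customer, in date order: it increases by 1
# on every row where newcust == 1, so rows share a value until the next "New".
w1 = Window.partitionBy("custId").orderBy("date")
df1 = df.withColumn("subgroup", psf.sum("newcust").over(w1))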

+------+----------+----+-------+--------+
|custId|      date|desc|newcust|subgroup|
+------+----------+----+-------+--------+
|  1001|2013-08-01| New|      1|       1|
|  1001|2014-01-01| New|      1|       2|
|  1001|2014-02-01|Good|      0|       2|
|  1001|2014-03-01| New|      1|       3|
|  1001|2014-04-01| Bad|      0|       3|
|  1002|2014-02-01| New|      1|       1|
|  1002|2014-03-01|Good|      0|       1|
|  1002|2014-04-01|Good|      0|       1|
|  1003|2014-04-01| New|      1|       1|
+------+----------+----+-------+--------+
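
To see why the cumulative sum works as a group id, here is a minimal plain-Python sketch of the same idea, without Spark. The `rows` list and the `add_subgroup` helper are illustrative names, not part of the answer's code; the data is the sample from the question.

```python
from itertools import accumulate, groupby
from operator import itemgetter

# Sample rows from the question: (custId, date, desc, newcust)
rows = [
    (1001, "2013-08-01", "New", 1),
    (1001, "2014-01-01", "New", 1),
    (1001, "2014-02-01", "Good", 0),
    (1001, "2014-03-01", "New", 1),
    (1001, "2014-04-01", "Bad", 0),
    (1002, "2014-02-01", "New", 1),
    (1002, "2014-03-01", "Good", 0),
    (1002, "2014-04-01", "Good", 0),
    (1003, "2014-04-01", "New", 1),
]


def add_subgroup(rows):
    """Append a running total of newcust, restarted for each custId."""
    out = []
    # Sort by (custId, date); ISO dates sort correctly as strings.
    ordered = sorted(rows, key=itemgetter(0, 1))
    for _, grp in groupby(ordered, key=itemgetter(0)):
        grp = list(grp)
        running = accumulate(r[3] for r in grp)
        out.extend(r + (total,) for r, total in zip(grp, running))
    return out


subgroups = [r[4] for r in add_subgroup(rows)]
print(subgroups)  # [1, 2, 2, 3, 3, 1, 1, 1, 1]
```

The running total only changes on rows where newcust is 1, so every following row with newcust 0 inherits the id of the most recent "New" row.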

For each subgroup, we want to keep the first date:

w2 = Window.partitionBy("custId", "subgroup")
df2 = df1.withColumn("first_date", psf.min("date").over(w2))

+------+----------+----+-------+--------+----------+
|custId|      date|desc|newcust|subgroup|first_date|
+------+----------+----+-------+--------+----------+
|  1001|2013-08-01| New|      1|       1|2013-08-01|
|  1001|2014-01-01| New|      1|       2|2014-01-01|
|  1001|2014-02-01|Good|      0|       2|2014-01-01|
|  1001|2014-03-01| New|      1|       3|2014-03-01|
|  1001|2014-04-01| Bad|      0|       3|2014-03-01|
|  1002|2014-02-01| New|      1|       1|2014-02-01|
|  1002|2014-03-01|Good|      0|       1|2014-02-01|
|  1002|2014-04-01|Good|      0|       1|2014-02-01|
|  1003|2014-04-01| New|      1|       1|2014-04-01|
+------+----------+----+-------+--------+----------+

Finally, we want to keep the last line (ordered by date) of every subgroup:

w3 = Window.partitionBy("custId", "subgroup").orderBy(psf.desc("date"))
df3 = df2.withColumn(
    "rn", 
    psf.row_number().over(w3)
).filter("rn = 1").select(
    "custId", 
    psf.col("first_date").alias("date"), 
    "desc"
)

+------+----------+----+
|custId|      date|desc|
+------+----------+----+
|  1001|2013-08-01| New|
|  1001|2014-01-01|Good|
|  1001|2014-03-01| Bad|
|  1002|2014-02-01|Good|
|  1003|2014-04-01| New|
+------+----------+----+
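
As a sanity check on the expected result, the whole pipeline can be sketched in plain Python as a single pass over the sorted rows. This is only an illustration of the logic, not a replacement for the window functions; `rows` and `last_desc_per_subgroup` are hypothetical names, and the data is the sample from the question.

```python
# Sample rows from the question: (custId, date, desc, newcust)
rows = [
    (1001, "2013-08-01", "New", 1),
    (1001, "2014-01-01", "New", 1),
    (1001, "2014-02-01", "Good", 0),
    (1001, "2014-03-01", "New", 1),
    (1001, "2014-04-01", "Bad", 0),
    (1002, "2014-02-01", "New", 1),
    (1002, "2014-03-01", "Good", 0),
    (1002, "2014-04-01", "Good", 0),
    (1003, "2014-04-01", "New", 1),
]


def last_desc_per_subgroup(rows):
    """Return (custId, first_date, last_desc) for each subgroup."""
    result = []
    current_cust = None
    for cust, date, desc, newcust in sorted(rows, key=lambda r: (r[0], r[1])):
        if newcust == 1 or cust != current_cust:
            # A new subgroup starts here: remember its first date.
            result.append([cust, date, desc])
            current_cust = cust
        else:
            # Same subgroup: keep the first date, overwrite desc with the latest.
            result[-1][2] = desc
    return [tuple(r) for r in result]


result = last_desc_per_subgroup(rows)
for row in result:
    print(row)
```

Each output tuple pairs the first date of a subgroup with its last desc, which is the same combination that `first_date` and the `rn = 1` filter select in the PySpark version.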
Author by

Gaurav Bansal

Updated on June 09, 2022

Comments

  • Gaurav Bansal almost 2 years

    I have a SparkR DataFrame as shown below:

    #Create R data.frame
    custId <- c(rep(1001, 5), rep(1002, 3), 1003)
    date <- c('2013-08-01','2014-01-01','2014-02-01','2014-03-01','2014-04-01','2014-02-01','2014-03-01','2014-04-01','2014-04-01')
    desc <- c('New','New','Good','New', 'Bad','New','Good','Good','New')
    newcust <- c(1,1,0,1,0,1,0,0,1)
    df <- data.frame(custId, date, desc, newcust)
    
    #Create SparkR DataFrame    
    df <- createDataFrame(df)
    display(df)
    custId |    date    | desc | newcust
    -------------------------------------
     1001  | 2013-08-01 | New  |   1
     1001  | 2014-01-01 | New  |   1
     1001  | 2014-02-01 | Good |   0
     1001  | 2014-03-01 | New  |   1
     1001  | 2014-04-01 | Bad  |   0
     1002  | 2014-02-01 | New  |   1
     1002  | 2014-03-01 | Good |   0
     1002  | 2014-04-01 | Good |   0
     1003  | 2014-04-01 | New  |   1
    

    newcust indicates a new customer every time a new custId appears, or if the same custId's desc reverts to 'New'. What I want to obtain is the last desc value for each grouping of newcust, while maintaining the first date for each grouping. Below is the DataFrame I want to obtain. How can I do this in Spark? Either PySpark or SparkR code will work.

    #What I want 
    custId|    date   | newcust | finaldesc
    ----------------------------------------------
     1001 | 2013-08-01|   1     | New
     1001 | 2014-01-01|   1     | Good
     1001 | 2014-03-01|   1     | Bad
     1002 | 2014-02-01|   1     | Good
     1003 | 2014-04-01|   1     | New