Getting last value of group in Spark
I don't know for sparkR so I'll answer in pyspark. You can achieve this using window functions.
First, let's define the "groupings of newcust
", you want every line where newcust
equals 1 to be the start of a new group, computing a cumulative sum will do the trick:
from pyspark.sql import Window
import pyspark.sql.functions as psf
w1 = Window.partitionBy("custId").orderBy("date")
df1 = df.withColumn("subgroup", psf.sum("newcust").over(w1))
+------+----------+----+-------+--------+
|custId| date|desc|newcust|subgroup|
+------+----------+----+-------+--------+
| 1001|2013-08-01| New| 1| 1|
| 1001|2014-01-01| New| 1| 2|
| 1001|2014-02-01|Good| 0| 2|
| 1001|2014-03-01| New| 1| 3|
| 1001|2014-04-01| Bad| 0| 3|
| 1002|2014-02-01| New| 1| 1|
| 1002|2014-03-01|Good| 0| 1|
| 1002|2014-04-01|Good| 0| 1|
| 1003|2014-04-01| New| 1| 1|
+------+----------+----+-------+--------+
For each subgroup
, we want to keep the first date:
w2 = Window.partitionBy("custId", "subgroup")
df2 = df1.withColumn("first_date", psf.min("date").over(w2))
+------+----------+----+-------+--------+----------+
|custId| date|desc|newcust|subgroup|first_date|
+------+----------+----+-------+--------+----------+
| 1001|2013-08-01| New| 1| 1|2013-08-01|
| 1001|2014-01-01| New| 1| 2|2014-01-01|
| 1001|2014-02-01|Good| 0| 2|2014-01-01|
| 1001|2014-03-01| New| 1| 3|2014-03-01|
| 1001|2014-04-01| Bad| 0| 3|2014-03-01|
| 1002|2014-02-01| New| 1| 1|2014-02-01|
| 1002|2014-03-01|Good| 0| 1|2014-02-01|
| 1002|2014-04-01|Good| 0| 1|2014-02-01|
| 1003|2014-04-01| New| 1| 1|2014-04-01|
+------+----------+----+-------+--------+----------+
Finally, we want to keep the last line (ordered by date) of every subgroup
:
w3 = Window.partitionBy("custId", "subgroup").orderBy(psf.desc("date"))
df3 = df2.withColumn(
"rn",
psf.row_number().over(w3)
).filter("rn = 1").select(
"custId",
psf.col("first_date").alias("date"),
"desc"
)
+------+----------+----+
|custId| date|desc|
+------+----------+----+
| 1001|2013-08-01| New|
| 1001|2014-01-01|Good|
| 1001|2014-03-01| Bad|
| 1002|2014-02-01|Good|
| 1003|2014-04-01| New|
+------+----------+----+
Gaurav Bansal
Updated on June 09, 2022Comments
-
Gaurav Bansal almost 2 years
I have a SparkR DataFrame as shown below:
#Create R data.frame custId <- c(rep(1001, 5), rep(1002, 3), 1003) date <- c('2013-08-01','2014-01-01','2014-02-01','2014-03-01','2014-04-01','2014-02-01','2014-03-01','2014-04-01','2014-04-01') desc <- c('New','New','Good','New', 'Bad','New','Good','Good','New') newcust <- c(1,1,0,1,0,1,0,0,1) df <- data.frame(custId, date, desc, newcust) #Create SparkR DataFrame df <- createDataFrame(df) display(df) custId| date | desc | newcust -------------------------------------- 1001 | 2013-08-01| New | 1 1001 | 2014-01-01| New | 1 1001 | 2014-02-01| Good | 0 1001 | 2014-03-01| New | 1 1001 | 2014-04-01| Bad | 0 1002 | 2014-02-01| New | 1 1002 | 2014-03-01| Good | 0 1002 | 2014-04-01| Good | 0 1003 | 2014-04-01| New | 1
newcust
indicates a new customer every time a newcustId
appears, or if the samecustId
'sdesc
reverts to 'New'. What I want to obtain is the lastdesc
value for each grouping ofnewcust
, while maintaining the firstdate
for each grouping. Below is the DataFrame I want to obtain. How can I do this in Spark? Either PySpark or SparkR code will work.#What I want custId| date | newcust | finaldesc ---------------------------------------------- 1001 | 2013-08-01| 1 | New 1001 | 2014-01-01| 1 | Good 1001 | 2014-03-01| 1 | Bad 1002 | 2014-02-01| 1 | Good 1003 | 2014-04-01| 1 | New