How to aggregate over rolling time window with groups in Spark


Revised answer:

You can use a simple window-function trick here. First, a bunch of imports:

from pyspark.sql.functions import coalesce, col, datediff, lag, lit, sum as sum_
from pyspark.sql.window import Window

window definition:

w = Window.partitionBy("group_by").orderBy("date")

Cast date to DateType:

df_ = df.withColumn("date", col("date").cast("date"))

Define the following expressions:

# Difference from the previous record or 0 if this is the first one
diff = coalesce(datediff("date", lag("date", 1).over(w)), lit(0))

# 0 if diff <= 30, 1 otherwise
indicator = (diff > 30).cast("integer")

# Cumulative sum of indicators over the window
subgroup = sum_(indicator).over(w).alias("subgroup")

Add the subgroup expression to the table and aggregate:

df_.select("*", subgroup).groupBy("group_by", "subgroup").avg("get_avg").show()
+--------+--------+------------+
|group_by|subgroup|avg(get_avg)|
+--------+--------+------------+
|  group1|       0|         5.0|
|  group2|       0|        20.0|
|  group2|       1|         8.0|
+--------+--------+------------+

first is not meaningful with aggregations here, but if the column is monotonically increasing you can use min. Otherwise you'll have to use window functions as well.
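
For instance, a minimal sketch of both options, reusing the df_ and subgroup defined above (the aliases first_date_of_window, w_sub and rn are just illustrative names, not part of the original answer):

from pyspark.sql.functions import avg, min as min_, row_number

# Option 1: get_first is monotonically increasing, so min picks the first value per subgroup
(df_
    .select("*", subgroup)
    .groupBy("group_by", "subgroup")
    .agg(min_("date").alias("first_date_of_window"),
         avg("get_avg").alias("get_avg"),
         min_("get_first").alias("get_first"))
    .show())

# Option 2: otherwise rank the rows inside each (group_by, subgroup) and keep the first one
w_sub = Window.partitionBy("group_by", "subgroup").orderBy("date")
(df_
    .select("*", subgroup)
    .withColumn("rn", row_number().over(w_sub))
    .where(col("rn") == 1)
    .select("group_by", "subgroup", "date", "get_first")
    .show())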

Tested using Spark 2.1. It may require subqueries and a Window instance when used with an earlier Spark release.

The original answer (not relevant to the question as clarified)

Since Spark 2.0 you should be able to use a window function:

Bucketize rows into one or more time windows given a timestamp specifying column. Window starts are inclusive but the window ends are exclusive, e.g. 12:05 will be in the window [12:05,12:10) but not in [12:00,12:05).

from pyspark.sql.functions import window

df.groupBy(window("date", windowDuration="30 days")).count().show(truncate=False)

but as you can see from the result,

+---------------------------------------------+-----+
|window                                       |count|
+---------------------------------------------+-----+
|[2016-01-30 01:00:00.0,2016-02-29 01:00:00.0]|1    |
|[2015-12-31 01:00:00.0,2016-01-30 01:00:00.0]|2    |
|[2016-03-30 02:00:00.0,2016-04-29 02:00:00.0]|1    |
+---------------------------------------------+-----+

you'll have to be a bit careful when it comes to timezones.
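
For example, a minimal sketch, assuming Spark 2.2+ where the session time zone is configurable (on older releases the JVM default time zone drives the offsets shown above):

# Pin the session time zone so the 30-day buckets start at UTC midnight
# instead of the local-time offsets (01:00 / 02:00) seen in the output above.
spark.conf.set("spark.sql.session.timeZone", "UTC")

df.groupBy(window("date", windowDuration="30 days")).count().show(truncate=False)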


Comments

  • Mike S almost 2 years

    I have some data that I want to group by a certain column, then aggregate a series of fields based on a rolling time window from the group.

    Here is some example data:

    from pyspark.sql import Row  # Row is needed for the example rows below

    df = spark.createDataFrame([Row(date='2016-01-01', group_by='group1', get_avg=5, get_first=1),
                                Row(date='2016-01-10', group_by='group1', get_avg=5, get_first=2),
                                Row(date='2016-02-01', group_by='group2', get_avg=10, get_first=3),
                                Row(date='2016-02-28', group_by='group2', get_avg=20, get_first=3),
                                Row(date='2016-02-29', group_by='group2', get_avg=30, get_first=3),
                                Row(date='2016-04-02', group_by='group2', get_avg=8, get_first=4)])
    

    I want to group by group_by, then create time windows that start at the earliest date and extend until there are 30 days with no entry for that group. After those 30 days are over, the next time window would start with the date of the next row that did not fall in the previous window.

    I then want to aggregate, for example getting the average of get_avg, and the first result of get_first.

    So the output for this example should be:

    group_by    first date of window    get_avg  get_first
    group1      2016-01-01              5        1
    group2      2016-02-01              20       3
    group2      2016-04-02              8        4
    

    edit: sorry I realized my question was not specified properly. I actually want a window that ends after 30 days of inactivity. I have modified the group2 portion of the example accordingly.

  • G_cy about 6 years
    Could you explain how the sum works here? What is the frame of the window when calculating this sum? If there were a 90-day gap within one group, would the subgroup numbers be 0, 1, 2?
  • zero323 about 6 years
    @G_cy It is literally a sum over an indicator function - applying indicator yields a sequence of {0, 1}, where each 1 denotes a gap > threshold, and sum is just a cumulative sum over this sequence. So the answer to your question is no: it doesn't distinguish between gaps of 1 * threshold, 2 * threshold ... n * threshold.
  • G_cy about 6 years
    I checked it step by step. Within one group, I understand that indicator yields a sequence of {0, 1}, e.g. (g1, 0), (g1, 0) ... (g1, 1) ... (g1, 0) ... (g1, 1). Then when I show the sum, it gives (g1, 0), (g1, 0), (g1, 0) ... (g1, 1), (g1, 1), (g1, 1) ... It looks like sum only adds up the {0, 1} sequence up to the current row. Is that true?
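
That is the default behaviour: with an orderBy and no explicit frame, Spark uses rangeBetween(Window.unboundedPreceding, Window.currentRow), i.e. a running sum up to the current row. A minimal sketch that spells the frame out (the name w_cum is just illustrative; for distinct dates it is equivalent to the subgroup expression above):

# With orderBy and no explicit frame, Spark defaults to
# rangeBetween(Window.unboundedPreceding, Window.currentRow),
# so sum(indicator) is a running total up to and including the current row.
w_cum = (Window.partitionBy("group_by")
         .orderBy("date")
         .rangeBetween(Window.unboundedPreceding, Window.currentRow))

subgroup_explicit = sum_(indicator).over(w_cum).alias("subgroup")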