pyspark: count distinct over a window
Solution 1
EDIT:
As noleto mentions in his answer below, since PySpark 2.1 there is an approx_count_distinct function that works over a window.
Original Answer
I figured out that I can use a combination of the collect_set and size functions to mimic the functionality of countDistinct over a window:
from pyspark.sql.window import Window
from pyspark.sql import functions as F
#function to calculate number of seconds from number of days
days = lambda i: i * 86400
#create some test data
df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00", "orange"),
                            (13, "2017-03-15T12:27:18+00:00", "red"),
                            (25, "2017-03-18T11:27:18+00:00", "red")],
                           ["dollars", "timestampGMT", "color"])
#convert string timestamp to timestamp type
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))
#create window by casting timestamp to long (number of seconds)
w = (Window.orderBy(F.col("timestampGMT").cast('long')).rangeBetween(-days(7), 0))
#use collect_set and size functions to perform countDistinct over a window
df = df.withColumn('distinct_color_count_over_the_last_week', F.size(F.collect_set("color").over(w)))
df.show()
This results in the distinct count of color over the previous week of records:
+-------+--------------------+------+---------------------------------------+
|dollars|        timestampGMT| color|distinct_color_count_over_the_last_week|
+-------+--------------------+------+---------------------------------------+
|     17|2017-03-10 15:27:...|orange|                                      1|
|     13|2017-03-15 12:27:...|   red|                                      2|
|     25|2017-03-18 11:27:...|   red|                                      1|
+-------+--------------------+------+---------------------------------------+
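The same trick composes with Window.partitionBy if you need the rolling distinct count within groups. Here is a minimal sketch, assuming a hypothetical store column added to the test data:

from pyspark.sql.window import Window
from pyspark.sql import functions as F

#hypothetical: same data plus a "store" grouping column
df2 = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00", "orange", "A"),
                             (13, "2017-03-15T12:27:18+00:00", "red", "A"),
                             (25, "2017-03-18T11:27:18+00:00", "red", "B")],
                            ["dollars", "timestampGMT", "color", "store"])
df2 = df2.withColumn('timestampGMT', df2.timestampGMT.cast('timestamp'))

#same rolling 7-day range, but computed independently per store
w2 = (Window.partitionBy("store")
            .orderBy(F.col("timestampGMT").cast('long'))
            .rangeBetween(-days(7), 0))

df2 = df2.withColumn('distinct_color_count_per_store',
                     F.size(F.collect_set("color").over(w2)))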
Solution 2
@Bob Swain's answer is nice and works! Since Spark version 2.1, Spark offers an equivalent to the countDistinct function, approx_count_distinct, which is more efficient to use and, most importantly, supports counting distinct values over a window.
Here is the code for a drop-in replacement:
#approx_count_distinct supports a window
df = df.withColumn('distinct_color_count_over_the_last_week', F.approx_count_distinct("color").over(w))
For columns with small cardinality, the result should be the same as countDistinct. When the dataset grows a lot, you should consider adjusting the parameter rsd (the maximum estimation error allowed), which lets you tune the precision/performance trade-off.
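For example, to tighten the estimate you can pass a smaller rsd than the default of 0.05; the 0.01 below is just an illustrative value:

#rsd = 0.01 allows at most ~1% relative error (default is 0.05),
#at the cost of more memory for the sketch
df = df.withColumn('distinct_color_count_over_the_last_week',
                   F.approx_count_distinct("color", rsd=0.01).over(w))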
Bob Swain
Updated on March 31, 2021

Comments
- Bob Swain, about 3 years ago:
I just tried doing a countDistinct over a window and got this error:
AnalysisException: u'Distinct window functions are not supported: count(distinct color#1926)
Is there a way to do a distinct count over a window in pyspark?
Here's some example code:
from pyspark.sql.window import Window
from pyspark.sql import functions as F

#function to calculate number of seconds from number of days
days = lambda i: i * 86400

df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00", "orange"),
                            (13, "2017-03-15T12:27:18+00:00", "red"),
                            (25, "2017-03-18T11:27:18+00:00", "red")],
                           ["dollars", "timestampGMT", "color"])

df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))

#create window by casting timestamp to long (number of seconds)
w = (Window.orderBy(F.col("timestampGMT").cast('long')).rangeBetween(-days(7), 0))

df = df.withColumn('distinct_color_count_over_the_last_week', F.countDistinct("color").over(w))

df.show()
This is the output I'd like to see:
+-------+--------------------+------+---------------------------------------+
|dollars|        timestampGMT| color|distinct_color_count_over_the_last_week|
+-------+--------------------+------+---------------------------------------+
|     17|2017-03-10 15:27:...|orange|                                      1|
|     13|2017-03-15 12:27:...|   red|                                      2|
|     25|2017-03-18 11:27:...|   red|                                      1|
+-------+--------------------+------+---------------------------------------+