How to calculate percentiles in Spark?
Solution 1
For Spark 2.x, you can use approxQuantile, as in the following example:
import spark.implicits._ // for toDF; pre-imported in spark-shell

val df = Seq(
10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
20, 21, 22, 23, 24, 25, 26, 27, 28, 29
).toDF("num")
df.stat.approxQuantile("num", Array(0.8), 0.1)
// res4: Array[Double] = Array(26.0)
Note that the smaller the 3rd parameter relativeError, the more expensive the calculation becomes. Here's the relevant note in the API doc:
relativeError: The relative target precision to achieve (greater than or equal to 0). If set to zero, the exact quantiles are computed, which could be very expensive.
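For example, here's a minimal sketch (continuing the df above) contrasting an exact computation with a cheaper approximate one; note that several quantiles can also be requested in a single call:
// relativeError = 0.0 computes the exact quantile, which can be costly on big data
df.stat.approxQuantile("num", Array(0.8), 0.0)

// several quantiles in one pass, with a tight (but non-zero) tolerance
df.stat.approxQuantile("num", Array(0.25, 0.5, 0.75), 0.01)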
Solution 2
You can use the Spark SQL function approx_percentile(col, percentage):
import org.apache.spark.sql.functions.expr

val df = Seq(0.5, 0.4, 0.1).toDF // default column name is "value"
df.agg(expr("approx_percentile(value, array(0.5))").as("percentile")).show
// +----------+
// |percentile|
// +----------+
// | [0.4]|
// +----------+
https://spark.apache.org/docs/latest/api/sql/index.html#approx_percentile
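The same function also works from plain SQL and accepts an optional third accuracy argument (higher values are more precise but use more memory). A sketch, assuming the df above has been registered under the hypothetical view name "vals":
df.createOrReplaceTempView("vals")
spark.sql("SELECT approx_percentile(value, array(0.25, 0.5, 0.75), 10000) AS percentiles FROM vals").show()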
Solution 3
With large datasets, you should probably go with an approximate approach:
import org.apache.spark.sql.functions.{callUDF, lit}
import spark.implicits._ // for the $"..." column syntax

df.agg(callUDF("percentile_approx", $"someColumn", lit(0.8)).as("percentile80"))
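If you are on Spark 3.1 or later, percentile_approx is also exposed directly in org.apache.spark.sql.functions, so the callUDF indirection isn't needed. A sketch:
import org.apache.spark.sql.functions.{col, lit, percentile_approx}

// the last argument (accuracy) trades memory for precision; 10000 is the default
df.agg(percentile_approx(col("someColumn"), lit(0.8), lit(10000)).as("percentile80"))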
Solution 4
approx_percentile and percentile are part of the SQL API.
Suppose you have the following DataFrame:
+--------+
|some_int|
+--------+
| 0|
| 10|
+--------+
Here's how to calculate the 50th percentile with the expr hack:
df.agg(expr("percentile(some_int, 0.5)").as("50_percentile")).show()
+-------------+
|50_percentile|
+-------------+
| 5.0|
+-------------+
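The same expr approach works per group, and percentile also accepts an array of percentages. A sketch, assuming a hypothetical group_id column:
df.groupBy("group_id")
  .agg(expr("percentile(some_int, array(0.25, 0.5, 0.75))").as("percentiles"))
  .show()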
I created a library called bebe that also exposes these methods via the Scala API (so you don't need to write strings that invoke functions in your Scala code).
df.agg(bebe_percentile(col("some_int"), lit(0.5)).as("50_percentile")).show()
+-------------+
|50_percentile|
+-------------+
| 5.0|
+-------------+
See the bebe README for instructions on how to use bebe_approx_percentile.
Comments
- Ignacio Alorre almost 2 years: I was trying to get the 0.8 percentile of a single-column dataframe. I tried it this way:
val limit80 = 0.8
val dfSize = df.count()
val percentileIndex = dfSize * limit80
val dfSorted = df.sort()
val percentile80 = dfSorted.take(percentileIndex).last()
But I think this will fail for big dataframes, since they may be distributed across different nodes. Is there a better way to calculate the percentile? Or how could I get all the rows of the dataframe onto the same machine (even if that is very much an anti-pattern), so that df.take(index) really takes the whole dataset into account and not just a partition on one node?
- Raphael Roth almost 6 years: this is actually a Hive UDAF and rather slow
- Ignacio Alorre almost 6 years: Thanks, that is what I was looking for. I was using Spark 1.6 and didn't know about this method.
- Pierre Lacave almost 6 years: @RaphaelRoth out of curiosity, do you have more info about this? I had a quick run comparing the two over a billion integers with the same accuracy (0.0001), and the UDF one was actually consistently faster in that particular case
- Raphael Roth almost 6 years: I've made some benchmarks comparing Hive's percentile with a custom UDAF and with using collect_list followed by a UDF to calculate the percentiles, and Hive's percentile was the slowest option in my case.