Spark - How to calculate percentiles in Spark?


Solution 1

For Spark 2.x, you can use approxQuantile, as in the following example:

import spark.implicits._  // already in scope in spark-shell; needed for .toDF otherwise

val df = Seq(
  10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
  20, 21, 22, 23, 24, 25, 26, 27, 28, 29
).toDF("num")

df.stat.approxQuantile("num", Array(0.8), 0.1)
// res4: Array[Double] = Array(26.0)

Note that the smaller the third parameter relativeError, the more expensive the calculation becomes. Here's the relevant note from the API doc:

relativeError: The relative target precision to achieve (greater than or equal to 0). If set to zero, the exact quantiles are computed, which could be very expensive.
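
For example, you can request several quantiles in a single pass, and passing 0.0 as relativeError forces the exact (and costlier) computation. A minimal sketch, reusing the df from above:

// several quantiles computed in one pass over the data
df.stat.approxQuantile("num", Array(0.25, 0.5, 0.75), 0.1)

// relativeError = 0.0 returns exact quantiles, which can be very
// expensive on large datasets
df.stat.approxQuantile("num", Array(0.8), 0.0)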

Solution 2

You can use the Spark SQL function approx_percentile(col, percentage):

import spark.implicits._                    // for .toDF
import org.apache.spark.sql.functions.expr

val df = Seq(0.5, 0.4, 0.1).toDF
df.agg(expr("approx_percentile(value, array(0.5))").as("percentile")).show
// +----------+
// |percentile|
// +----------+
// |     [0.4]|
// +----------+

https://spark.apache.org/docs/latest/api/sql/index.html#approx_percentile
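
approx_percentile also accepts an optional third argument, accuracy: a positive literal where higher values give better precision at the cost of more memory (10000 is the default). A small sketch reusing the df above, requesting several percentiles at a reduced accuracy:

df.agg(
  expr("approx_percentile(value, array(0.25, 0.5, 0.75), 100)").as("percentiles")
).show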

Solution 3

With large datasets, you should probably go with an approximate approach:

import spark.implicits._                              // for the $"..." column syntax
import org.apache.spark.sql.functions.{callUDF, lit}

df.agg(callUDF("percentile_approx", $"someColumn", lit(0.8)).as("percentile80"))
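
If you are on Spark 3.1+, the same aggregate is also exposed directly in org.apache.spark.sql.functions as percentile_approx, so the callUDF indirection is no longer needed. A sketch under that assumption (10000 is the function's default accuracy):

import org.apache.spark.sql.functions.{col, lit, percentile_approx}

df.agg(percentile_approx(col("someColumn"), lit(0.8), lit(10000)).as("percentile80"))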

Solution 4

approx_percentile and percentile are part of the SQL API.

Suppose you have the following DataFrame:

+--------+
|some_int|
+--------+
|       0|
|      10|
+--------+
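
If you want to reproduce it, a minimal sketch to build that DataFrame (assuming a SparkSession named spark is in scope):

import spark.implicits._

val df = Seq(0, 10).toDF("some_int")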

Here's how to calculate the 50th percentile with the expr hack:

import org.apache.spark.sql.functions.expr

df.agg(expr("percentile(some_int, 0.5)").as("50_percentile")).show
+-------------+
|50_percentile|
+-------------+
|          5.0|
+-------------+
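
percentile also accepts an array of percentages if you need several exact percentiles in one pass:

df.agg(expr("percentile(some_int, array(0.25, 0.75))").as("percentiles")).show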

I created a library called bebe that also exposes these methods via the Scala API (so you don't need to write strings that invoke functions in your Scala code).

df.agg(bebe_percentile(col("some_int"), lit(0.5)).as("50_percentile")).show
+-------------+
|50_percentile|
+-------------+
|          5.0|
+-------------+

See the bebe README for instructions on how to use bebe_approx_percentile.

Author: Ignacio Alorre (currently learning about big data technologies)

Updated on June 12, 2022

Comments

  • Ignacio Alorre
    Ignacio Alorre almost 2 years

    I was trying to get the 0.8 percentile of a single-column DataFrame. I tried it this way:

    val limit80 = 0.8
    val dfSize = df.count()
    val percentileIndex = (dfSize * limit80).toInt
    
    val dfSorted = df.sort("num")  // sort by the single column
    val percentile80 = dfSorted.take(percentileIndex).last
    

    But I think this will fail for big DataFrames, since they may be distributed across different nodes.

    Is there a better way to calculate the percentile? Or how could I gather all the rows of the DataFrame onto one machine (even if that is very much an anti-pattern), so that df.take(index) really takes the whole dataset into account and not just one partition on one node?

  • Raphael Roth
    Raphael Roth almost 6 years
    This is actually a Hive UDAF and rather slow.
  • Ignacio Alorre
    Ignacio Alorre almost 6 years
    Thanks, that is what I was looking for. I was using Spark 1.6 and I didn't know about this method.
  • Pierre Lacave
    Pierre Lacave almost 6 years
    @RaphaelRoth out of curiosity, do you have more info about this? I had a quick run comparing the two over a billion integers, and with the same accuracy (0.0001) the UDF one was actually consistently faster in that particular case.
  • Raphael Roth
    Raphael Roth almost 6 years
    I've made some benchmarks comparing Hive's percentile with a custom UDAF and with collect_list followed by a UDF to calculate the percentiles, and Hive's percentile was the slowest option in my case.