How to derive a percentile using a Spark DataFrame and groupBy in Python


Solution 1

You can use percentile_approx through Spark SQL; it is difficult to write a custom aggregation UDF in PySpark.

Refer to this mailing-list thread for further details: https://mail-archives.apache.org/mod_mbox/spark-user/201510.mbox/%3CCALte62wQV68D6J87EVq6AD5-T3D0F3fHjuzs+1C5aCHOUUQS8w@mail.gmail.com%3E
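
For example, here is a minimal sketch assuming Spark 1.x with a HiveContext (where Hive UDAFs such as percentile_approx are available) and the "group" and "price" columns from the question:

from pyspark import SparkContext
from pyspark.sql import HiveContext

sc = SparkContext()
hiveContext = HiveContext(sc)

# expose the DataFrame to SQL queries
hiveContext.registerDataFrameAsTable(df, "df")

# approximate 60th percentile of price within each group
hiveContext.sql(
    "SELECT `group`, percentile_approx(price, 0.6) AS percentile_60 "
    "FROM df GROUP BY `group`"
).show()

To add the result as a new column, join this aggregate back to the original DataFrame on the group column.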

Solution 2

You can use window functions: just define an ordering window (all the data, in your case) and then filter by the percentile value:

from pyspark.sql.window import Window
from pyspark.sql.functions import percent_rank

# rank each row's price as a fraction between 0 and 1
w = Window.orderBy(df.price)
df.select('price', percent_rank().over(w).alias("percentile"))\
    .where('percentile == 0.6').show()

percent_rank is available in pyspark.sql.functions.

If you prefer, you can use the SQL interface covered in this Databricks post.
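
Since the question asks for percentiles per group, note that the window can also be partitioned. A small sketch, again assuming "group" and "price" columns:

from pyspark.sql.window import Window
from pyspark.sql.functions import percent_rank

# percent_rank computed separately within each group
w = Window.partitionBy(df.group).orderBy(df.price)
df.withColumn("percentile", percent_rank().over(w)).show()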

Solution 3

Here is a solution that computes the percentile of every row using RDDs. First, convert your DataFrame to an RDD of dicts:

# convert to rdd of dicts
rdd = df.rdd
rdd = rdd.map(lambda x: x.asDict())

Then, you can compute each row's percentile:

column_to_decile = 'price'
total_num_rows = rdd.count()


def add_to_dict(_dict, key, value):
    _dict[key] = value
    return _dict


def get_percentile(x, total_num_rows):
    _dict, row_number = x
    percentile = row_number / float(total_num_rows)
    return add_to_dict(_dict, "percentile", percentile)


rdd_percentile = rdd.map(lambda d: (d[column_to_decile], d))  # make column_to_decile the key
rdd_percentile = rdd_percentile.sortByKey(ascending=False)  # so the 1st decile holds the largest values
rdd_percentile = rdd_percentile.map(lambda x: x[1])  # drop the key
rdd_percentile = rdd_percentile.zipWithIndex()  # append each row's number
rdd_percentile = rdd_percentile.map(lambda x: get_percentile(x, total_num_rows))

And finally, convert back into a DataFrame with:

df = sqlContext.createDataFrame(rdd_percentile)

To get the row with the closest percentile to 0.6, you could do something like this:

from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf


def get_row_with_percentile(df, percentile):
    # absolute distance between each row's percentile and the target
    func = udf(lambda x: abs(x), DoubleType())
    df_distance = df.withColumn("distance", func(df['percentile'] - percentile))
    min_distance = df_distance.groupBy().min('distance').collect()[0]['min(distance)']
    result = df_distance.filter(df_distance['distance'] == min_distance)
    return result.drop("distance")  # drop() returns a new DataFrame, so return its result


get_row_with_percentile(df, 0.6).show()
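
As an aside (not part of the original answer): the UDF above only wraps Python's built-in abs, so pyspark.sql.functions.abs would do the same job without the Python UDF overhead:

from pyspark.sql.functions import abs as sql_abs

# same distance column, computed without a Python UDF
df_distance = df.withColumn("distance", sql_abs(df['percentile'] - 0.6))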
Author: Somashekar Muniyappa

Updated on July 16, 2020

Comments

  • Somashekar Muniyappa over 3 years

    I have a Spark dataframe which has Date, Group and Price columns.

    I'm trying to derive the 60th percentile (0.6) of the Price column of that dataframe in Python. I also need to add the output as a new column.

    I tried the code below:

    perudf = udf(lambda x: x.quantile(.6))
    df1 = df.withColumn("Percentile", df.groupBy("group").agg("group"),perudf('price'))
    

    but it is throwing the following error (a corrected sketch appears after the comments):

    assert all(isinstance(c, Column) for c in exprs), "all exprs should be Column"
    AssertionError: all exprs should be Column
    
  • Galen Long over 7 years
    For those interested/lazy, that's from pyspark import SparkContext; from pyspark.sql import HiveContext; sc = SparkContext(); hiveContext = HiveContext(sc); hiveContext.registerDataFrameAsTable(df, "df"); hiveContext.sql("SELECT percentile(price, 0.75) FROM df"); to get the price at the 75th percentile.
  • Carl Smith over 5 years
    I found that databricks post useful, thanks! Here is a working link to it: databricks.com/blog/2015/07/15/…
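
Picking up the error from the question above: agg expects Column expressions, not strings, which is exactly what the AssertionError says. A minimal corrected sketch, assuming Spark 2.1+ (where percentile_approx is available as a built-in SQL function) and the "group" and "price" columns from the question:

from pyspark.sql.functions import expr

# agg takes Column expressions; expr() turns the SQL call into one
df1 = df.groupBy("group").agg(
    expr("percentile_approx(price, 0.6)").alias("Percentile")
)

# join the per-group percentile back onto the original rows as a new column
df = df.join(df1, on="group")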