User defined function to be applied to Window in PySpark?


Solution 1

Spark >= 3.0:

SPARK-24561 - User-defined window functions with pandas udf (bounded window) adds support for Pandas-based window functions with bounded windows. Please follow the related JIRA for details.

Spark >= 2.4:

SPARK-22239 - User-defined window functions with pandas udf (unbounded window) introduced support for Pandas-based window functions with unbounded windows. The general structure is:

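from pyspark.sql import Window
from pyspark.sql.functions import pandas_udf, PandasUDFType

return_type: DataType  # placeholder: the Spark type returned by f

@pandas_udf(return_type, PandasUDFType.GROUPED_AGG)
def f(v):
    # v is a pandas.Series with the values in the window; return a scalar
    return ...

# grouping_column and df are placeholders for your own column and DataFrame
w = (Window
    .partitionBy(grouping_column)
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

df.withColumn('foo', f('bar').over(w))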

Please see the doctests and the unit tests for detailed examples.
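To make the structure above concrete, here is a minimal sketch that uses a median as the custom aggregate. The names median_of and add_group_median are hypothetical and not part of Spark; the pyspark imports are kept inside the helper so the pandas logic can be tried on its own without a Spark session.

```python
import pandas as pd


def median_of(v: pd.Series) -> float:
    """Custom aggregate: receives all values of the window as a pandas Series."""
    return float(v.median())


def add_group_median(df, group_col, value_col, out_col="median"):
    """Hypothetical helper: apply median_of over an unbounded window per group.

    Assumes Spark >= 2.4 with PyArrow installed; df is a Spark DataFrame.
    """
    from pyspark.sql import Window
    from pyspark.sql.functions import pandas_udf, PandasUDFType

    # Wrap the plain pandas function as a grouped-aggregate pandas UDF.
    median_udf = pandas_udf(median_of, "double", PandasUDFType.GROUPED_AGG)

    w = (Window
         .partitionBy(group_col)
         .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

    return df.withColumn(out_col, median_udf(df[value_col]).over(w))
```

With a Spark DataFrame df, something like add_group_median(df, "id", "v") should add one column holding the per-group median on every row.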

Spark < 2.4:

You cannot. Window functions require a UserDefinedAggregateFunction or an equivalent object, not a UserDefinedFunction, and it is not possible to define one in PySpark.

However, in PySpark 2.3 or later, you can define a vectorized pandas_udf, which can be applied to grouped data. You can find a working example in Applying UDFs on GroupedData in PySpark (with functioning python example). While Pandas doesn't provide a direct equivalent of window functions, it is expressive enough to implement any window-like logic, especially with pandas.DataFrame.rolling. Furthermore, a function used with GroupedData.apply can return an arbitrary number of rows.
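As a rough sketch of that approach, the function below reproduces a rowsBetween(-1, 1) moving average with pandas rolling inside a grouped map. The column names (name, date, amountSpent) are taken from the example in the question; moving_avg and with_moving_avg are hypothetical names, and the schema string is an assumption about the input DataFrame.

```python
import pandas as pd


def moving_avg(pdf: pd.DataFrame) -> pd.DataFrame:
    """Window-like logic in plain pandas for the rows of one group."""
    pdf = pdf.sort_values("date").reset_index(drop=True)
    # A centered window of 3 rows covers the previous, current and next row,
    # which mirrors rowsBetween(-1, 1); min_periods=1 keeps the edge rows.
    pdf["movingAvg"] = (pdf["amountSpent"]
                        .rolling(window=3, center=True, min_periods=1)
                        .mean())
    return pdf


def with_moving_avg(df):
    """Hypothetical helper: run moving_avg once per name (Spark >= 2.3)."""
    from pyspark.sql.functions import pandas_udf, PandasUDFType

    # Assumed output schema: the input columns plus the new movingAvg column.
    schema = "name string, date string, amountSpent double, movingAvg double"
    grouped_map = pandas_udf(moving_avg, schema, PandasUDFType.GROUPED_MAP)
    return df.groupby("name").apply(grouped_map)
```

Because moving_avg is an ordinary pandas function, it can be checked locally on a small pandas DataFrame before it is handed to Spark.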

You can also call a Scala UDAF from PySpark; see Spark: How to map Python with Scala or Java User Defined Functions?.

Solution 2

As of Spark 3.0.0, pandas UDFs can be applied over a Window, including bounded windows.

https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.pandas_udf.html

Extract from the documentation:


import pandas as pd
from pyspark.sql import Window
from pyspark.sql.functions import pandas_udf

@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))
w = Window.partitionBy('id').orderBy('v').rowsBetween(-1, 0)
df.withColumn('mean_v', mean_udf("v").over(w)).show()

+---+----+------+
| id|   v|mean_v|
+---+----+------+
|  1| 1.0|   1.0|
|  1| 2.0|   1.5|
|  2| 3.0|   3.0|
|  2| 5.0|   4.0|
|  2|10.0|   7.5|
+---+----+------+
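
The same pattern should work for logic that has no built-in equivalent. Below is a minimal sketch that computes the spread (max minus min) over a rowsBetween(-1, 1) window like the one in the question. The names spread and add_moving_spread are hypothetical, and the sketch assumes Spark >= 3.0, where the UDF type is inferred from the Python type hints.

```python
import pandas as pd


def spread(v: pd.Series) -> float:
    """Custom window statistic: difference between the largest and smallest value."""
    return float(v.max() - v.min())


def add_moving_spread(df, partition_col, order_col, value_col, out_col="spread"):
    """Hypothetical helper: apply spread over a bounded window (Spark >= 3.0)."""
    from pyspark.sql import Window
    from pyspark.sql.functions import pandas_udf

    # The Series -> float type hints mark this as an aggregating pandas UDF.
    spread_udf = pandas_udf(spread, "double")

    # Previous row, current row and next row within each partition.
    w = (Window
         .partitionBy(partition_col)
         .orderBy(order_col)
         .rowsBetween(-1, 1))

    return df.withColumn(out_col, spread_udf(df[value_col]).over(w))
```

For the customers DataFrame from the question, a call such as add_moving_spread(customers, "name", "date", "amountSpent") would be the intended usage.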

Author: Akavall

Updated on July 05, 2022

Comments

  • Akavall
    Akavall over 1 year

    I am trying to apply a user defined function to Window in PySpark. I have read that UDAF might be the way to go, but I was not able to find anything concrete.

    To give an example (taken from here: Xinh's Tech Blog and modified for PySpark):

    from pyspark import SparkConf
    from pyspark.sql import SparkSession
    from pyspark.sql.window import Window
    from pyspark.sql.functions import avg
    
    spark = SparkSession.builder.master("local").config(conf=SparkConf()).getOrCreate()
    
    a = spark.createDataFrame([[1, "a"], [2, "b"], [3, "c"], [4, "d"], [5, "e"]], ['ind', "state"])
    
    customers = spark.createDataFrame([["Alice", "2016-05-01", 50.00],
                                        ["Alice", "2016-05-03", 45.00],
                                        ["Alice", "2016-05-04", 55.00],
                                        ["Bob", "2016-05-01", 25.00],
                                        ["Bob", "2016-05-04", 29.00],
                                        ["Bob", "2016-05-06", 27.00]],
                                   ["name", "date", "amountSpent"])
    
    customers.show()
    
    window_spec = Window.partitionBy("name").orderBy("date").rowsBetween(-1, 1)
    
    result = customers.withColumn( "movingAvg", avg(customers["amountSpent"]).over(window_spec))
    
    result.show()
    

    I am applying avg, which is already built into pyspark.sql.functions, but if instead of avg I wanted to use something more complicated and write my own function, how would I do that?