PySpark - Pass list as parameter to UDF


Solution 1

from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

# sample data
a = sqlContext.createDataFrame([("A", 20), ("B", 30), ("D", 80)], ["Letter", "distances"])
label_list = ["Great", "Good", "OK", "Please Move", "Dead"]

def cate(label, feature_list):
    if feature_list == 0:
        return label[4]
    else:  # without an 'else' branch the UDF returns None, which shows up as null
        return 'I am not sure!'

# label_list is captured by the closure; only the column is passed at call time
def udf_score(label_list):
    return udf(lambda l: cate(label_list, l), StringType())

a.withColumn("category", udf_score(label_list)(col("distances"))).show()

Output is:

+------+---------+--------------+
|Letter|distances|      category|
+------+---------+--------------+
|     A|       20|I am not sure!|
|     B|       30|I am not sure!|
|     D|       80|I am not sure!|
+------+---------+--------------+

Solution 2

Try currying the function, so that the only argument in the DataFrame call is the name of the column on which you want the function to act:

from pyspark.sql.types import StringType

udf_score = udf(lambda x: cate(label_list, x), StringType())
a.withColumn("category", udf_score("distances")).show(10)
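
The same currying can also be written with functools.partial, which binds the list once and leaves a one-argument function. The sketch below is plain Python, so it runs without a Spark session; cate is repeated from Solution 1, and score_distance is a name made up for illustration. Wrapping it as udf(score_distance, StringType()) should be usable like the lambda above in recent PySpark versions, though that step is an assumption and is not exercised here.

```python
from functools import partial

label_list = ["Great", "Good", "OK", "Please Move", "Dead"]

def cate(label, feature_list):
    # label is the list of category names; feature_list is a single distance value
    if feature_list == 0:
        return label[4]
    return "I am not sure!"

# Bind the list as the first argument; only the distance remains free.
score_distance = partial(cate, label_list)

print(score_distance(0))
print(score_distance(20))
```

Either way, the list travels with the function rather than through the DataFrame call, so withColumn only ever sees a column name.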

Solution 3

Passing the list as the default value of a parameter may also help:

from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

# sample data
a = sqlContext.createDataFrame([("A", 20), ("B", 30), ("D", 80), ("E", 0)], ["Letter", "distances"])
label_list = ["Great", "Good", "OK", "Please Move", "Dead"]

# pass the list as the default value of the second parameter
def cate(feature_list, label=label_list):
    if feature_list == 0:
        return label[4]
    else:  # without an 'else' branch the UDF returns None, which shows up as null
        return 'I am not sure!'

udfcate = udf(cate, StringType())

a.withColumn("category", udfcate("distances")).show()

Output:

+------+---------+--------------+
|Letter|distances|      category|
+------+---------+--------------+
|     A|       20|I am not sure!|
|     B|       30|I am not sure!|
|     D|       80|I am not sure!|
|     E|        0|          Dead|
+------+---------+--------------+
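
One thing to watch with the default-value approach: Python evaluates a default once, when the def statement runs. The plain-Python sketch below (no Spark needed; the replacement labels "A" to "E" are invented for illustration) shows that rebinding label_list afterwards does not change what cate sees, so the function and its UDF would need to be redefined to pick up a new list.

```python
label_list = ["Great", "Good", "OK", "Please Move", "Dead"]

def cate(feature_list, label=label_list):
    if feature_list == 0:
        return label[4]
    return "I am not sure!"

before = cate(0)

# Rebinding the name creates a new list; the default still points at the old one.
label_list = ["A", "B", "C", "D", "E"]
after = cate(0)

# Passing the new list explicitly is the only way cate sees it here.
explicit = cate(0, label_list)

print(before, after, explicit)
```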
Author by

Bryce Ramgovind

"The best thing about a boolean is even if you are wrong, you are only off by a bit. (Anonymous)" Data Scientist by profession, world traveler by passion and overall positive thinker!

Updated on July 05, 2022

Comments

  • Bryce Ramgovind almost 2 years

    I need to pass a list into a UDF; the list will determine the score/category of the distance. For now, I am hard-coding all distances to be the 4th score.

    a= spark.createDataFrame([("A", 20), ("B", 30), ("D", 80)],["Letter", "distances"])
    
    from pyspark.sql.functions import udf
    def cate(label, feature_list):
        if feature_list == 0:
            return label[4]
    label_list = ["Great", "Good", "OK", "Please Move", "Dead"]
    udf_score=udf(cate, StringType())
    a.withColumn("category", udf_score(label_list,a["distances"])).show(10)
    

    When I try something like this, I get this error:

    Py4JError: An error occurred while calling z:org.apache.spark.sql.functions.col. Trace:
    py4j.Py4JException: Method col([class java.util.ArrayList]) does not exist
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:339)
        at py4j.Gateway.invoke(Gateway.java:274)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:214)
        at java.lang.Thread.run(Thread.java:745)
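
    The trace suggests that, in the Spark version that produced it, every argument in a UDF call is treated as a column, so the Python list is handed to col(...), which has no overload for a list. Besides the closures shown in the solutions, one option is to turn the list into an array column of literals, which keeps the original two-argument cate. This is a minimal sketch, not the asker's code: add_category is a hypothetical helper, and the PySpark imports sit inside it only so that cate can be tried without Spark installed.

```python
label_list = ["Great", "Good", "OK", "Please Move", "Dead"]

def cate(label, feature_list):
    # label arrives as a Python list built from the array column
    if feature_list == 0:
        return label[4]
    return "I am not sure!"

def add_category(df, labels):
    """Hypothetical helper: return df with an extra 'category' column."""
    # Imported here so this module loads even when PySpark is not installed.
    from pyspark.sql.functions import array, col, lit, udf
    from pyspark.sql.types import StringType

    udf_score = udf(cate, StringType())
    # Each label becomes a literal column; array() packs them into one column.
    labels_col = array(*[lit(x) for x in labels])
    return df.withColumn("category", udf_score(labels_col, col("distances")))
```

    With the DataFrame a from the question, add_category(a, label_list).show(10) would be the call. The array is passed to the Python function for every row, so for a long list a closure is probably the cheaper choice.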