pyspark: passing multiple dataframe fields to udf

I'm not sure what your data schema looks like, but the following example shows how to wrap the function in a udf and apply it to dataframe columns to get the answer from your example.

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
import math

def distance(origin, destination):
    lat1, lon1 = origin
    lat2, lon2 = destination
    radius = 6371 # km
    dlat = math.radians(lat2-lat1)
    dlon = math.radians(lon2-lon1)
    a = math.sin(dlat/2) * math.sin(dlat/2) + math.cos(math.radians(lat1)) \
    * math.cos(math.radians(lat2)) * math.sin(dlon/2) * math.sin(dlon/2)
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
    d = radius * c
    return d

df = spark.createDataFrame([([101, 121], [-121, -212])], ["origin", "destination"])
distance_udf = udf(distance, DoubleType())
df.withColumn("distance", distance_udf(df.origin, df.destination)).show()

+----------+------------+------------------+
|    origin| destination|          distance|
+----------+------------+------------------+
|[101, 121]|[-121, -212]|15447.812243421227|
+----------+------------+------------------+
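
If your latitude and longitude live in separate columns, as in `zip1`, another option is a udf that takes four scalar arguments, with any constant wrapped in `lit()` so Spark treats it as a column. Calling the plain Python `distance` directly on `zip1._c1` hands `Column` objects to `math.radians`, which is where "a float is required" comes from. Below is a minimal sketch, not the only way to do it. It assumes `_c1` is latitude and `_c2` is longitude; `distance_km` and `with_distance_to` are names I made up for illustration, and the `None` check is my addition for rows with missing coordinates.

```python
import math

EARTH_RADIUS_KM = 6371  # same radius as in distance() above


def distance_km(lat1, lon1, lat2, lon2):
    """Haversine distance in km between two points given in degrees."""
    # Spark hands SQL NULLs to a Python udf as None; return None instead of raising.
    if any(v is None for v in (lat1, lon1, lat2, lon2)):
        return None
    dlat = math.radians(lat2 - lat1)
    dlon = math.radians(lon2 - lon1)
    a = math.sin(dlat / 2) * math.sin(dlat / 2) + math.cos(math.radians(lat1)) \
        * math.cos(math.radians(lat2)) * math.sin(dlon / 2) * math.sin(dlon / 2)
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return EARTH_RADIUS_KM * c


def with_distance_to(df, lat_col, lon_col, dest_lat, dest_lon):
    """Hypothetical helper: add a 'distance' column from each row to a fixed point."""
    # Imported here so distance_km can be tried out without a Spark installation.
    from pyspark.sql.functions import col, lit, udf
    from pyspark.sql.types import DoubleType

    distance_udf = udf(distance_km, DoubleType())
    # Column arguments are evaluated per row; lit() turns the constants into columns.
    return df.withColumn(
        "distance",
        distance_udf(col(lat_col), col(lon_col), lit(dest_lat), lit(dest_lon)),
    )
```

With the dataframe from the question this would be called as `with_distance_to(zip1, "_c1", "_c2", dest_lat, dest_lon).show()`, where `dest_lat` and `dest_lon` are whatever fixed point you want to measure against.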
Author by

Admin

Updated on June 25, 2022

Comments

  • Admin
    Admin almost 2 years

    I am new to Spark and Python. Any help is appreciated.

    I have a UDF and have created a Spark dataframe with US zip code, latitude and longitude.

    UDF:

    import math
    def distance(origin, destination):
        lat1, lon1 = origin
        lat2, lon2 = destination
        radius = 6371 # km
        dlat = math.radians(lat2-lat1)
        dlon = math.radians(lon2-lon1)
        a = math.sin(dlat/2) * math.sin(dlat/2) + math.cos(math.radians(lat1)) \
            * math.cos(math.radians(lat2)) * math.sin(dlon/2) * math.sin(dlon/2)
        c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
        d = radius * c
        return d
    

    sample UDF output:

    distance((101,121),(-121,-212)) 
    

    15447.812243421227

    dataframe:

    zip=spark.read.option("sep", ",").csv('wasb://[email protected]/main/zip.txt')
    zip1=zip.select(zip._c0,zip._c1.cast("Double"),zip._c2.cast("Double"))
    

    Sample zip1 data:

    zip1.first()        
    

    Row(_c0=u'00601', _c1=18.180555, _c2=-66.749961)

    Now I am trying to pass the latitude and longitude from the dataframe zip1 to the UDF distance, but I get the error "a float is required". I believe the UDF is not receiving the data from the dataframe fields; instead it is reading each dataframe column as a constant value, which is why I get the error below.

    z=zip1.select(distance((zip1._c1,100.23),(zip1._c2,-99.21)))
    

    Traceback (most recent call last):
    File "", line 1, in
    File "", line 5, in distance
    TypeError: a float is required

    Please let me know the right way to pass the df fields to udf.

  • chilun
    chilun over 6 years
    @vaira Do you have any problems with the code above? If the answer is what you wanted, please accept it as the answer. Thanks.