pyspark: passing multiple dataframe fields to udf
I'm not sure what your data schema looks like,
but the following example shows the right way to use a udf
to get the answer for your example.
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
import math

def distance(origin, destination):
    # Haversine distance in km; each argument is a [lat, lon] pair in degrees.
    lat1, lon1 = origin
    lat2, lon2 = destination
    radius = 6371 # km
    dlat = math.radians(lat2-lat1)
    dlon = math.radians(lon2-lon1)
    a = math.sin(dlat/2) * math.sin(dlat/2) + math.cos(math.radians(lat1)) \
        * math.cos(math.radians(lat2)) * math.sin(dlon/2) * math.sin(dlon/2)
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
    d = radius * c
    return d

df = spark.createDataFrame([([101, 121], [-121, -212])], ["origin", "destination"])
# Wrap the Python function as a udf that returns a double.
filter_udf = udf(distance, DoubleType())
# Pass the columns themselves; Spark hands each row's values to distance().
df.withColumn("distance", filter_udf(df.origin, df.destination)).show()
+----------+------------+------------------+
| origin| destination| distance|
+----------+------------+------------------+
|[101, 121]|[-121, -212]|15447.812243421227|
+----------+------------+------------------+
Admin
Updated on June 25, 2022
Comments
-
Admin almost 2 years
I am new to Spark and Python. Any help is appreciated.
I have a UDF and have created a Spark dataframe with US zip code, latitude and longitude.
UDF:
import math

def distance(origin, destination):
    lat1, lon1 = origin
    lat2, lon2 = destination
    radius = 6371 # km
    dlat = math.radians(lat2-lat1)
    dlon = math.radians(lon2-lon1)
    a = math.sin(dlat/2) * math.sin(dlat/2) + math.cos(math.radians(lat1)) \
        * math.cos(math.radians(lat2)) * math.sin(dlon/2) * math.sin(dlon/2)
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
    d = radius * c
    return d
sample UDF output:
distance((101,121),(-121,-212))
15447.812243421227
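The sample inputs use latitudes outside the usual -90 to 90 range, so that number is hard to check by hand. A small sanity check, assuming the same function and the same 6371 km radius, can use points on the equator, where the expected distance is simply a fraction of the circumference:

```python
import math


def distance(origin, destination):
    # Same haversine formula as the UDF; inputs are (lat, lon) pairs in degrees.
    lat1, lon1 = origin
    lat2, lon2 = destination
    radius = 6371  # km
    dlat = math.radians(lat2 - lat1)
    dlon = math.radians(lon2 - lon1)
    a = (math.sin(dlat / 2) * math.sin(dlat / 2)
         + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2))
         * math.sin(dlon / 2) * math.sin(dlon / 2))
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return radius * c


# A quarter and a half of the way around the equator.
quarter = distance((0.0, 0.0), (0.0, 90.0))
half = distance((0.0, 0.0), (0.0, 180.0))

# Each pair printed below should agree up to floating-point rounding.
print(quarter, math.pi * 6371 / 2)
print(half, math.pi * 6371)
```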
dataframe:
zip=spark.read.option("sep", ",").csv('wasb://[email protected]/main/zip.txt')
zip1=zip.select(zip._c0,zip._c1.cast("Double"),zip._c2.cast("Double"))
Sample zip1 data:
zip1.first()
Row(_c0=u'00601', _c1=18.180555, _c2=-66.749961)
Now I am trying to pass the latitude and longitude from the dataframe zip1 to the UDF distance, but I get an error like "a float is required". I believe the UDF is not receiving the values from the dataframe fields; instead, it is treating each dataframe column as a single constant value, which is why I get the error below.
z=zip1.select(distance((zip1._c1,100.23),(zip1._c2,-99.21)))
Traceback (most recent call last):
File "", line 1, in
File "", line 5, in distance
TypeError: a float is required
Please let me know the right way to pass the df fields to udf.
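One likely reading of this traceback: `distance` was called as a plain Python function, so `zip1._c1` and `zip1._c2` arrived as Column expression objects rather than per-row numbers. Subtracting two columns yields another column expression, and `math.radians` cannot convert that to a float, which appears to match line 5 of the function. The sketch below reproduces the failure without Spark; `FakeColumn` is a hypothetical stand-in for a Spark Column, not a real pyspark class, and the exact error wording depends on the Python version.

```python
import math


class FakeColumn:
    """Hypothetical stand-in for a Spark Column: an expression, not a number."""

    def __init__(self, name):
        self.name = name

    def __sub__(self, other):
        # Like a real Column, subtraction builds another expression object.
        return FakeColumn("(%s - %s)" % (self.name, getattr(other, "name", other)))


lat1 = FakeColumn("_c1")
lat2 = FakeColumn("_c2")

try:
    # The same kind of call as `dlat = math.radians(lat2-lat1)` in distance().
    math.radians(lat2 - lat1)
    failed = False
except TypeError as exc:
    failed = True
    print("TypeError:", exc)
```

Wrapping the function with `udf(...)` is what makes Spark evaluate it once per row with ordinary Python values.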
-
chilun over 6 years
@vaira Do you have any problem with the code above? If the answer is what you want, please mark it as the accepted answer. Thanks.