Spark: How to map Python with Scala or Java User Defined Functions?
Spark 2.1+
You can use SQLContext.registerJavaFunction:

    Register a java UDF so it can be used in SQL statements.

It requires a name, the fully qualified name of the Java class, and an optional return type. Unfortunately, for now it can be used only in SQL statements (or with expr / selectExpr) and requires a Java org.apache.spark.sql.api.java.UDF*:
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.1.0"
)
package com.example.spark.udfs

import org.apache.spark.sql.api.java.UDF1

class addOne extends UDF1[Integer, Integer] {
  def call(x: Integer) = x + 1
}
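For context, a typical sbt project layout for the two snippets above might look like this (the paths are illustrative assumptions; the original answer does not specify them):

```
project-root/
├── build.sbt                        # scalaVersion and libraryDependencies
└── src/
    └── main/
        └── scala/
            └── com/example/spark/udfs/
                └── addOne.scala     # the UDF1 implementation
```

Running sbt package from project-root should then produce a jar under target/scala-2.11/ that can be placed on Spark's classpath (e.g. via spark-submit --jars) so that the class can be resolved by registerJavaFunction.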
sqlContext.registerJavaFunction("add_one", "com.example.spark.udfs.addOne")
sqlContext.sql("SELECT add_one(1)").show()
## +------+
## |UDF(1)|
## +------+
## | 2|
## +------+
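The UDF1 interface above boils down to a single call method. As a plain-Python sketch of the same contract (illustration only; this class is not usable from Spark, which requires the compiled Java/Scala class):

```python
# Illustration only: mirrors the shape of UDF1[Integer, Integer],
# i.e. a single-argument call() method returning the incremented value.
class AddOne:
    def call(self, x):
        return x + 1

print(AddOne().call(1))  # → 2
```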
Version independent:

I wouldn't go so far as to say it is supported, but it is certainly possible. All SQL functions currently available in PySpark are simply wrappers around the Scala API.

Let's assume I want to reuse the GroupConcat UDAF I've created as an answer to SPARK SQL replacement for mysql GROUP_CONCAT aggregate function, and that it is located in a package com.example.udaf:
from pyspark import SparkContext
from pyspark.sql.column import Column, _to_java_column, _to_seq
from pyspark.sql import Row

row = Row("k", "v")

df = sc.parallelize([
    row(1, "foo1"), row(1, "foo2"), row(2, "bar1"), row(2, "bar2")
]).toDF()

def groupConcat(col):
    """Group and concatenate values for a given column

    >>> df = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
    >>> df.select(groupConcat("v").alias("vs")).collect()
    [Row(vs=u'foo,bar')]
    """
    sc = SparkContext._active_spark_context
    # It is possible to use java_import to avoid the full package path
    _groupConcat = sc._jvm.com.example.udaf.GroupConcat.apply
    # Convert to Seq to match apply(exprs: Column*)
    return Column(_groupConcat(_to_seq(sc, [col], _to_java_column)))
df.groupBy("k").agg(groupConcat("v").alias("vs")).show()
## +---+---------+
## | k| vs|
## +---+---------+
## | 1|foo1,foo2|
## | 2|bar1,bar2|
## +---+---------+
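For clarity, what the GroupConcat aggregation computes is equivalent to this plain-Python group-and-join (a sketch of the semantics only, not of how the UDAF is implemented):

```python
from collections import defaultdict

rows = [(1, "foo1"), (1, "foo2"), (2, "bar1"), (2, "bar2")]

# Group values by key, then comma-join each group, matching the
# output of df.groupBy("k").agg(groupConcat("v").alias("vs")).
groups = defaultdict(list)
for k, v in rows:
    groups[k].append(v)

result = {k: ",".join(vs) for k, vs in groups.items()}
print(result)  # → {1: 'foo1,foo2', 2: 'bar1,bar2'}
```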
There are far too many leading underscores for my taste, but as you can see it can be done.
Related to:
- Calling Java/Scala function from a task
- How to use a Scala class inside Pyspark
- Transforming PySpark RDD with Scala
prossblad
Updated on June 07, 2022

Comments
-
prossblad almost 2 years
Let's say, for instance, that my team has chosen Python as the reference language to develop with Spark. But later, for performance reasons, we would like to develop specific Scala or Java libraries in order to map them to our Python code (something similar to Python stubs with Scala or Java skeletons).
Don't you think it is possible to interface new customized Python methods with, under the hood, some Scala or Java User Defined Functions?
-
Arnab almost 8 years: I am doing the following, but every time I encounter py4j.protocol.Py4JError: com.example.udf.GroupConcat.apply does not exist in the JVM. My package is "com.example.udf".
-
dksahuji over 6 years: I have a jar which has an enum constant and a UDF. How do I modify this code to use that?
-
MichaelChirico over 5 years: I'm missing something on how registerJavaFunction knows where to find your UDF... Could you elaborate on the directory structure here? Have you sbt clean assembly'd your scalaVersion := ... (build.sbt??) and package com.example.spark.udfs ... (src/main/scala??) files from another directory? elsewhere?
-
MichaelChirico over 5 years: It's worth noting that you should really start by checking repo1.maven.org/maven2/org/apache/spark to be sure your Scala and Spark versions are compatible in the first place... I just spent a whole day (my first day using sbt 🎉) trying to make scalaVersion := "2.12.7" work with sparkVersion = "2.3.1", but Scala 2.12+ is only compatible with Spark 2.4+ (or so I gather).