Spark: How to map Python with Scala or Java User Defined Functions?

Spark 2.1+

You can use SQLContext.registerJavaFunction:

Register a java UDF so it can be used in SQL statements.

which requires a name, the fully qualified name of a Java class, and an optional return type. Unfortunately, for now it can be used only in SQL statements (or with expr / selectExpr), and the class must implement one of the Java org.apache.spark.sql.api.java.UDF* interfaces:

// build.sbt
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.1.0"
)

// Scala source file defining the UDF
package com.example.spark.udfs

import org.apache.spark.sql.api.java.UDF1

class addOne extends UDF1[Integer, Integer] {
  def call(x: Integer) = x + 1
}

# Python (PySpark); the JAR built from the files above must be on the Spark classpath
sqlContext.registerJavaFunction("add_one", "com.example.spark.udfs.addOne")
sqlContext.sql("SELECT add_one(1)").show()

## +------+
## |UDF(1)|
## +------+
## |     2|
## +------+
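
Since a function registered this way has no Column-level wrapper, DataFrame code has to call it through a SQL expression string. Below is a minimal sketch of that, assuming the addOne class above is already on the classpath. The helper names register_add_one, add_one_sql and with_add_one are hypothetical and not part of PySpark. The registrar argument is assumed to be sqlContext on Spark 2.1/2.2; on Spark 2.3+ spark.udf exposes a method with the same name.

```python
def register_add_one(registrar):
    """Register the Scala class shown above under the SQL name add_one.

    registrar is assumed to be any object exposing registerJavaFunction,
    for example sqlContext (Spark 2.1/2.2) or spark.udf (Spark 2.3+).
    """
    # Imported lazily so this module can be loaded without Spark installed.
    from pyspark.sql.types import IntegerType

    # The third argument is the optional return type.
    registrar.registerJavaFunction(
        "add_one", "com.example.spark.udfs.addOne", IntegerType())


def add_one_sql(column_name):
    """Build the SQL fragment that calls the registered function on a column."""
    return "add_one({0})".format(column_name)


def with_add_one(df, column_name, alias):
    """Add a column computed by add_one, going through expr()."""
    from pyspark.sql.functions import expr

    return df.withColumn(alias, expr(add_one_sql(column_name)))
```

Given a DataFrame df with an integer column x (a hypothetical name), with_add_one(df, "x", "x_plus_one") and df.selectExpr(add_one_sql("x")) both call the same registered function.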

Version independent:

I wouldn't go so far as to say it is supported, but it is certainly possible. All SQL functions currently available in PySpark are simply wrappers around the Scala API.

Let's assume I want to reuse the GroupConcat UDAF I created as an answer to "SPARK SQL replacement for mysql GROUP_CONCAT aggregate function", and that it is located in the package com.example.udaf:

from pyspark import SparkContext
from pyspark.sql.column import Column, _to_java_column, _to_seq
from pyspark.sql import Row

row = Row("k", "v")
df = sc.parallelize([
    row(1, "foo1"), row(1, "foo2"), row(2, "bar1"), row(2, "bar2")]).toDF()

def groupConcat(col):
    """Group and concatenate values for a given column

    >>> df = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
    >>> df.select(groupConcat("v").alias("vs")).collect()
    [Row(vs=u'foo,bar')]
    """
    sc = SparkContext._active_spark_context
    # It is possible to use java_import to avoid full package path
    _groupConcat = sc._jvm.com.example.udaf.GroupConcat.apply
    # Converting to Seq to match apply(exprs: Column*)
    return Column(_groupConcat(_to_seq(sc, [col], _to_java_column)))

df.groupBy("k").agg(groupConcat("v").alias("vs")).show()

## +---+---------+
## |  k|       vs|
## +---+---------+
## |  1|foo1,foo2|
## |  2|bar1,bar2|
## +---+---------+

There are far too many leading underscores for my taste, but as you can see it can be done.
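
If several JVM functions need wrapping, the underscore-heavy part can be kept in one place. The following is only a sketch under the same assumptions as groupConcat above: the target JVM method takes (exprs: Column*) and returns a Column. The names resolve_jvm_member and jvm_column_function are hypothetical helpers, and the code relies on the same private PySpark internals, which may change between versions.

```python
from functools import reduce


def resolve_jvm_member(jvm, dotted_path):
    """Walk a dotted path such as 'com.example.udaf.GroupConcat.apply',
    starting from a py4j JVM view (sc._jvm), and return the final member."""
    return reduce(getattr, dotted_path.split("."), jvm)


def jvm_column_function(dotted_path):
    """Return a Python function that forwards its columns to the JVM
    method at dotted_path and wraps the result in a PySpark Column."""
    def wrapper(*cols):
        # Imported lazily so the module loads without Spark installed.
        from pyspark import SparkContext
        from pyspark.sql.column import Column, _to_java_column, _to_seq

        sc = SparkContext._active_spark_context
        jvm_function = resolve_jvm_member(sc._jvm, dotted_path)
        # Convert to a Scala Seq to match apply(exprs: Column*).
        return Column(jvm_function(_to_seq(sc, cols, _to_java_column)))
    return wrapper


# Same wrapper as groupConcat above, built from the fully qualified path.
group_concat = jvm_column_function("com.example.udaf.GroupConcat.apply")
```

With this in place, df.groupBy("k").agg(group_concat("v").alias("vs")) is intended to behave like the groupConcat call above. The dotted path has to match the package in the JAR exactly.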

Author: prossblad

Updated on June 07, 2022

Comments

  • prossblad
    prossblad almost 2 years

    Let's say, for instance, that my team has chosen Python as the reference language for developing with Spark. Later, for performance reasons, we would like to develop specific Scala or Java libraries and map them to our Python code (something similar to Python stubs with Scala or Java skeletons).

    Do you think it is possible to interface new custom Python methods with Scala or Java User Defined Functions under the hood?

  • Arnab
    Arnab almost 8 years
    I am doing the following, but every time I encounter "py4j.protocol.Py4JError": com.example.udf.GroupConcat.apply does not exist in the JVM. My package is "com.example.udf"
  • dksahuji
    dksahuji over 6 years
    I have a jar which has enum constant and UDF. How to modify this code to use that?
  • MichaelChirico
    MichaelChirico over 5 years
    I'm missing something on how registerJavaFunction knows where to find your UDF... Could you elaborate on the directory structure here? Did you run sbt clean assembly on your scalaVersion :=... (build.sbt??) and package com.example.spark.udfs... (src/main/scala??) files from another directory, or elsewhere?
  • MichaelChirico
    MichaelChirico over 5 years
    It's worth noting that you should really start by checking repo1.maven.org/maven2/org/apache/spark to be sure your Scala and Spark versions are compatible in the first place... I just spent a whole day (my first day using sbt 🎉) trying to make scalaVersion := "2.12.7" work with sparkVersion = "2.3.1", but Scala 2.12+ are only compatible with Spark 2.4+ (or so I gather)