Apache Spark -- Assign the result of UDF to multiple dataframe columns

36,955

Solution 1

It is not possible to create multiple top level columns from a single UDF call but you can create a new struct. It requires an UDF with specified returnType:

from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, FloatType

schema = StructType([
    StructField("foo", FloatType(), False),
    StructField("bar", FloatType(), False)
])

def udf_test(n):
    return (n / 2, n % 2) if n and n != 0.0 else (float('nan'), float('nan'))

test_udf = udf(udf_test, schema)
df = sc.parallelize([(1, 2.0), (2, 3.0)]).toDF(["x", "y"])

foobars = df.select(test_udf("y").alias("foobar"))
foobars.printSchema()
## root
##  |-- foobar: struct (nullable = true)
##  |    |-- foo: float (nullable = false)
##  |    |-- bar: float (nullable = false)

You further flatten the schema with simple select:

foobars.select("foobar.foo", "foobar.bar").show()
## +---+---+
## |foo|bar|
## +---+---+
## |1.0|0.0|
## |1.5|1.0|
## +---+---+

See also Derive multiple columns from a single column in a Spark DataFrame

Solution 2

you can use flatMap to get the column the desired dataframe in one go

df=df.withColumn('udf_results',udf)  
df4=df.select('udf_results').rdd.flatMap(lambda x:x).toDF(schema=your_new_schema)
Share:
36,955
Everaldo Aguiar
Author by

Everaldo Aguiar

Updated on August 25, 2021

Comments

  • Everaldo Aguiar
    Everaldo Aguiar over 2 years

    I'm using pyspark, loading a large csv file into a dataframe with spark-csv, and as a pre-processing step I need to apply a variety of operations to the data available in one of the columns (that contains a json string). That will return X values, each of which needs to be stored in their own separate column.

    That functionality will be implemented in a UDF. However, I am not sure how to return a list of values from that UDF and feed these into individual columns. Below is a simple example:

    (...)
    from pyspark.sql.functions import udf
    def udf_test(n):
        return [n/2, n%2]
    
    test_udf=udf(udf_test)
    
    
    df.select('amount','trans_date').withColumn("test", test_udf("amount")).show(4)
    

    That produces the following:

    +------+----------+--------------------+
    |amount|trans_date|                test|
    +------+----------+--------------------+
    |  28.0|2016-02-07|         [14.0, 0.0]|
    | 31.01|2016-02-07|[15.5050001144409...|
    | 13.41|2016-02-04|[6.70499992370605...|
    | 307.7|2015-02-17|[153.850006103515...|
    | 22.09|2016-02-05|[11.0450000762939...|
    +------+----------+--------------------+
    only showing top 5 rows
    

    What would be the best way to store the two (in this example) values being returned by the udf on separate columns? Right now they are being typed as strings:

    df.select('amount','trans_date').withColumn("test", test_udf("amount")).printSchema()
    
    root
     |-- amount: float (nullable = true)
     |-- trans_date: string (nullable = true)
     |-- test: string (nullable = true)