How can I use a function in dataframe withColumn function in Pyspark?

10,075

Your function_definition(valor,atributo) returns a single String (valor_generalizado) for a single valor.

AssertionError: col should be Column means that you are passing an argument to WithColumn(colName,col) that is not a Column. So you have to transform your data, in order to have Column, for example as you can see below.

Dataframe for example (same structure as yours):

a = [(10.0,1.2),(73.0,4.0)] # like your dataframe, this is only an example

dataframe = spark.createDataFrame(a,["tp", "S"]) # tp and S are random names for these columns

dataframe.show()
+----+---+
|  tp|  S|
+----+---+
|10.0|1.2|
|73.0|4.0|
+----+---+

As you can see here

udf Creates a Column expression representing a user defined function (UDF).

Solution:

from pyspark.sql.functions import udf

attr = 'TEMP'
udf_func = udf(lambda x: function_definition(x,attr),returnType=StringType())

dataframe_new = dataframe.withColumn("newCol",udf_func(dataframe.tp))
dataframe_new.show()

+----+---+----------+
|  tp|  S|    newCol|
+----+---+----------+
|10.0|1.2|       Low|
|73.0|4.0|Normal-Low|
+----+---+----------+
Share:
10,075
jartymcfly
Author by

jartymcfly

Updated on June 15, 2022

Comments

  • jartymcfly
    jartymcfly almost 2 years

    I have the some dictionaries and a function defined:

    dict_TEMPERATURE = {(0, 70): 'Low', (70.01, 73.99): 'Normal-Low',(74, 76): 'Normal', (76.01, 80): 'Normal-High', (80.01, 300): 'High'}
    ...
    hierarchy_dict = {'TEMP': dict_TEMPERATURE, 'PRESS': dict_PRESSURE, 'SH_SP': dict_SHAFT_SPEED, 'POI': dict_POI, 'TRIG': dict_TRIGGER}
    
    
    
    def function_definition(valor, atributo):
    
        dict_atributo = hierarchy_dict[atributo]
        valor_generalizado = None
    
        if isinstance(valor, (int, long, float, complex)):
    
            for key, value in dict_atributo.items():
    
                if(isinstance(key, tuple)):
                    lista = list(key)
    
                    if (valor > key[0] and valor < key[1]):
                        valor_generalizado = value
    
        else: # if it is not numeric
            valor_generalizado = dict_atributo.get(valor)
    
    
        return valor_generalizado
    

    What this function basically do is: check the value which is passed as an argument to the "function_definition" function, and replace its value according to its dictionary's references.

    So, if I call "function_definition(60, 'TEMP')" it will return 'LOW'.

    On the other hand, I have a dataframe with the next structure (this is an example):

    +----+-----+-----+---+----+
    |TEMP|SH_SP|PRESS|POI|TRIG|
    +----+-----+-----+---+----+
    |   0|    1|    2|  0|   0|
    |   0|    2|    3|  1|   1|
    |   0|    3|    4|  2|   1|
    |   0|    4|    5|  3|   1|
    |   0|    5|    6|  4|   1|
    |   0|    1|    2|  5|   1|
    +----+-----+-----+---+----+
    

    What I want to do is to replace the values of one column of the dataframe based on the function defined above, so I have the next code-line:

    dataframe_new = dataframe.withColumn(atribute_name, function_definition(dataframe[atribute_name], atribute_name))
    

    But I get the next error message when executing it:

    AssertionError: col should be Column
    

    What is wrong in my code? How could do that?