How can I use a function in dataframe withColumn function in Pyspark?
Your function_definition(valor,atributo) returns a single String (valor_generalizado) for a single valor.
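AssertionError: col should be Column means that the second argument you pass to withColumn(colName, col) is not a Column. Calling function_definition directly on dataframe[atribute_name] runs it once as plain Python and returns an ordinary value (a string or None), not a Column. So you have to wrap the function so that it produces a Column, as you can see below.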
Dataframe for example (same structure as yours):
a = [(10.0,1.2),(73.0,4.0)] # like your dataframe, this is only an example
dataframe = spark.createDataFrame(a,["tp", "S"]) # tp and S are random names for these columns
dataframe.show()
+----+---+
|  tp|  S|
+----+---+
|10.0|1.2|
|73.0|4.0|
+----+---+
udf creates a Column expression representing a user defined function (UDF). Calling the wrapped function on a column yields the Column that withColumn expects.
Solution:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType # needed for returnType below
attr = 'TEMP'
udf_func = udf(lambda x: function_definition(x,attr),returnType=StringType())
dataframe_new = dataframe.withColumn("newCol",udf_func(dataframe.tp))
dataframe_new.show()
+----+---+----------+
|  tp|  S|    newCol|
+----+---+----------+
|10.0|1.2|       Low|
|73.0|4.0|Normal-Low|
+----+---+----------+
jartymcfly
Updated on June 15, 2022
I have some dictionaries and a function defined:
dict_TEMPERATURE = {(0, 70): 'Low', (70.01, 73.99): 'Normal-Low', (74, 76): 'Normal', (76.01, 80): 'Normal-High', (80.01, 300): 'High'}
...
hierarchy_dict = {'TEMP': dict_TEMPERATURE, 'PRESS': dict_PRESSURE, 'SH_SP': dict_SHAFT_SPEED, 'POI': dict_POI, 'TRIG': dict_TRIGGER}

def function_definition(valor, atributo):
    dict_atributo = hierarchy_dict[atributo]
    valor_generalizado = None
    if isinstance(valor, (int, long, float, complex)):
        for key, value in dict_atributo.items():
            if(isinstance(key, tuple)):
                lista = list(key)
                if (valor > key[0] and valor < key[1]):
                    valor_generalizado = value
    else: # if it is not numeric
        valor_generalizado = dict_atributo.get(valor)
    return valor_generalizado
What this function basically does is check the value passed as an argument to "function_definition" and replace it with the label that its dictionary assigns to it.
So, if I call "function_definition(60, 'TEMP')" it will return 'Low'.
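The lookup described above can be checked in plain Python before Spark is involved. This is a sketch adapted to Python 3, where long no longer exists and complex numbers cannot be compared with < or >, so only int and float are tested. Only dict_TEMPERATURE is included because the other dictionaries are elided in the question.

```python
# Only the TEMP ranges are given in the question; other attributes are left out.
dict_TEMPERATURE = {(0, 70): 'Low', (70.01, 73.99): 'Normal-Low', (74, 76): 'Normal',
                    (76.01, 80): 'Normal-High', (80.01, 300): 'High'}
hierarchy_dict = {'TEMP': dict_TEMPERATURE}


def function_definition(valor, atributo):
    dict_atributo = hierarchy_dict[atributo]
    valor_generalizado = None
    if isinstance(valor, (int, float)):
        for key, value in dict_atributo.items():
            # Both bounds are exclusive, exactly as in the original comparison.
            if isinstance(key, tuple) and key[0] < valor < key[1]:
                valor_generalizado = value
    else:  # if it is not numeric
        valor_generalizado = dict_atributo.get(valor)
    return valor_generalizado


print(function_definition(60, 'TEMP'))    # Low
print(function_definition(73.0, 'TEMP'))  # Normal-Low
print(function_definition(70, 'TEMP'))    # None: 70 lies on an excluded boundary
```

Because both comparisons are strict, boundary values such as 0, 70 or 74 match no range and come back as None.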
On the other hand, I have a dataframe with the following structure (this is an example):
+----+-----+-----+---+----+
|TEMP|SH_SP|PRESS|POI|TRIG|
+----+-----+-----+---+----+
|   0|    1|    2|  0|   0|
|   0|    2|    3|  1|   1|
|   0|    3|    4|  2|   1|
|   0|    4|    5|  3|   1|
|   0|    5|    6|  4|   1|
|   0|    1|    2|  5|   1|
+----+-----+-----+---+----+
What I want to do is replace the values of one column of the dataframe using the function defined above, so I have the following line of code:
dataframe_new = dataframe.withColumn(atribute_name, function_definition(dataframe[atribute_name], atribute_name))
But I get the following error message when executing it:
AssertionError: col should be Column
What is wrong with my code? How can I do that?