"'DataFrame' object has no attribute 'apply'" when trying to apply lambda to create new column
Solution 1
The syntax you are using is for a pandas DataFrame. To achieve this for a Spark DataFrame, you should use the withColumn() method. This works well for a wide range of well-defined DataFrame functions, but it is a little more complicated for user-defined mapping functions.
General Case
In order to define a udf, you need to specify the output data type. For instance, if you wanted to apply a function my_func that returned a string, you could create a udf as follows:
import pyspark.sql.functions as f
from pyspark.sql.types import StringType

my_udf = f.udf(my_func, StringType())
Then you can use my_udf to create a new column like:
df = df.withColumn('new_column', my_udf(f.col("some_column_name")))
Another option is to use select:
df = df.select("*", my_udf(f.col("some_column_name")).alias("new_column"))
Specific Problem
Using a udf
In your specific case, you want to use a dictionary to translate the values of your DataFrame. Here is a way to define a udf for this purpose:
from pyspark.sql.types import IntegerType

some_map_udf = f.udf(lambda x: some_map.get(x, None), IntegerType())
Notice that I used dict.get() because you want your udf to be robust to bad inputs.
df = df.withColumn('new_column', some_map_udf(f.col("some_column_name")))
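The difference matters as soon as the column contains a value that is missing from the map. Here is a plain-Python sketch (no Spark needed) of what the lambda inside the udf does; the name safe_lookup is illustrative, not part of the Spark API:

```python
some_map = {'a': 0, 'b': 1, 'c': 1}

# This is the function the udf wraps: .get() falls back to None
safe_lookup = lambda x: some_map.get(x, None)

print(safe_lookup('a'))   # 0
print(safe_lookup('z'))   # None -> becomes null in the Spark column

# Plain indexing would instead raise KeyError and fail the executor task
try:
    some_map['z']
except KeyError:
    print('KeyError')
```

With .get(), unseen values simply produce null in new_column instead of crashing the job.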
Using DataFrame functions
Sometimes using a udf is unavoidable, but whenever possible, using DataFrame functions is preferred. Here is one option to do the same thing without using a udf.
The trick is to iterate over the items in some_map to create a list of pyspark.sql.functions.when() expressions.
some_map_func = [f.when(f.col("some_column_name") == k, v) for k, v in some_map.items()]
print(some_map_func)
#[Column<CASE WHEN (some_column_name = a) THEN 0 END>,
# Column<CASE WHEN (some_column_name = c) THEN 1 END>,
# Column<CASE WHEN (some_column_name = b) THEN 1 END>]
Now you can use pyspark.sql.functions.coalesce() inside of a select:
df = df.select("*", f.coalesce(*some_map_func).alias("new_column"))
This works because when() returns null by default if the condition is not met, and coalesce() will pick the first non-null value it encounters. Since the keys of the map are unique, at most one column will be non-null.
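To see why this is equivalent, here is a plain-Python simulation of the when()/coalesce() semantics (the helper names are illustrative, not the Spark API):

```python
some_map = {'a': 0, 'b': 1, 'c': 1}

def when(value, k, v):
    # Mimics f.when(col == k, v): v if the condition holds, otherwise null
    return v if value == k else None

def coalesce(*values):
    # Mimics f.coalesce(): first non-null argument, else null
    return next((v for v in values if v is not None), None)

def translate(value):
    # One when() per map entry, then coalesce over all of them
    return coalesce(*[when(value, k, v) for k, v in some_map.items()])

print(translate('a'))   # 0
print(translate('b'))   # 1
print(translate('z'))   # None: no condition matched
```

Because the map keys are distinct, at most one when() produces a non-null value for any input, so coalesce() is unambiguous.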
Solution 2
You have a Spark DataFrame, not a pandas DataFrame. To add a new column to the Spark DataFrame:
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType
df = df.withColumn('new_column', F.udf(some_map.get, IntegerType())('some_column_name'))
df.show()
Pierre-Antoine
Updated on June 06, 2022

Comments
-
Pierre-Antoine almost 2 years
I aim at adding a new column to a Pandas DataFrame, but I am facing a weird error. The new column is expected to be a transformation of an existing column, which can be done by doing a lookup in a dictionary/hashmap.
# Loading data
df = sqlContext.read.format(...).load(train_df_path)

# Instantiating the map
some_map = {
    'a': 0,
    'b': 1,
    'c': 1,
}

# Creating a new column using the map
df['new_column'] = df.apply(lambda row: some_map(row.some_column_name), axis=1)
Which leads to the following error:
AttributeErrorTraceback (most recent call last)
<ipython-input-12-aeee412b10bf> in <module>()
     25 df = train_df
     26
---> 27 df['new_column'] = df.apply(lambda row: some_map(row.some_column_name), axis=1)

/usr/lib/spark/python/pyspark/sql/dataframe.py in __getattr__(self, name)
    962         if name not in self.columns:
    963             raise AttributeError(
--> 964                 "'%s' object has no attribute '%s'" % (self.__class__.__name__, name))
    965         jc = self._jdf.apply(name)
    966         return Column(jc)

AttributeError: 'DataFrame' object has no attribute 'apply'
Other potentially useful info:
- I am using Spark and Python 2.