"'DataFrame' object has no attribute 'apply'" when trying to apply lambda to create new column

10,572

Solution 1

The syntax you are using is for a pandas DataFrame. To achieve this for a spark DataFrame, you should use the withColumn() method. This works great for a wide range of well defined DataFrame functions, but it's a little more complicated for user defined mapping functions.

General Case

In order to define a udf, you need to specify the output data type. For instance, if you wanted to apply a function my_func that returned a string, you could create a udf as follows:

import pyspark.sql.functions as f
my_udf = f.udf(my_func, StringType())

Then you can use my_udf to create a new column like:

df = df.withColumn('new_column', my_udf(f.col("some_column_name")))

Another option is to use select:

df = df.select("*", my_udf(f.col("some_column_name")).alias("new_column"))

Specific Problem

Using a udf

In your specific case, you want to use a dictionary to translate the values of your DataFrame.

Here is a way to define a udf for this purpose:

some_map_udf = f.udf(lambda x: some_map.get(x, None), IntegerType())

Notice that I used dict.get() because you want your udf to be robust to bad inputs.

df = df.withColumn('new_column', some_map_udf(f.col("some_column_name")))

Using DataFrame functions

Sometimes using a udf is unavoidable, but whenever possible, using DataFrame functions is usually preferred.

Here is one option to do the same thing without using the udf.

The trick is to iterate over the items in some_map to create a list of pyspark.sql.functions.when() functions.

some_map_func = [f.when(f.col("some_column_name") == k, v) for k, v in some_map.items()]
print(some_map_func)
#[Column<CASE WHEN (some_column_name = a) THEN 0 END>,
# Column<CASE WHEN (some_column_name = c) THEN 1 END>,
# Column<CASE WHEN (some_column_name = b) THEN 1 END>]

Now you can use pyspark.sql.functions.coalesce() inside of a select:

df = df.select("*", f.coalesce(*some_map_func).alias("some_column_name"))

This works because when() returns null by default if the condition is not met, and coalesce() will pick the first non-null value it encounters. Since the keys of the map are unique, at most one column will be non-null.

Solution 2

You have a spark dataframe, not a pandas dataframe. To add new column to the spark dataframe:

import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType
df = df.withColumn('new_column', F.udf(some_map.get, IntegerType())(some_column_name))
df.show()
Share:
10,572
Pierre-Antoine
Author by

Pierre-Antoine

Java Programmer, and occasional teacher. Main other language: Python IDEs: IntelliJ, PyCharm &amp; Vim. Main interest: machine learning, deep learning, computer vision, computer imagerie, cryptography, networking, mathematics and statistics. (My views are my own and not the one of my employer)

Updated on June 06, 2022

Comments

  • Pierre-Antoine
    Pierre-Antoine almost 2 years

    I aim at adding a new column in a Pandas DataFrame, but I am facing an weird error.

    The new column is expected to be a transformation from an existing column, that can be done doing a lookup in a dictionary/hashmap.

    # Loading data
    df = sqlContext.read.format(...).load(train_df_path)
    
    # Instanciating the map
    some_map = {
        'a': 0, 
        'b': 1,
        'c': 1,
    }
    
    # Creating a new column using the map
    df['new_column'] = df.apply(lambda row: some_map(row.some_column_name), axis=1)
    

    Which leads to the following error:

    AttributeErrorTraceback (most recent call last)
    <ipython-input-12-aeee412b10bf> in <module>()
         25 df= train_df
         26 
    ---> 27 df['new_column'] = df.apply(lambda row: some_map(row.some_column_name), axis=1)
    
    /usr/lib/spark/python/pyspark/sql/dataframe.py in __getattr__(self, name)
        962         if name not in self.columns:
        963             raise AttributeError(
    --> 964                 "'%s' object has no attribute '%s'" % (self.__class__.__name__, name))
        965         jc = self._jdf.apply(name)
        966         return Column(jc)
    
    AttributeError: 'DataFrame' object has no attribute 'apply'
    

    Other potentially useful info: * I am using Spark and Python 2.