PySpark: forward fill with last observation for a DataFrame

Solution 1

Another workaround is to try something like this:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

window = (
    Window
    .partitionBy('cookie_id')
    .orderBy('Time')
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

final = (
    joined
    .withColumn('UserIDFilled', F.last('User_ID', ignorenulls=True).over(window))
)

This builds a window from the partition key (cookie_id) and the ordering column (Time), and sets the frame to span every row from the start of the partition up to and including the current row. At each row, F.last with ignorenulls=True then returns the most recent non-null User_ID in that frame. Because the frame includes the current row, a row that already has a value keeps it.
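
To make the window semantics concrete, here is a minimal plain-Python sketch (no Spark involved) of what the expression above computes. The helper name last_non_null_per_row and the sample rows are made up for illustration; in the real code Spark's window operator does this work.

```python
from itertools import groupby
from operator import itemgetter


def last_non_null_per_row(rows):
    """Emulate last(User_ID, ignorenulls=True) over a window partitioned by
    cookie_id, ordered by Time, framed from unbounded preceding to current row.

    `rows` is a list of (cookie_id, time, user_id) tuples (hypothetical layout).
    """
    out = []
    # Sorting by (cookie_id, time) plays the role of partitionBy + orderBy.
    ordered = sorted(rows, key=itemgetter(0, 1))
    for _, partition in groupby(ordered, key=itemgetter(0)):
        last_seen = None  # reset at the start of every partition
        for cookie_id, time, user_id in partition:
            if user_id is not None:
                last_seen = user_id
            out.append((cookie_id, time, user_id, last_seen))
    return out


rows = [
    (1, "2015-12-01", None),
    (1, "2015-12-02", "U1"),
    (1, "2015-12-03", None),
    (2, "2015-12-01", None),
    (2, "2015-12-02", "U3"),
]
filled = last_non_null_per_row(rows)
for row in filled:
    print(row)
```

A leading null in a partition stays null here, since there is no earlier observation to carry forward.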

Solution 2

Hope you find this forward fill function useful. It is written with native PySpark functions only. Neither a UDF nor the RDD API is used (both of them are very slow, especially UDFs!).

Let's use the example provided by @Sid.

values = [
    (1, "2015-12-01", None),
    (1, "2015-12-02", "U1"),
    (1, "2015-12-02", "U1"),
    (1, "2015-12-03", "U2"),
    (1, "2015-12-04", None),
    (1, "2015-12-05", None),
    (2, "2015-12-04", None),
    (2, "2015-12-03", None),
    (2, "2015-12-02", "U3"),
    (2, "2015-12-05", None),
] 

df = spark.createDataFrame(values, ['cookie_ID', 'Time', 'User_ID'])

Functions:

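from pyspark.sql import Window
from pyspark.sql.functions import col, collect_list, lit, when
from pyspark.sql.functions import sum as spark_sum
from pyspark.sql.types import StringType


def cum_sum(df, sum_col, order_col, cum_sum_col_nm='cum_sum'):
    '''Find cumulative sum of a column.
    Parameters
    -----------
    df : DataFrame
    sum_col : String
        Column to perform cumulative sum.
    order_col : String or List
        Column/columns to sort for cumulative sum.
    cum_sum_col_nm : String
        The name of the resulting cum_sum column.

    Return
    -------
    df : DataFrame
        Dataframe with additional "cum_sum_col_nm".

    '''
    # A constant partition key puts every row into one window partition.
    df = df.withColumn('tmp', lit('tmp'))

    windowval = (Window.partitionBy('tmp')
                 .orderBy(order_col)
                 .rangeBetween(Window.unboundedPreceding, 0))

    # Running total up to the current row, stored as a string column.
    df = df.withColumn(cum_sum_col_nm,
                       spark_sum(sum_col).over(windowval).cast(StringType()))
    df = df.drop('tmp')
    return df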


def forward_fill(df, order_col, fill_col, fill_col_name=None):
    '''Forward fill a column by a column/set of columns (order_col).
    Parameters:
    ------------
    df: Dataframe
    order_col: String or List of string
    fill_col: String (Only works for a single column in this version.)
    fill_col_name: String, optional
        Name of the filled column. Defaults to 'ffill_<fill_col>'.

    Return:
    ---------
    df: Dataframe
        Return df with the filled column added.
    '''

    # "value" is a temporary column and "constant" a placeholder string, both used to enable the forward fill.
    df = df.withColumn('value', when(col(fill_col).isNull(), 0).otherwise(1))
    df = cum_sum(df, 'value', order_col).drop('value')  
    df = df.withColumn(fill_col, 
                when(col(fill_col).isNull(), 'constant').otherwise(col(fill_col))) 

    win = (Window.partitionBy('cum_sum') 
              .orderBy(order_col)) 

    if not fill_col_name:
        fill_col_name = 'ffill_{}'.format(fill_col)

    df = df.withColumn(fill_col_name, collect_list(fill_col).over(win)[0])
    df = df.drop('cum_sum')
    df = df.withColumn(fill_col_name, when(col(fill_col_name)=='constant', None).otherwise(col(fill_col_name)))
    df = df.withColumn(fill_col, when(col(fill_col)=='constant', None).otherwise(col(fill_col)))
    return df   
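
The idea behind forward_fill may be easier to see outside Spark. A running count of non-null values stays constant across a run of nulls, so every null row shares a group id with the last observed value before it, and the first element of each group is the fill value. Below is a plain-Python sketch under that reading. forward_fill_by_groups is a hypothetical helper, and it assumes the values are already in the desired order.

```python
from itertools import accumulate


def forward_fill_by_groups(values):
    """Forward fill a pre-sorted list using the running-count trick."""
    # 1 for an observed value, 0 for a null (the temporary "value" column).
    flags = [0 if v is None else 1 for v in values]
    # Running total of the flags (the "cum_sum" column).
    group_ids = list(accumulate(flags))
    # The first non-null value seen in each group is that group's fill value.
    first_in_group = {}
    for gid, v in zip(group_ids, values):
        if v is not None:
            first_in_group.setdefault(gid, v)
    # Group 0 holds leading nulls and has no value to fill from.
    return group_ids, [first_in_group.get(gid) for gid in group_ids]


user_ids = [None, "U1", "U1", "U2", None, None]
group_ids, filled = forward_fill_by_groups(user_ids)
print(group_ids)  # [0, 1, 2, 3, 3, 3]
print(filled)     # [None, 'U1', 'U1', 'U2', 'U2', 'U2']
```

One difference from the Spark version: cum_sum uses a range frame, so rows that tie on the order columns share the same running total, whereas this sketch counts row by row. The filled values come out the same for this sample.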

Let's see the results.

ffilled_df = forward_fill(df, 
                          order_col=['cookie_ID', 'Time'], 
                          fill_col='User_ID', 
                          fill_col_name = 'User_ID_ffil')
ffilled_df.sort(['cookie_ID', 'Time']).show()   

Solution 3

Below is a PySpark version of the partitioned example code from "Spark / Scala: forward fill with last observation". It only works for data that can be partitioned.

Load the data

values = [
    (1, "2015-12-01", None),
    (1, "2015-12-02", "U1"),
    (1, "2015-12-02", "U1"),
    (1, "2015-12-03", "U2"),
    (1, "2015-12-04", None),
    (1, "2015-12-05", None),
    (2, "2015-12-04", None),
    (2, "2015-12-03", None),
    (2, "2015-12-02", "U3"),
    (2, "2015-12-05", None),
]
rdd = sc.parallelize(values)
df = rdd.toDF(["cookie_id", "c_date", "user_id"])
df = df.withColumn("c_date", df.c_date.cast("date"))
df.show()

The DataFrame is

+---------+----------+-------+
|cookie_id|    c_date|user_id|
+---------+----------+-------+
|        1|2015-12-01|   null|
|        1|2015-12-02|     U1|
|        1|2015-12-02|     U1|
|        1|2015-12-03|     U2|
|        1|2015-12-04|   null|
|        1|2015-12-05|   null|
|        2|2015-12-04|   null|
|        2|2015-12-03|   null|
|        2|2015-12-02|     U3|
|        2|2015-12-05|   null|
+---------+----------+-------+

Column used to sort the partitions

# get the sort key
def getKey(item):
    return item.c_date

The fill function. Can be used to fill in multiple columns if necessary.

# fill function
def fill(x):
    out = []
    last_val = None
    for v in x:
        if v["user_id"] is None:
            data = [v["cookie_id"], v["c_date"], last_val]
        else:
            data = [v["cookie_id"], v["c_date"], v["user_id"]]
            last_val = v["user_id"]
        out.append(data)
    return out
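
As a rough illustration of the multi-column remark, the function can be generalized by tracking one last value per column. This is a sketch, not part of the original answer: fill_columns, its cols parameter, and the device column are hypothetical, and plain dicts stand in for the Row objects (a real Row would first need converting with asDict()).

```python
def fill_columns(rows, cols):
    """Forward fill several columns at once over pre-sorted rows.

    rows: list of dicts, already sorted within one partition.
    cols: names of the columns to fill (hypothetical parameter).
    """
    last_vals = {c: None for c in cols}
    out = []
    for row in rows:
        filled = dict(row)  # copy so the input rows are left untouched
        for c in cols:
            if filled[c] is None:
                filled[c] = last_vals[c]  # reuse the last observed value
            else:
                last_vals[c] = filled[c]  # remember the new observation
        out.append(filled)
    return out


sample = [
    {"cookie_id": 1, "c_date": "2015-12-01", "user_id": None, "device": "phone"},
    {"cookie_id": 1, "c_date": "2015-12-02", "user_id": "U1", "device": None},
    {"cookie_id": 1, "c_date": "2015-12-03", "user_id": None, "device": None},
]
result = fill_columns(sample, ["user_id", "device"])
for row in result:
    print(row)
```

Each column keeps its own last value, so a gap in one column does not affect the fill in another.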

Convert to rdd, partition, sort and fill the missing values

# Partition the data
rdd = df.rdd.groupBy(lambda x: x.cookie_id).mapValues(list)
# Sort the data by date
rdd = rdd.mapValues(lambda x: sorted(x, key=getKey))
# fill missing value and flatten
rdd = rdd.mapValues(fill).flatMapValues(lambda x: x)
# discard the key
rdd = rdd.map(lambda v: v[1])

Convert back to DataFrame

df_out = sqlContext.createDataFrame(rdd)
df_out.show()

The output is

+---+----------+----+
| _1|        _2|  _3|
+---+----------+----+
|  1|2015-12-01|null|
|  1|2015-12-02|  U1|
|  1|2015-12-02|  U1|
|  1|2015-12-03|  U2|
|  1|2015-12-04|  U2|
|  1|2015-12-05|  U2|
|  2|2015-12-02|  U3|
|  2|2015-12-03|  U3|
|  2|2015-12-04|  U3|
|  2|2015-12-05|  U3|
+---+----------+----+

Solution 4

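from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Forward fill: last non-null value from the partition start to the current row
w1 = Window.partitionBy('cookie_id').orderBy('c_date').rowsBetween(Window.unboundedPreceding, 0)
# Backward fill: first non-null value anywhere in the partition
w2 = w1.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

# Prefer the forward-filled value; fall back to the backward fill for leading nulls
final_df = df.withColumn('UserIDFilled', F.coalesce(F.last('user_id', True).over(w1),
                                                    F.first('user_id', True).over(w2)))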

final_df.orderBy('cookie_id', 'c_date').show(truncate=False)

+---------+----------+-------+------------+
|cookie_id|c_date    |user_id|UserIDFilled|
+---------+----------+-------+------------+
|1        |2015-12-01|null   |U1          |
|1        |2015-12-02|U1     |U1          |
|1        |2015-12-02|U1     |U1          |
|1        |2015-12-03|U2     |U2          |
|1        |2015-12-04|null   |U2          |
|1        |2015-12-05|null   |U2          |
|2        |2015-12-02|U3     |U3          |
|2        |2015-12-03|null   |U3          |
|2        |2015-12-04|null   |U3          |
|2        |2015-12-05|null   |U3          |
+---------+----------+-------+------------+
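
The combination of last over w1 and first over w2 reads as: use the forward-filled value when there is one, otherwise fall back to the first non-null value in the whole partition. A plain-Python sketch of that logic for a single, already sorted partition follows. fill_both_ways is a hypothetical name and nothing here calls Spark.

```python
def fill_both_ways(values):
    """Forward fill, then give leading nulls the first non-null value."""
    # Counterpart of first(..., ignorenulls=True) over the whole partition.
    first_non_null = next((v for v in values if v is not None), None)
    out = []
    last_seen = None
    for v in values:
        if v is not None:
            last_seen = v
        # Counterpart of coalesce(forward_filled, first_non_null).
        out.append(last_seen if last_seen is not None else first_non_null)
    return out


cookie_1 = [None, "U1", "U1", "U2", None, None]
cookie_2 = ["U3", None, None, None]
print(fill_both_ways(cookie_1))  # ['U1', 'U1', 'U1', 'U2', 'U2', 'U2']
print(fill_both_ways(cookie_2))  # ['U3', 'U3', 'U3', 'U3']
```

A partition that contains only nulls stays null, because neither side of the fallback has a value to offer.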
Author by

Villo

Updated on July 09, 2022

Comments

  • Villo
    Villo almost 2 years

    Using Spark 1.5.1,

    I've been trying to forward fill null values with the last known observation for one column of my DataFrame.

    The column may start with a null value, and in that case I would like to backward fill this null with the first known observation. However, if that complicates the code too much, this point can be skipped.

    In this post, a solution in Scala was provided for a very similar problem by zero323.

    But I don't know Scala, and I haven't managed to "translate" it into PySpark API code. Is it possible to do this with PySpark?

    Thanks for your help.

    Below is a simple sample input:

    | cookie_ID     | Time       | User_ID   
    | ------------- | --------   |------------- 
    | 1             | 2015-12-01 | null 
    | 1             | 2015-12-02 | U1
    | 1             | 2015-12-03 | U1
    | 1             | 2015-12-04 | null   
    | 1             | 2015-12-05 | null     
    | 1             | 2015-12-06 | U2
    | 1             | 2015-12-07 | null
    | 1             | 2015-12-08 | U1
    | 1             | 2015-12-09 | null      
    | 2             | 2015-12-03 | null     
    | 2             | 2015-12-04 | U3
    | 2             | 2015-12-05 | null   
    | 2             | 2015-12-06 | U4
    

    And the expected output:

    | cookie_ID     | Time       | User_ID   
    | ------------- | --------   |------------- 
    | 1             | 2015-12-01 | U1
    | 1             | 2015-12-02 | U1
    | 1             | 2015-12-03 | U1
    | 1             | 2015-12-04 | U1
    | 1             | 2015-12-05 | U1
    | 1             | 2015-12-06 | U2
    | 1             | 2015-12-07 | U2
    | 1             | 2015-12-08 | U1
    | 1             | 2015-12-09 | U1
    | 2             | 2015-12-03 | U3
    | 2             | 2015-12-04 | U3
    | 2             | 2015-12-05 | U3
    | 2             | 2015-12-06 | U4
    
    • zero323
      zero323 about 8 years
      I am not sure I get the logic. Is the relationship between user and cookie many to many? Also, how do you define the order? Order of rows is not particularly meaningful in Spark SQL (not that it is in any SQLish environment)
    • Villo
      Villo about 8 years
      Sorry, I forgot to include the timestamp in my example (I have edited it). I introduced the cookie_ID variable in the example to show that I have to forward fill null values by cookie. Thanks for your help.
    • user1624577
      user1624577 almost 8 years
      Did you ever find a solution to this?
  • Romeo Kienzler
    Romeo Kienzler almost 6 years
    Here is a solution without coding: stackoverflow.com/questions/38131982/…