Fill PySpark dataframe column null values with the average value from the same column


Well, one way or another you have to:

  • compute statistics
  • fill the blanks

It pretty much limits what you can really improve here; still, you can:

  • replace flatMap(list).collect()[0] with first()[0] or structure unpacking
  • compute all stats with a single action
  • use built-in Row methods to extract a dictionary

The final result could look like this:

from pyspark.sql.functions import avg

def fill_with_mean(df, exclude=set()):
    # Compute the mean of every non-excluded column in a single action,
    # then fill the nulls from the resulting Row converted to a dict.
    stats = df.agg(*(
        avg(c).alias(c) for c in df.columns if c not in exclude
    ))
    return df.na.fill(stats.first().asDict())

fill_with_mean(df_data, ["id", "date"])

In Spark 2.2 or later you can also use Imputer. See Replace missing values with mean - Spark Dataframe.
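As a minimal sketch of that Imputer route (Spark 2.2+), reusing the df_data and cost names from the question below; the intermediate names (df_numeric, cost_imputed, df_filled) are only illustrative. Imputer expects float/double input columns and writes to separate output columns, hence the cast and the rename:

from pyspark.ml.feature import Imputer
from pyspark.sql.functions import col

# Imputer works on float/double columns, so cast the integer column first.
df_numeric = df_data.withColumn("cost", col("cost").cast("double"))

imputer = Imputer(
    strategy="mean",               # "median" is also supported
    inputCols=["cost"],
    outputCols=["cost_imputed"],   # Imputer writes to new output columns
)

# Fit computes the per-column means; transform fills nulls/NaNs with them.
df_filled = (imputer.fit(df_numeric)
             .transform(df_numeric)
             .drop("cost")
             .withColumnRenamed("cost_imputed", "cost"))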


Comments

  • Ivan almost 2 years

    With a dataframe like this,

    rdd_2 = sc.parallelize([
        (0, 10, 223, "201601"), (0, 10, 83, "2016032"),
        (1, 20, None, "201602"), (1, 20, 3003, "201601"), (1, 20, None, "201603"),
        (2, 40, 2321, "201601"), (2, 30, 10, "201602"), (2, 61, None, "201601"),
    ])
    
    df_data = sqlContext.createDataFrame(rdd_2, ["id", "type", "cost", "date"])
    df_data.show()
    
    +---+----+----+-------+
    | id|type|cost|   date|
    +---+----+----+-------+
    |  0|  10| 223| 201601|
    |  0|  10|  83|2016032|
    |  1|  20|null| 201602|
    |  1|  20|3003| 201601|
    |  1|  20|null| 201603|
    |  2|  40|2321| 201601|
    |  2|  30|  10| 201602|
    |  2|  61|null| 201601|
    +---+----+----+-------+
    

    I need to fill the null values with the average of the existing values, with the expected result being

    +---+----+----+-------+
    | id|type|cost|   date|
    +---+----+----+-------+
    |  0|  10| 223| 201601|
    |  0|  10|  83|2016032|
    |  1|  20|1128| 201602|
    |  1|  20|3003| 201601|
    |  1|  20|1128| 201603|
    |  2|  40|2321| 201601|
    |  2|  30|  10| 201602|
    |  2|  61|1128| 201601|
    +---+----+----+-------+
    

    where 1128 is the average of the existing values. I need to do that for several columns.

    My current approach is to use na.fill:

    fill_values = {
        column: df_data.agg({column: "mean"}).flatMap(list).collect()[0]
        for column in df_data.columns if column not in ['date', 'id']
    }
    df_data = df_data.na.fill(fill_values)
    
    +---+----+----+-------+
    | id|type|cost|   date|
    +---+----+----+-------+
    |  0|  10| 223| 201601|
    |  0|  10|  83|2016032|
    |  1|  20|1128| 201602|
    |  1|  20|3003| 201601|
    |  1|  20|1128| 201603|
    |  2|  40|2321| 201601|
    |  2|  30|  10| 201602|
    |  2|  61|1128| 201601|
    +---+----+----+-------+
    

    But this is very cumbersome. Any ideas?

  • marilena.oita almost 7 years
    @Kevad do import pyspark.sql.functions as fn, then use fn.avg
  • Nico Coallier almost 7 years
    Did anyone improve this function? It takes way too much computing time on my side! :)
  • zero323 almost 7 years
    @NicoCoallier Performance-wise you won't get any significant improvement. This is pretty much an optimal solution. API-wise you can use Imputer.
  • Nico Coallier almost 7 years
    Is it faster to exclude or include?
  • zero323 almost 7 years
    @NicoCoallier This has negligible impact, but if you're doing this on a large number of columns (like thousands), the optimizer overhead alone is significant. If that is the case, RDDs might be faster. Of course, a lot depends on what you mean by slow...
  • Nico Coallier almost 7 years
    My master keeps crashing at this line; however, when I reduce the amount of data it works.
  • Nico Coallier almost 7 years
    Slow ~ more than an hour
  • zero323 almost 7 years
    @NicoCoallier Well, then try to determine the bottleneck. If the cluster is mostly idle then it is likely the planner. In a case like this you can try RDDs, NumPy and StatCounter. If the tasks are slow, then it is either a configuration problem or a lack of resources.
  • Nico Coallier almost 7 years
    When I increase the number of workers or the partitioning it gets slower.
  • Nico Coallier almost 7 years
    Do you have an example going from DF to RDD, doing fillna, and then back to DF?
  • Nico Coallier almost 7 years
    Is there a way without a UDF?
  • zero323 almost 7 years
    @NicoCoallier You can try something along these lines: stackoverflow.com/a/35585532/1560062. A UDF is not applicable here at all.
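
A rough sketch of the RDD/StatCounter route suggested in the comments above (variable names are illustrative; it assumes the cost column from the question's df_data):

# Rough sketch of the RDD + StatCounter route, for a single column.
cost_stats = (df_data.rdd
              .map(lambda row: row["cost"])      # pull out the numeric column
              .filter(lambda v: v is not None)   # drop nulls before computing stats
              .stats())                          # returns a StatCounter (count, mean, stdev, ...)

# Fill the nulls in the original DataFrame with the computed mean.
df_filled = df_data.na.fill({"cost": cost_stats.mean()})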