Fill Pyspark dataframe column null values with average value from same column
Well, one way or another you have to:
- compute statistics
- fill the blanks
That pretty much limits what you can really improve here. Still:
- replace flatMap(list).collect()[0] with first()[0] or structure unpacking
- compute all stats with a single action
- use built-in Row methods to extract a dictionary
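The final result could look like this:

    from pyspark.sql.functions import avg

    def fill_with_mean(df, exclude=set()):
        # Compute the mean of every non-excluded column in a single action.
        stats = df.agg(*(
            avg(c).alias(c) for c in df.columns if c not in exclude
        ))
        # Row.asDict() maps column name -> mean, the form na.fill expects.
        return df.na.fill(stats.first().asDict())

    fill_with_mean(df_data, ["id", "date"])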
In Spark 2.2 or later you can also use Imputer. See Replace missing values with mean - Spark Dataframe.
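As a rough illustration of the Imputer route, here is a minimal sketch. The helper names (imputer_columns, impute_with_mean) and the "_imputed" suffix are made up for this example, and the cast to double is an assumption made to stay compatible with Spark 2.x, where Imputer accepts only float/double input columns.

```python
def imputer_columns(columns, exclude=(), suffix="_imputed"):
    """Pick the columns to impute and name their outputs.

    The suffix is a hypothetical naming convention, not a Spark default.
    """
    excluded = set(exclude)
    inputs = [c for c in columns if c not in excluded]
    return inputs, [c + suffix for c in inputs]


def impute_with_mean(df, exclude=()):
    """Add one mean-imputed copy of every non-excluded column."""
    # Imported inside the function so the helper above works without Spark.
    from pyspark.ml.feature import Imputer
    from pyspark.sql.functions import col

    inputs, outputs = imputer_columns(df.columns, exclude)
    # Assumption: cast to double, since Spark 2.x Imputer rejects integer columns.
    casted = df.select(*[
        col(c).cast("double").alias(c) if c in inputs else col(c)
        for c in df.columns
    ])
    imputer = Imputer(strategy="mean", inputCols=inputs, outputCols=outputs)
    # fit() computes the means; transform() appends the imputed columns.
    return imputer.fit(casted).transform(casted)
```

With the question's data this would be called as impute_with_mean(df_data, exclude=["id", "date"]). Non-numeric columns such as date have to be excluded, and the result keeps the original columns next to the imputed ones.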
Comments
-
Ivan almost 2 years
With a dataframe like this,

    rdd_2 = sc.parallelize([
        (0, 10, 223, "201601"), (0, 10, 83, "2016032"),
        (1, 20, None, "201602"), (1, 20, 3003, "201601"),
        (1, 20, None, "201603"), (2, 40, 2321, "201601"),
        (2, 30, 10, "201602"), (2, 61, None, "201601"),
    ])
    df_data = sqlContext.createDataFrame(rdd_2, ["id", "type", "cost", "date"])
    df_data.show()

    +---+----+----+-------+
    | id|type|cost|   date|
    +---+----+----+-------+
    |  0|  10| 223| 201601|
    |  0|  10|  83|2016032|
    |  1|  20|null| 201602|
    |  1|  20|3003| 201601|
    |  1|  20|null| 201603|
    |  2|  40|2321| 201601|
    |  2|  30|  10| 201602|
    |  2|  61|null| 201601|
    +---+----+----+-------+

I need to fill the null values with the average of the existing values, with the expected result being

    +---+----+----+-------+
    | id|type|cost|   date|
    +---+----+----+-------+
    |  0|  10| 223| 201601|
    |  0|  10|  83|2016032|
    |  1|  20|1128| 201602|
    |  1|  20|3003| 201601|
    |  1|  20|1128| 201603|
    |  2|  40|2321| 201601|
    |  2|  30|  10| 201602|
    |  2|  61|1128| 201601|
    +---+----+----+-------+
where 1128 is the average of the existing values. I need to do that for several columns.

My current approach is to use na.fill:

    fill_values = {column: df_data.agg({column: "mean"}).flatMap(list).collect()[0] for column in df_data.columns if column not in ['date', 'id']}
    df_data = df_data.na.fill(fill_values)

    +---+----+----+-------+
    | id|type|cost|   date|
    +---+----+----+-------+
    |  0|  10| 223| 201601|
    |  0|  10|  83|2016032|
    |  1|  20|1128| 201602|
    |  1|  20|3003| 201601|
    |  1|  20|1128| 201603|
    |  2|  40|2321| 201601|
    |  2|  30|  10| 201602|
    |  2|  61|1128| 201601|
    +---+----+----+-------+

But this is very cumbersome. Any ideas?
-
marilena.oita almost 7 years
@Kevad do import pyspark.sql.functions as fn, then use fn.avg
-
Nico Coallier almost 7 years
Did someone improve this function? It takes way too much computing time on my side! :)
-
zero323 almost 7 years
@NicoCoallier Performance-wise you won't get any significant improvement. This is pretty much the optimal solution. API-wise you can use Imputer.
-
Nico Coallier almost 7 years
Is it faster to exclude or include?
-
zero323 almost 7 years
@NicoCoallier This has negligible impact, but if you're doing this on a large number of columns (like thousands), optimizer overhead alone is significant. If this is the case, RDD might be faster. Of course, a lot depends on what you mean by slow...
-
Nico Coallier almost 7 years
My master keeps crashing at this line; however, when I reduce the amount of data it works.
-
Nico Coallier almost 7 years
Slow ~ more than an hour
-
zero323 almost 7 years
@NicoCoallier Well, then try to determine the bottleneck. If the cluster is mostly idle then it is likely to be the planner. In a case like this you can try with RDDs, NumPy and StatCounter. If the tasks are slow, then it is either a configuration problem or a lack of resources.
-
Nico Coallier almost 7 years
When I increase the number of workers or the partitioning it gets slower
-
Nico Coallier almost 7 years
Do you have an example going from DF to RDD, fillna and then back to DF?
-
Nico Coallier almost 7 years
Is there a way without udf?
-
zero323 almost 7 years
@NicoCoallier You can try something along these lines: stackoverflow.com/a/35585532/1560062. UDF is not applicable here at all.
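
For the DF to RDD and back round trip asked about in the comments, one possible shape is sketched below. It is not taken from the linked answer: the function names (make_ops, fill_with_mean_rdd) are hypothetical, and it uses a plain RDD aggregate with per-column (sum, count) pairs rather than NumPy or StatCounter. Only the statistics are computed on the RDD side; the fill itself still goes through na.fill, so no UDF is involved.

```python
def make_ops(n):
    """Build aggregate() callbacks tracking (sum, count) for n columns, skipping None."""
    zero = [(0.0, 0)] * n

    def seq_op(acc, row):
        # Fold one row into the per-column (sum, count) accumulators.
        return [
            (s + v, k + 1) if v is not None else (s, k)
            for (s, k), v in zip(acc, row)
        ]

    def comb_op(a, b):
        # Merge the accumulators produced by two partitions.
        return [(s1 + s2, k1 + k2) for (s1, k1), (s2, k2) in zip(a, b)]

    return zero, seq_op, comb_op


def fill_with_mean_rdd(df, columns):
    """Fill nulls in `columns` using means computed with a single RDD action."""
    zero, seq_op, comb_op = make_ops(len(columns))
    # Row objects are tuples, so each row zips directly against the accumulators.
    totals = df.select(*columns).rdd.aggregate(zero, seq_op, comb_op)
    # Columns with no non-null values are left untouched.
    means = {c: s / k for c, (s, k) in zip(columns, totals) if k > 0}
    return df.na.fill(means)
```

With the question's data this would be called as fill_with_mean_rdd(df_data, ["type", "cost"]). Whether it beats the DataFrame version depends on where the time actually goes, so it is worth measuring before switching.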