PySpark: calculate mean, standard deviation and those values around the mean in one step
Solution 1
The solution is to use the RDD.aggregateByKey function, which aggregates the values per partition (and thus per node) before shuffling the partial aggregates across the cluster, where they are combined into one resulting value per key.
The code looks like this. It is inspired by this tutorial, but it uses two instances of StatCounter because we are summarizing two different statistics at once:
from pyspark.statcounter import StatCounter

# value[0] is the timestamp and value[1] is the float value;
# we use two instances of StatCounter to sum up two different statistics at once

def mergeValues(s, value):
    s[0].merge(value[0])
    s[1].merge(value[1])
    return s

def combineStats(s1, s2):
    s1[0].mergeStats(s2[0])
    s1[1].mergeStats(s2[1])
    return s1

# key the rows by variable name; StatCounter needs numeric input,
# so convert the datetime to epoch seconds
(df.rdd.map(lambda row: (row.Variable, (row.Time.timestamp(), row.Value)))
    .aggregateByKey((StatCounter(), StatCounter()),
                    mergeValues,
                    combineStats)
    .mapValues(lambda s: (s[0].min(), s[0].max(),
                          s[1].min(), s[1].max(),
                          s[1].mean(), s[1].variance(),
                          s[1].stdev(), s[1].count()))
    .collect())
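StatCounter's mergeable design is what makes this pattern work: each partition folds its values into a local accumulator (merge), and Spark then combines the partial accumulators across partitions (mergeStats). The sketch below is a hypothetical pure-Python stand-in for StatCounter (assuming the standard parallel mean/variance merge formulas), shown only to illustrate the two-phase aggregation without a Spark cluster:

```python
class RunningStats:
    """Minimal mergeable running-stats accumulator (illustrative stand-in,
    not the actual pyspark.statcounter.StatCounter class)."""

    def __init__(self):
        self.n = 0
        self.mu = 0.0
        self.m2 = 0.0                  # sum of squared deviations from the mean
        self.minval = float("inf")
        self.maxval = float("-inf")

    def merge(self, x):
        # fold one value into the accumulator (Welford's online update)
        self.n += 1
        delta = x - self.mu
        self.mu += delta / self.n
        self.m2 += delta * (x - self.mu)
        self.minval = min(self.minval, x)
        self.maxval = max(self.maxval, x)
        return self

    def merge_stats(self, other):
        # combine two partial accumulators (parallel merge formula)
        if other.n == 0:
            return self
        if self.n == 0:
            self.n, self.mu, self.m2 = other.n, other.mu, other.m2
            self.minval, self.maxval = other.minval, other.maxval
            return self
        delta = other.mu - self.mu
        total = self.n + other.n
        self.mu += delta * other.n / total
        self.m2 += other.m2 + delta * delta * self.n * other.n / total
        self.minval = min(self.minval, other.minval)
        self.maxval = max(self.maxval, other.maxval)
        self.n = total
        return self

    def mean(self):
        return self.mu

    def variance(self):
        # population variance, matching StatCounter.variance()
        return self.m2 / self.n
```

Merging the per-partition accumulators yields exactly the statistics of the full data set, which is why aggregateByKey can compute mean and variance without ever collecting the raw values on one node.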
Solution 2
This cannot work, because when you execute

from pyspark.sql.functions import *

you shadow the built-in abs with pyspark.sql.functions.abs, which expects a Column, not a local Python value, as input. The UDF you created also doesn't handle NULL entries.
- Don't use import * unless you're aware of what exactly is imported. Instead, alias the name:

  from pyspark.sql.functions import abs as abs_

  or import the module:

  from pyspark.sql import functions as sqlf
  sqlf.col("x")

- Always check the input inside a UDF, or better yet, avoid UDFs unless necessary.
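A null-safe version of the question's helper might look like the following plain-Python sketch (is_in_range is a hypothetical name; in Spark it would be wrapped with udf(..., IntegerType()) before being applied to columns):

```python
def is_in_range(value, mean, stddev, radius):
    """Return 1 if value lies within radius * stddev of mean, 0 otherwise.
    Column values reach a Python UDF as plain objects, with SQL NULL mapped
    to None, so None must be handled explicitly instead of raising."""
    if value is None or mean is None or stddev is None:
        return None  # propagate NULL rather than crash the task
    return 1 if abs(value - mean) < radius * stddev else 0
```

Even with the None check in place, the answer's main advice stands: prefer built-in column expressions such as when/otherwise over a Python UDF, since they are both null-safe and far faster.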
Matthias
Updated on August 06, 2022

Comments
-
Matthias over 1 year
My raw data comes in a tabular format. It contains observations from different variables. Each observation consists of the variable name, the timestamp and the value at that time:
Variable [string], Time [datetime], Value [float]
The data is stored as Parquet in HDFS and loaded into a Spark DataFrame (df).
Now I want to calculate default statistics like Mean, Standard Deviation and others for each variable. Afterwards, once the Mean has been retrieved, I want to filter/count those values for that variable that are closely around the Mean.
Based on the answer to my other question, I came up with this code:
from pyspark.sql.window import Window
from pyspark.sql.functions import *
from pyspark.sql.types import *

w1 = Window().partitionBy("Variable")
w2 = Window.partitionBy("Variable").orderBy("Time")

def stddev_pop_w(col, w):
    # built-in stddev doesn't support windowing
    return sqrt(avg(col * col).over(w) - pow(avg(col).over(w), 2))

def isInRange(value, mean, stddev, radius):
    try:
        if abs(value - mean) < radius * stddev:
            return 1
        else:
            return 0
    except AttributeError:
        return -1

delta = col("Time").cast("long") - lag("Time", 1).over(w2).cast("long")

#f = udf(lambda (value, mean, stddev, radius): abs(value - mean) < radius * stddev, IntegerType())
#f2 = udf(lambda value, mean, stddev: isInRange(value, mean, stddev, 2), IntegerType())
#f3 = udf(lambda value, mean, stddev: isInRange(value, mean, stddev, 3), IntegerType())

df_ = df_all \
    .withColumn("mean", mean("Value").over(w1)) \
    .withColumn("std_deviation", stddev_pop_w(col("Value"), w1)) \
    .withColumn("delta", delta)
#    .withColumn("stddev_2", f2("Value", "mean", "std_deviation")) \
#    .withColumn("stddev_3", f3("Value", "mean", "std_deviation"))

#df2.show(5, False)
Question: The last two commented-out lines won't work. They give an AttributeError because the incoming values for stddev and mean are NULL. I guess this happens because I'm referring to columns that are also just calculated on the fly and have no value at that moment. But is there a way to achieve that?
Currently I'm doing a second run like this:
df = df_.select("*",
    abs(df_.Value - df_.mean).alias("max_deviation_mean"),
    when(abs(df_.Value - df_.mean) < 2 * df_.std_deviation, 1).otherwise(0).alias("std_dev_mean_2"),
    when(abs(df_.Value - df_.mean) < 3 * df_.std_deviation, 1).otherwise(0).alias("std_dev_mean_3"))
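For sanity-checking the flags this second pass produces, the same within-k-sigma test can be computed per variable in plain Python using the standard-library statistics module (a sketch; flag_within is a hypothetical helper name, and pstdev matches the population standard deviation used in the window expression above):

```python
from statistics import fmean, pstdev

def flag_within(values, radius):
    """For each value, return 1 if it lies within radius * population
    stddev of the mean of the series, else 0."""
    mu = fmean(values)
    sigma = pstdev(values)
    return [1 if abs(v - mu) < radius * sigma else 0 for v in values]
```

Running this over the values of one variable should reproduce the std_dev_mean_2 / std_dev_mean_3 columns for radius 2 and 3 respectively.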