Outlier detection in PySpark

11,030

You can use pyspark.sql.DataFrame.approxQuantile inside of a loop to get the desired 25th and 75th percentile values for each of your columns.

bounds = {
    c: dict(
        zip(["q1", "q3"], df.approxQuantile(c, [0.25, 0.75], 0))
    )
    for c in df.columns
}

The last argument is the relative error, which you can read about in the linked post as well as in the docs. The short version: the lower the number, the more accurate the result, at the cost of more computation. (Here I used 0 to get the exact value, but you may want to choose a different value depending on the size of your data.)
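To make the relative error more concrete: the Spark documentation for approxQuantile states that, for N rows, probability p and error err, the returned value x satisfies floor((p - err) * N) <= rank(x) <= ceil((p + err) * N). Below is a small plain-Python sketch of that arithmetic (no Spark needed). The helper name rank_window is hypothetical and not part of PySpark.

```python
import math
from fractions import Fraction


def rank_window(p, err, n):
    """Rank range allowed by the documented approxQuantile guarantee.

    Hypothetical helper for illustration only; it does not call Spark.
    Fractions are used so the floor/ceil are not affected by float rounding.
    """
    p = Fraction(str(p))
    err = Fraction(str(err))
    low = math.floor((p - err) * n)
    high = math.ceil((p + err) * n)
    return low, high


# With 1,000 rows and a 1% relative error, the "25th percentile" may be
# any value whose exact rank falls in this window.
print(rank_window(0.25, 0.01, 1000))  # (240, 260)

# With a relative error of 0 the window collapses to the exact rank.
print(rank_window(0.25, 0, 1000))     # (250, 250)
```

So a larger error widens the set of values Spark is allowed to return for each quartile, which in turn shifts the computed bounds slightly.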

Once you have the first and third quartile values, you can compute the IQR and the lower/upper bounds quite easily:

for c in bounds:
    iqr = bounds[c]['q3'] - bounds[c]['q1']
    bounds[c]['lower'] = bounds[c]['q1'] - (iqr * 1.5)
    bounds[c]['upper'] = bounds[c]['q3'] + (iqr * 1.5)
print(bounds)
#{'age': {'lower': 3.0, 'q1': 33.0, 'q3': 53.0, 'upper': 83.0},
# 'balance': {'lower': -570.0, 'q1': 6.0, 'q3': 390.0, 'upper': 966.0},
# 'duration': {'lower': -143.0, 'q1': 76.0, 'q3': 222.0, 'upper': 441.0}}
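As a sanity check, the bounds for a single column can be reproduced by hand. This is a minimal plain-Python sketch using the quartiles printed above; iqr_bounds is a hypothetical helper name and k=1.5 is the multiplier used in the loop.

```python
def iqr_bounds(q1, q3, k=1.5):
    """Return (lower, upper) fences for the given quartiles.

    Hypothetical helper mirroring the loop above: the fences sit
    k * IQR below the first quartile and above the third quartile.
    """
    iqr = q3 - q1
    return q1 - k * iqr, q3 + k * iqr


# 'age' column: q1 = 33.0, q3 = 53.0, so IQR = 20.0 and 1.5 * IQR = 30.0
print(iqr_bounds(33.0, 53.0))   # (3.0, 83.0)

# 'balance' column: q1 = 6.0, q3 = 390.0, so IQR = 384.0 and 1.5 * IQR = 576.0
print(iqr_bounds(6.0, 390.0))   # (-570.0, 966.0)
```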

Now use pyspark.sql.functions.when in a list comprehension to build the outlier columns based on bounds:

import pyspark.sql.functions as f
df.select(
    "*",
    *[
        f.when(
            f.col(c).between(bounds[c]['lower'], bounds[c]['upper']),
            0
        ).otherwise(1).alias(c+"_out") 
        for c in df.columns
    ]
).show()
#+---+-------+--------+-------+-----------+------------+
#|age|balance|duration|age_out|balance_out|duration_out|
#+---+-------+--------+-------+-----------+------------+
#|  2|   2143|     261|      1|          1|           0|
#| 44|     29|     151|      0|          0|           0|
#| 33|      2|      76|      0|          0|           0|
#| 50|   1506|      92|      0|          1|           0|
#| 33|      1|     198|      0|          0|           0|
#| 35|    231|     139|      0|          0|           0|
#| 28|    447|     217|      0|          0|           0|
#|  2|      2|     380|      1|          0|           0|
#| 58|    121|      50|      0|          0|           0|
#| 43|    693|      55|      0|          0|           0|
#| 41|    270|     222|      0|          0|           0|
#| 50|    390|     137|      0|          0|           0|
#| 53|      6|     517|      0|          0|           1|
#| 58|     71|      71|      0|          0|           0|
#| 57|    162|     174|      0|          0|           0|
#| 40|    229|     353|      0|          0|           0|
#| 45|     13|      98|      0|          0|           0|
#| 57|     52|      38|      0|          0|           0|
#|  3|      0|     219|      0|          0|           0|
#|  4|      0|      54|      0|          0|           0|
#+---+-------+--------+-------+-----------+------------+

Here I used between to check whether a value is not an outlier. This function is inclusive (i.e., x between a and b is logically equivalent to x >= a and x <= b).
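The effect of the inclusive check on boundary values can be seen in a plain-Python sketch of the same rule. The function name outlier_flag is hypothetical, and the bounds are the 'age' bounds computed earlier.

```python
def outlier_flag(x, lower, upper):
    """Return 0 if lower <= x <= upper (inclusive), else 1.

    Hypothetical helper mirroring the between/when/otherwise logic
    for a single non-null value.
    """
    return 0 if lower <= x <= upper else 1


# 'age' bounds from above: lower = 3.0, upper = 83.0
ages = [2, 3, 4, 44]
print([outlier_flag(a, 3.0, 83.0) for a in ages])  # [1, 0, 0, 0]
```

A value sitting exactly on a bound (age 3 here) is not flagged. This matches the NumPy function in the question, which flags only values strictly greater than the upper bound or strictly less than the lower bound. One difference to keep in mind on the Spark side: a null value makes between evaluate to null, so the when(...).otherwise(1) expression above would flag nulls as 1.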

Author by

RSK

Updated on June 26, 2022

Comments

  • RSK
    RSK almost 2 years

    I have a pyspark data frame as shown below.

    +---+-------+--------+
    |age|balance|duration|
    +---+-------+--------+
    |  2|   2143|     261|
    | 44|     29|     151|
    | 33|      2|      76|
    | 50|   1506|      92|
    | 33|      1|     198|
    | 35|    231|     139|
    | 28|    447|     217|
    |  2|      2|     380|
    | 58|    121|      50|
    | 43|    693|      55|
    | 41|    270|     222|
    | 50|    390|     137|
    | 53|      6|     517|
    | 58|     71|      71|
    | 57|    162|     174|
    | 40|    229|     353|
    | 45|     13|      98|
    | 57|     52|      38|
    |  3|      0|     219|
    |  4|      0|      54|
    +---+-------+--------+
    

    My expected output should look like this:

    +---+-------+--------+-------+-----------+------------+
    |age|balance|duration|age_out|balance_out|duration_out|
    +---+-------+--------+-------+-----------+------------+
    |  2|   2143|     261|      1|          1|           0|
    | 44|     29|     151|      0|          0|           0|
    | 33|      2|      76|      0|          0|           0|
    | 50|   1506|      92|      0|          1|           0|
    | 33|      1|     198|      0|          0|           0|
    | 35|    231|     139|      0|          0|           0|
    | 28|    447|     217|      0|          0|           0|
    |  2|      2|     380|      1|          0|           0|
    | 58|    121|      50|      0|          0|           0|
    | 43|    693|      55|      0|          0|           0|
    | 41|    270|     222|      0|          0|           0|
    | 50|    390|     137|      0|          0|           0|
    | 53|      6|     517|      0|          0|           1|
    | 58|     71|      71|      0|          0|           0|
    | 57|    162|     174|      0|          0|           0|
    | 40|    229|     353|      0|          0|           0|
    | 45|     13|      98|      0|          0|           0|
    | 57|     52|      38|      0|          0|           0|
    |  3|      0|     219|      1|          0|           0|
    |  4|      0|      54|      0|          0|           0|
    +---+-------+--------+-------+-----------+------------+
    

    My objective is to identify the outlier records in the data set using the interquartile range (IQR) method, as in the Python code below. Any outlier record should be flagged as 1, otherwise 0.

    I can do this in Python with NumPy using the code below.

    import numpy as np
    def outliers_iqr(ys):
        quartile_1, quartile_3 = np.percentile(ys, [25, 75])
        iqr = quartile_3 - quartile_1
        lower_bound = quartile_1 - (iqr * 1.5)
        upper_bound = quartile_3 + (iqr * 1.5)
        ser = np.zeros(len(ys))
        pos = np.where((ys > upper_bound) | (ys < lower_bound))[0]
        ser[pos] = 1
        return ser
    

    But I want to do the same thing in PySpark. Can someone help me with this?

    My PySpark code:

    def outliers_iqr(ys):
        quartile_1, quartile_3 = np.percentile(ys, [25, 75])
        iqr = quartile_3 - quartile_1
        lower_bound = quartile_1 - (iqr * 1.5)
        upper_bound = quartile_3 + (iqr * 1.5)
        ser = np.zeros(len(ys))
        pos =np.where((ys > upper_bound) | (ys < lower_bound))[0]
        ser[pos]=1
        return(float(ser))
    
    outliers_iqr_udf = udf(outliers_iqr, FloatType())
    DF.withColumn('age_out', outliers_iqr_udf(DF.select('age').collect())).show()