PySpark dataframe: Count elements in array or list


Solution 1

You can explode the array and filter the exploded values for 1. Then groupBy and count:

from pyspark.sql.functions import col, count, explode

df.select("*", explode("list_of_numbers").alias("exploded"))\
    .where(col("exploded") == 1)\
    .groupBy("letter", "list_of_numbers")\
    .agg(count("exploded").alias("ones"))\
    .show()
#+------+---------------+----+
#|letter|list_of_numbers|ones|
#+------+---------------+----+
#|     A|   [3, 1, 2, 3]|   1|
#|     B|   [1, 2, 1, 1]|   3|
#+------+---------------+----+

In order to keep all rows, even when the count is 0, you can convert the exploded column into an indicator variable. Then groupBy and sum.

from pyspark.sql.functions import col, count, explode, sum as sum_

df.select("*", explode("list_of_numbers").alias("exploded"))\
    .withColumn("exploded", (col("exploded") == 1).cast("int"))\
    .groupBy("letter", "list_of_numbers")\
    .agg(sum_("exploded").alias("ones"))\
    .show()

Note that I have imported pyspark.sql.functions.sum as sum_ so as not to shadow the builtin sum function. With the sample data the result is the same table as above; the difference only shows up when a row contains no 1s at all, which is now reported as 0 instead of being dropped.
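As a small aside, here is a minimal sketch of the name clash the alias avoids (the example values are just for illustration):

from pyspark.sql.functions import sum as sum_  # aliased: Spark's column-wise sum

# The builtin sum() is still usable on plain Python sequences because it was not shadowed.
print(sum([1, 2, 3]))  # 6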

Solution 2

Assuming that the length of the list is constant, one way I can think of is:

from operator import add
from functools import reduce
import pyspark.sql.functions as F

# `sql` is assumed to be the SQLContext / SparkSession in scope
df = sql.createDataFrame(
    [
        ['A', [3, 1, 2, 3]],
        ['B', [1, 2, 1, 1]]
    ],
    ['letter', 'list_of_numbers'])

expr = reduce(add, [F.when(F.col('list_of_numbers').getItem(x) == 1, 1)
                    .otherwise(0) for x in range(4)])
df = df.withColumn('ones', expr)
df.show()

+------+---------------+----+
|letter|list_of_numbers|ones|
+------+---------------+----+
|     A|   [3, 1, 2, 3]|   1|
|     B|   [1, 2, 1, 1]|   3|
+------+---------------+----+
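If the lists are not all the same length (as the asker notes in the comments), one possible adaptation, sketched here as an assumption rather than part of the original answer, is to build the expression over the maximum observed array size; getItem() past the end of a shorter array returns null, which the otherwise(0) branch turns into 0:

# Determine the longest array, then reuse the same reduce/when expression over that range.
max_len = df.select(F.max(F.size('list_of_numbers'))).first()[0]
expr = reduce(add, [F.when(F.col('list_of_numbers').getItem(x) == 1, 1)
                    .otherwise(0) for x in range(max_len)])
df.withColumn('ones', expr).show()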

Solution 3

From PySpark 3+, we can use array transformations.

https://mungingdata.com/spark-3/array-exists-forall-transform-aggregate-zip_with/
https://medium.com/expedia-group-tech/deep-dive-into-apache-spark-array-functions-720b8fbfa729

import pyspark.sql.functions as F

df = spark_session.createDataFrame(
    [
        ['A', [3, 1, 2, 3]],
        ['B', [1, 2, 1, 1]]
    ],
    ['letter', 'list_of_numbers'])

df1 = df.selectExpr('*','filter(list_of_numbers, x->x=1) as ones_array')
df2 = df1.selectExpr('*', 'size(ones_array) as ones')
df2.show()

+------+---------------+----------+----+
|letter|list_of_numbers|ones_array|ones|
+------+---------------+----------+----+
|     A|   [3, 1, 2, 3]|       [1]|   1|
|     B|   [1, 2, 1, 1]| [1, 1, 1]|   3|
+------+---------------+----------+----+
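The same idea can be written with the DataFrame API instead of selectExpr; this is a sketch assuming PySpark 3.1+, where pyspark.sql.functions.filter accepts a Python lambda:

import pyspark.sql.functions as F

# Filter the array down to the 1s, then take the size of the filtered array.
df_ones = df.withColumn('ones', F.size(F.filter(F.col('list_of_numbers'), lambda x: x == 1)))
df_ones.show()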

Comments

  • Ala Tarighati almost 2 years

    Let us assume dataframe df as:

    df.show()
    

    Output:

    +------+----------------+
    |letter| list_of_numbers|
    +------+----------------+
    |     A|    [3, 1, 2, 3]|
    |     B|    [1, 2, 1, 1]|
    +------+----------------+
    

    What I want to do is to count the number of occurrences of a specific element in the column list_of_numbers. Something like this:

    +------+----------------+----+
    |letter| list_of_numbers|ones|
    +------+----------------+----+
    |     A|    [3, 1, 2, 3]|   1|
    |     B|    [1, 2, 1, 1]|   3|
    +------+----------------+----+
    

    So far I have tried creating a udf and it works perfectly, but I'm wondering if I can do it without defining any udf (a sketch of such a udf is shown after the comments below).

  • Ala Tarighati over 5 years
    Thank you, but they do not necessarily have the same size.
  • Ala Tarighati over 5 years
    Nice! This works, provided that I join the result back with the original df; otherwise I lose the rows without any ones :)
  • pault over 5 years
    @AllaTarighati, I have posted an update for that case so you can avoid a join back with the original df.
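For reference, a minimal sketch of the kind of udf the question alludes to (the original udf is not shown, so this is only one plausible version):

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# Count occurrences of 1 in the array column with a plain Python udf.
count_ones = udf(lambda xs: xs.count(1), IntegerType())
df.withColumn('ones', count_ones('list_of_numbers')).show()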