How to check if one array column is contained in another array column in a PySpark DataFrame

Solution 1

Here's an option using a UDF, where we check the size of the set difference between the columns ev and ev2. When the difference is empty, i.e. every element of ev is contained in ev2, we return True; otherwise False.

from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

def contains(x, y):
    # An empty set difference means every element of x is also in y.
    return len(set(x) - set(y)) == 0

# Declare the return type explicitly; udf defaults to StringType otherwise.
contains_udf = udf(contains, BooleanType())
df.withColumn("evInEv2", contains_udf(df.ev, df.ev2)).show()
+---+------------+------------+-------+
| id|          ev|         ev2|evInEv2|
+---+------------+------------+-------+
|se1|[ev11, ev12]|      [ev11]|  false|
|se2|      [ev11]|[ev11, ev12]|   true|
|se3|      [ev21]|[ev11, ev12]|  false|
|se4|[ev21, ev22]|[ev21, ev22]|   true|
+---+------------+------------+-------+
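
If you only want the rows where the check holds (the first desired output in the question), the same UDF can serve as a filter condition. A minimal sketch reusing contains_udf from above:

# Keep only the rows where every element of ev appears in ev2.
df.filter(contains_udf(df.ev, df.ev2)).show()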

Solution 2

One more implementation, for Spark >= 2.4.0, that avoids a UDF by using the built-in array_except:

from pyspark.sql.functions import size, array_except

def is_subset(a, b):
    # a is a subset of b when removing b's elements from a leaves nothing.
    return size(array_except(a, b)) == 0

df.withColumn("is_subset", is_subset(df.ev, df.ev2)).show()

Output:

+---+------------+------------+---------+
| id|          ev|         ev2|is_subset|
+---+------------+------------+---------+
|se1|[ev11, ev12]|      [ev11]|    false|
|se2|      [ev11]|[ev11, ev12]|     true|
|se3|      [ev21]|[ev11, ev12]|    false|
|se4|[ev21, ev22]|[ev21, ev22]|     true|
+---+------------+------------+---------+
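
On Spark >= 3.0, the same check can also be written without a UDF using the SQL higher-order function forall; this is a sketch of that alternative, not part of the original answer:

from pyspark.sql.functions import expr

# True only when every element of ev is found in ev2 (Spark >= 3.0).
df.withColumn("is_subset", expr("forall(ev, x -> array_contains(ev2, x))")).show()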

Solution 3

Alternatively, you can use Python's set.issubset in a UDF:

# Reuses udf and BooleanType as imported in Solution 1.
subsetOf = udf(lambda a, b: set(a).issubset(set(b)), BooleanType())
df.withColumn("evInEv2", subsetOf(df.ev, df.ev2)).show()
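
For large DataFrames, a vectorized pandas UDF is a possible variant of the same idea; this sketch assumes Spark >= 3.0 with pyarrow installed, and subset_of_pd is an illustrative name:

import pandas as pd
from pyspark.sql.functions import pandas_udf

# Processes a batch of rows per Python call, reducing the per-row
# serialization overhead of a plain udf.
@pandas_udf("boolean")
def subset_of_pd(a: pd.Series, b: pd.Series) -> pd.Series:
    return pd.Series([set(x).issubset(set(y)) for x, y in zip(a, b)])

df.withColumn("evInEv2", subset_of_pd(df.ev, df.ev2)).show()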

Comments

  • dtj almost 2 years

    Suppose I have the following case

    from pyspark.sql.types import *
    schema = StructType([  # schema
        StructField("id", StringType(), True),
        StructField("ev", ArrayType(StringType()), True),
        StructField("ev2", ArrayType(StringType()), True),])
    df = spark.createDataFrame([{"id": "se1", "ev": ["ev11", "ev12"], "ev2": ["ev11"]},
                                {"id": "se2", "ev": ["ev11"], "ev2": ["ev11", "ev12"]},
                                {"id": "se3", "ev": ["ev21"], "ev2": ["ev11", "ev12"]},
                                {"id": "se4", "ev": ["ev21", "ev22"], "ev2": ["ev21", "ev22"]}],
                               schema=schema)
    

    Which gives me:

    df.show()
    +---+------------+------------+
    | id|          ev|         ev2|
    +---+------------+------------+
    |se1|[ev11, ev12]|      [ev11]|
    |se2|      [ev11]|[ev11, ev12]|
    |se3|      [ev21]|[ev11, ev12]|
    |se4|[ev21, ev22]|[ev21, ev22]|
    +---+------------+------------+
    

    I want to create a new boolean column (or select only the true cases) indicating, for each row, whether the contents of the "ev" column are contained in the "ev2" column, returning either:

    df_target.show()
    
    +---+------------+------------+
    | id|          ev|         ev2|
    +---+------------+------------+
    |se2|      [ev11]|[ev11, ev12]|
    |se4|[ev21, ev22]|[ev21, ev22]|
    +---+------------+------------+
    

    or:

    df_target.show()
    
    +---+------------+------------+-------+
    | id|          ev|         ev2|evInEv2|
    +---+------------+------------+-------+
    |se1|[ev11, ev12]|      [ev11]|  false|
    |se2|      [ev11]|[ev11, ev12]|   true|
    |se3|      [ev21]|[ev11, ev12]|  false|
    |se4|[ev21, ev22]|[ev21, ev22]|   true|
    +---+------------+------------+-------+
    

    I tried using the isin method:

    df.withColumn('evInEv2', df['ev'].isin(df['ev2'])).show()
    
    +---+------------+------------+-------+
    | id|          ev|         ev2|evInEv2|
    +---+------------+------------+-------+
    |se1|[ev11, ev12]|      [ev11]|  false|
    |se2|      [ev11]|[ev11, ev12]|  false|
    |se3|      [ev21]|[ev11, ev12]|  false|
    |se4|[ev21, ev22]|[ev21, ev22]|   true|
    +---+------------+------------+-------+
    

    But it looks like it only checks whether the two arrays are identical.

    I also tried the array_contains function from pyspark.sql.functions, but it only accepts a single value to check for, not an array.
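
    For example, it can only check one value at a time (hasEv11 is just an illustrative column name):

    from pyspark.sql.functions import array_contains

    # Checks a single value against the array, not a whole array:
    df.withColumn("hasEv11", array_contains(df.ev2, "ev11")).show()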

    I'm having difficulty even searching for this because I'm not sure how to phrase the problem.

    Thanks!