Compare two dataframes Pyspark

Solution 1

Assuming that we can use id to join the two datasets, I don't think a UDF is needed. This can be solved with an inner join plus the array and array_remove functions, among others.

First let's create the two datasets:

df1 = spark.createDataFrame([
  [1, "ABC", 5000, "US"],
  [2, "DEF", 4000, "UK"],
  [3, "GHI", 3000, "JPN"],
  [4, "JKL", 4500, "CHN"]
], ["id", "name", "sal", "Address"])

df2 = spark.createDataFrame([
  [1, "ABC", 5000, "US"],
  [2, "DEF", 4000, "CAN"],
  [3, "GHI", 3500, "JPN"],
  [4, "JKL_M", 4800, "CHN"]
], ["id", "name", "sal", "Address"])

First we do an inner join between the two datasets, then we generate the condition df1[col] != df2[col] for each column except id. When the values aren't equal we return the column name, otherwise an empty string. The conditions become the items of an array, from which we finally remove the empty strings:

from pyspark.sql.functions import col, array, when, array_remove, lit

# get conditions for all columns except id
conditions_ = [when(df1[c]!=df2[c], lit(c)).otherwise("") for c in df1.columns if c != 'id']

select_expr =[
                col("id"), 
                *[df2[c] for c in df2.columns if c != 'id'], 
                array_remove(array(*conditions_), "").alias("column_names")
]

df1.join(df2, "id").select(*select_expr).show()

# +---+-----+----+-------+------------+
# | id| name| sal|Address|column_names|
# +---+-----+----+-------+------------+
# |  1|  ABC|5000|     US|          []|
# |  3|  GHI|3500|    JPN|       [sal]|
# |  2|  DEF|4000|    CAN|   [Address]|
# |  4|JKL_M|4800|    CHN| [name, sal]|
# +---+-----+----+-------+------------+

Solution 2

Python: a PySpark version of my earlier Scala code.

import pyspark.sql.functions as f

df1 = spark.read.option("header", "true").csv("test1.csv")
df2 = spark.read.option("header", "true").csv("test2.csv")

columns = df1.columns
df3 = df1.alias("d1").join(df2.alias("d2"), f.col("d1.id") == f.col("d2.id"), "left")

for name in columns:
    df3 = df3.withColumn(name + "_temp", f.when(f.col("d1." + name) != f.col("d2." + name), f.lit(name)))


df3.withColumn("column_names", f.concat_ws(",", *map(lambda name: f.col(name + "_temp"), columns))).select("d1.*", "column_names").show()

Scala: Here is my best approach for your problem.

import org.apache.spark.sql.functions._

val df1 = spark.read.option("header", "true").csv("test1.csv")
val df2 = spark.read.option("header", "true").csv("test2.csv")

val columns = df1.columns
val df3 = df1.alias("d1").join(df2.alias("d2"), col("d1.id") === col("d2.id"), "left")

columns.foldLeft(df3) {(df, name) => df.withColumn(name + "_temp", when(col("d1." + name) =!= col("d2." + name), lit(name)))}
  .withColumn("column_names", concat_ws(",", columns.map(name => col(name + "_temp")): _*))
  .show(false)

First, I join the two dataframes into df3, keeping the columns from df1. Then I fold left over the column names, adding one temporary column per name to df3. Each temporary column holds the column name when the df1 and df2 rows with the same id have different values in that column, and null otherwise.

After that, concat_ws joins those temporary columns. It skips nulls, so only the names of the differing columns are left.

+---+----+----+-------+------------+
|id |name|sal |Address|column_names|
+---+----+----+-------+------------+
|1  |ABC |5000|US     |            |
|2  |DEF |4000|UK     |Address     |
|3  |GHI |3000|JPN    |sal         |
|4  |JKL |4500|CHN    |name,sal    |
+---+----+----+-------+------------+

The only difference from your expected result is that the output is a string rather than a list.

P.S. I forgot to use PySpark; this is plain Spark in Scala, sorry.

Solution 3

Here is a solution with a UDF. I renamed the first dataframe's columns dynamically so that they are not ambiguous during the check. Go through the code below and let me know if you have any concerns.

>>> from pyspark.sql.functions import *
>>> df.show()
+---+----+----+-------+
| id|name| sal|Address|
+---+----+----+-------+
|  1| ABC|5000|     US|
|  2| DEF|4000|     UK|
|  3| GHI|3000|    JPN|
|  4| JKL|4500|    CHN|
+---+----+----+-------+

>>> df1.show()
+---+----+----+-------+
| id|name| sal|Address|
+---+----+----+-------+
|  1| ABC|5000|     US|
|  2| DEF|4000|    CAN|
|  3| GHI|3500|    JPN|
|  4|JKLM|4800|    CHN|
+---+----+----+-------+

>>> df2 = df.select([col(c).alias("x_"+c) for c in df.columns])
>>> df3 = df1.join(df2, col("id") == col("x_id"), "left")

# UDF declaration

>>> def CheckMatch(Column,r):
...     check=''
...     ColList=Column.split(",")
...     for cc in ColList:
...             if(r[cc] != r["x_" + cc]):
...                     check=check + "," + cc
...     return check.replace(',','',1).split(",")

>>> CheckMatchUDF = udf(CheckMatch)

# final columns to select
>>> finalCol = df1.columns
>>> finalCol.insert(len(finalCol), "column_names")

>>> (df3.withColumn("column_names", CheckMatchUDF(lit(','.join(df1.columns)), struct([df3[x] for x in df3.columns])))
...     .select(finalCol)
...     .show())
+---+----+----+-------+------------+
| id|name| sal|Address|column_names|
+---+----+----+-------+------------+
|  1| ABC|5000|     US|          []|
|  2| DEF|4000|    CAN|   [Address]|
|  3| GHI|3500|    JPN|       [sal]|
|  4|JKLM|4800|    CHN| [name, sal]|
+---+----+----+-------+------------+
Author by

de-learner

Updated on July 28, 2021

Comments

  • de-learner almost 3 years

    I'm trying to compare two data frames that have the same number of columns, i.e. 4 columns, with id as the key column in both data frames.

    df1 = spark.read.csv("/path/to/data1.csv")
    df2 = spark.read.csv("/path/to/data2.csv")
    

    Now I want to append a new column to DF2, i.e. column_names, which is the list of the columns whose values differ from those in df1.

    df2.withColumn("column_names",udf())
    

    DF1:

    +----+-------+------+---------+
    | id | name  | sal  | Address |
    +----+-------+------+---------+
    |  1 | ABC   | 5000 | US      |
    |  2 | DEF   | 4000 | UK      |
    |  3 | GHI   | 3000 | JPN     |
    |  4 | JKL   | 4500 | CHN     |
    +----+-------+------+---------+
    

    DF2:

    +----+-------+------+---------+
    | id | name  | sal  | Address |
    +----+-------+------+---------+
    |  1 | ABC   | 5000 | US      |
    |  2 | DEF   | 4000 | CAN     |
    |  3 | GHI   | 3500 | JPN     |
    |  4 | JKL_M | 4800 | CHN     |
    +----+-------+------+---------+
    

    Now I want DF3

    DF3:

    +----+-------+------+---------+--------------+
    | id | name  | sal  | Address | column_names |
    +----+-------+------+---------+--------------+
    |  1 | ABC   | 5000 | US      | []           |
    |  2 | DEF   | 4000 | CAN     | [address]    |
    |  3 | GHI   | 3500 | JPN     | [sal]        |
    |  4 | JKL_M | 4800 | CHN     | [name,sal]   |
    +----+-------+------+---------+--------------+
    

    I saw this SO question, How to compare two dataframe and print columns that are different in scala. I tried that, but the result is different.

    I'm thinking of going with a UDF: pass a row from each dataframe to it, compare column by column, and return the list of differing columns. However, for that both data frames would have to be sorted so that rows with the same id are sent to the UDF together. Sorting is a costly operation here. Any solution?