Remove all records which are duplicate in spark dataframe

12,910

Solution 1

I would use window-functions for this. Lets say you want to remove duplicate id rows :

import org.apache.spark.sql.expressions.Window

df
  .withColumn("cnt", count("*").over(Window.partitionBy($"id")))
  .where($"cnt"===1).drop($"cnt")
  .show()

Solution 2

This can be done by grouping by the column (or columns) to look for duplicates in and then aggregate and filter the results.

Example dataframe df:

+---+---+
| id|num|
+---+---+
|  1|  1|
|  2|  2|
|  3|  3|
|  4|  4|
|  4|  5|
+---+---+

Grouping by the id column to remove its duplicates (the last two rows):

val df2 = df.groupBy("id")
  .agg(first($"num").as("num"), count($"id").as("count"))
  .filter($"count" === 1)
  .select("id", "num")

This will give you:

+---+---+
| id|num|
+---+---+
|  1|  1|
|  2|  2|
|  3|  3|
+---+---+

Alternativly, it can be done by using a join. It will be slower, but if there is a lot of columns there is no need to use first($"num").as("num") for each one to keep them.

val df2 = df.groupBy("id").agg(count($"id").as("count")).filter($"count" === 1).select("id")
val df3 = df.join(df2, Seq("id"), "inner")

Solution 3

I added a killDuplicates() method to the open source spark-daria library that uses @Raphael Roth's solution. Here's how to use the code:

import com.github.mrpowers.spark.daria.sql.DataFrameExt._

df.killDuplicates(col("id"))

// you can also supply multiple Column arguments
df.killDuplicates(col("id"), col("another_column"))

Here's the code implementation:

object DataFrameExt {

  implicit class DataFrameMethods(df: DataFrame) {

    def killDuplicates(cols: Column*): DataFrame = {
      df
        .withColumn(
          "my_super_secret_count",
          count("*").over(Window.partitionBy(cols: _*))
        )
        .where(col("my_super_secret_count") === 1)
        .drop(col("my_super_secret_count"))
    }

  }

}

You might want to leverage the spark-daria library to keep this logic out of your codebase.

Share:
12,910
salmanbw
Author by

salmanbw

Big Data enthusiast

Updated on June 16, 2022

Comments

  • salmanbw
    salmanbw almost 2 years

    I have a spark dataframe with multiple columns in it. I want to find out and remove rows which have duplicated values in a column (the other columns can be different).

    I tried using dropDuplicates(col_name) but it will only drop duplicate entries but still keep one record in the dataframe. What I need is to remove all entries which were initially containing duplicate entries.

    I am using Spark 1.6 and Scala 2.10.

  • philantrovert
    philantrovert about 6 years
    Since you are checking the count of grouped column == 1, can't the join be replaced by first($"num").as("num") ? It should result in the same output.
  • Shaido
    Shaido about 6 years
    @philantrovert That is a good point, not sure how I missed that. I changed the answer to take that into account.
  • salmanbw
    salmanbw about 6 years
    @shaido Your solution also worked fine, but i would like to avoid creating another dataframe for this problem, that's why accepting raphael's ans
  • Shaido
    Shaido about 6 years
    @salmanbw The first approach will not create any extra dataframe though? The only difference is that this one uses groupBy and agg while his uses window functions to get the result.
  • thebluephantom
    thebluephantom over 5 years
    I tried this with random generation of data and for that users should note that .persist could be relevant.