Spark Dataframe: Select distinct rows

Solution 1

The problem you face is explicitly stated in the exception message: MapType columns are neither hashable nor orderable, so they cannot be used as part of a grouping or partitioning expression.

Your SQL attempt is not logically equivalent to distinct on a Dataset. If you want to deduplicate data based on a set of compatible columns, you should use dropDuplicates:

df.dropDuplicates("timestamp")

which would be equivalent to

SELECT timestamp, first(c1) AS c1, first(c2) AS c2,  ..., first(cn) AS cn,
       first(canvasHashes) AS canvasHashes
FROM df GROUP BY timestamp

Unfortunately, if your goal is an actual DISTINCT, it won't be so easy. One possible solution is to leverage Scala* Map hashing. You could define a Scala udf like this:

spark.udf.register("scalaHash", (x: Map[String, String]) => x.##)

and then use it in your Java code to derive a column that can be passed to dropDuplicates:

df
  .selectExpr("*", "scalaHash(canvasHashes) AS hash_of_canvas_hashes")
  .dropDuplicates(
    // All columns excluding canvasHashes / hash_of_canvas_hashes
    "timestamp", "c1", "c2", ..., "cn",
    // Hash used as surrogate of canvasHashes
    "hash_of_canvas_hashes"
  )

with SQL equivalent

SELECT
  timestamp, c1, c2, ..., cn,   -- All columns excluding canvasHashes
  first(canvasHashes) AS canvasHashes
FROM df GROUP BY
  timestamp, c1, c2, ..., cn,   -- All columns excluding canvasHashes
  scalaHash(canvasHashes)       -- Hash used as surrogate of canvasHashes
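
To make the steps above concrete, here is a minimal end-to-end sketch in Scala on a local SparkSession. The toy rows, the object name DistinctWithMapColumn and the helper dedupe are assumptions made for illustration; only scalaHash, canvasHashes and hash_of_canvas_hashes come from the answer. Because the hash is only a surrogate, two different maps whose hashes happen to collide would be treated as duplicates, so this approximates DISTINCT rather than guaranteeing it.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object DistinctWithMapColumn {
  // Hypothetical helper: deduplicate on the scalar columns plus a hash
  // that stands in for the map column, then drop the helper column.
  def dedupe(spark: SparkSession, df: DataFrame): DataFrame = {
    spark.udf.register("scalaHash", (x: Map[String, String]) => x.##)
    df.selectExpr("*", "scalaHash(canvasHashes) AS hash_of_canvas_hashes")
      .dropDuplicates("timestamp", "c1", "hash_of_canvas_hashes")
      .drop("hash_of_canvas_hashes")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("distinct-with-map-column")
      .getOrCreate()
    import spark.implicits._

    // Toy data (assumed): the second row is an exact duplicate of the first,
    // the third differs only in the map column.
    val df = Seq(
      (1L, "a", Map("k1" -> "v1")),
      (1L, "a", Map("k1" -> "v1")),
      (1L, "a", Map("k1" -> "other"))
    ).toDF("timestamp", "c1", "canvasHashes")

    dedupe(spark, df).show(false)
    spark.stop()
  }
}
```

In a real job the list passed to dropDuplicates would contain every column except canvasHashes, followed by the hash column.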

* Please note that java.util.Map with its hashCode won't work, as hashCode is not consistent.
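
The reason a Scala Map hash can act as a surrogate is that equal immutable maps always produce the same ## value, regardless of insertion order. The sketch below shows this in plain Scala, without Spark. The object name MapHashDemo, the sample maps and the helper distinctMaps are made up for illustration. The helper also shows how a collision could be handled outside Spark: group by hash first, then compare the candidates for real equality.

```scala
object MapHashDemo {
  // Two maps with the same entries, written in a different order (sample data).
  val a: Map[String, String] = Map("font" -> "abc", "canvas" -> "def")
  val b: Map[String, String] = Map("canvas" -> "def", "font" -> "abc")

  // Equal maps must share a hash, whatever the insertion order.
  val equalMapsShareHash: Boolean = a == b && a.## == b.##

  // Use the hash to bucket candidates, then rely on real equality inside each
  // bucket, so a hash collision cannot merge two different maps.
  def distinctMaps(maps: Seq[Map[String, String]]): Seq[Map[String, String]] =
    maps.groupBy(m => m.##).values.flatMap(bucket => bucket.distinct).toSeq

  def main(args: Array[String]): Unit = {
    println(equalMapsShareHash)
    println(distinctMaps(Seq(a, b, Map("font" -> "xyz"))).size)
  }
}
```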

Solution 2

1) If you want distinct values based on specific columns, select those columns and call distinct:

val df = sc.parallelize(Array((1, 2), (3, 4), (1, 6))).toDF("no", "age")


scala> df.show
+---+---+
| no|age|
+---+---+
|  1|  2|
|  3|  4|
|  1|  6|
+---+---+

val distinctValuesDF = df.select(df("no")).distinct

scala> distinctValuesDF.show
+---+
| no|
+---+
|  1|
|  3|
+---+

2) If you want rows that are unique across all columns, use dropDuplicates:

scala> val df = sc.parallelize(Array((1, 2), (3, 4),(3, 4), (1, 6))).toDF("no", "age")

scala> df.show
+---+---+
| no|age|
+---+---+
|  1|  2|
|  3|  4|
|  3|  4|
|  1|  6|
+---+---+


scala> df.dropDuplicates().show()
+---+---+
| no|age|
+---+---+
|  1|  2|
|  3|  4|
|  1|  6|
+---+---+
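
The two cases above differ in which columns survive, and there is a third variant in between: dropDuplicates with a subset of columns. Here is a small sketch that puts the three side by side, assuming a local SparkSession. The object name DistinctVsDropDuplicates and the helper sample are invented for this example; the data matches the second example above.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object DistinctVsDropDuplicates {
  // Hypothetical helper that builds the sample data used above.
  def sample(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq((1, 2), (3, 4), (3, 4), (1, 6)).toDF("no", "age")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("distinct-vs-dropDuplicates")
      .getOrCreate()
    val df = sample(spark)

    // Distinct values of one column: the result contains only "no".
    df.select("no").distinct().show()

    // One full row per distinct "no": every column is kept, but which "age"
    // survives for no = 1 is not guaranteed.
    df.dropDuplicates("no").show()

    // Unique across all columns: removes only the repeated (3, 4) row.
    df.dropDuplicates().show()

    spark.stop()
  }
}
```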

Solution 3

Yes, the syntax is incorrect. It should be:

Dataset<Row> landingDF = sqlContext.sql("SELECT distinct * from df");

Author: Himanshu Yadav

Big data and distributed systems

Updated on July 09, 2022

Comments

  • Himanshu Yadav almost 2 years

    I tried two ways to find distinct rows from parquet, but neither seems to work.
    Attempt 1: Dataset<Row> df = sqlContext.read().parquet("location.parquet").distinct();
    But it throws

    Cannot have map type columns in DataFrame which calls set operations
    (intersect, except, etc.), 
    but the type of column canvasHashes is map<string,string>;;
    

    Attempt 2: I tried running SQL queries:

    Dataset<Row> df = sqlContext.read().parquet("location.parquet");
        rawLandingDS.createOrReplaceTempView("df");
        Dataset<Row> landingDF = sqlContext.sql("SELECT distinct on timestamp * from df");
    

    The error I get:

    == SQL ==
    SELECT distinct on timestamp * from df
    -----------------------------^^^
    

    Is there a way to get distinct records while reading parquet files? Is there any read option I can use?