Spark and SparkSQL: How to imitate window function?


Solution 1

You can do this with RDDs. Personally I find the API for RDDs makes a lot more sense - I don't always want my data to be 'flat' like a dataframe.

val df = sqlContext.sql("select 1, '2015-09-01'"
    ).unionAll(sqlContext.sql("select 2, '2015-09-01'")
    ).unionAll(sqlContext.sql("select 1, '2015-09-03'")
    ).unionAll(sqlContext.sql("select 1, '2015-09-04'")
    ).unionAll(sqlContext.sql("select 2, '2015-09-04'"))

// dataframe as an RDD (of Row objects)
df.rdd 
  // grouping by the first column of the row
  .groupBy(r => r(0)) 
  // map each group - an Iterable[Row] - to a list and sort by the second column
  .map(g => g._2.toList.sortBy(row => row(1).toString))     
  .collect()

The above gives a result like the following:

Array[List[org.apache.spark.sql.Row]] = 
Array(
  List([1,2015-09-01], [1,2015-09-03], [1,2015-09-04]), 
  List([2,2015-09-01], [2,2015-09-04]))

If you want the position within the 'group' as well, you can use zipWithIndex.

df.rdd.groupBy(r => r(0)).map(g => 
    g._2.toList.sortBy(row => row(1).toString).zipWithIndex).collect()

Array[List[(org.apache.spark.sql.Row, Int)]] = Array(
  List(([1,2015-09-01],0), ([1,2015-09-03],1), ([1,2015-09-04],2)),
  List(([2,2015-09-01],0), ([2,2015-09-04],1)))

You could flatten this back to a simple List/Array of Row objects using flatMap, but if you still need to do anything with the 'group', that won't be a great idea.

The downside to using RDDs like this is that it's tedious to convert from DataFrame to RDD and back again.
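
For illustration, here is a minimal sketch of that round trip (my own addition - it assumes the two columns are an integer and a date string, and the names id, date and counter for the rebuilt schema are just placeholders):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, StringType}

// flatten each sorted group back into Rows, carrying the 1-based position as a counter
val counterRDD = df.rdd
  .groupBy(r => r(0))
  .flatMap { case (_, rows) =>
    rows.toList.sortBy(row => row(1).toString).zipWithIndex.map {
      case (row, i) => Row(row(0), row(1), i + 1)
    }
  }

// going back to a DataFrame means spelling out the schema by hand
val schema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("date", StringType),
  StructField("counter", IntegerType)))

val counterDF = sqlContext.createDataFrame(counterRDD, schema)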

Solution 2

You can use HiveContext for local DataFrames as well and, unless you have a very good reason not to, it is probably a good idea anyway. It is the default SQLContext available in the spark-shell and pyspark shells (for now SparkR seems to use a plain SQLContext), and its parser is the one recommended by the Spark SQL and DataFrame Guide.

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.rowNumber

object HiveContextTest {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Hive Context")
    val sc = new SparkContext(conf)
    val sqlContext = new HiveContext(sc)
    import sqlContext.implicits._

    val df = sc.parallelize(
        ("foo", 1) :: ("foo", 2) :: ("bar", 1) :: ("bar", 2) :: Nil
    ).toDF("k", "v")

    // define a window: one partition per key k, rows ordered by v within the partition
    val w = Window.partitionBy($"k").orderBy($"v")
    df.select($"k", $"v", rowNumber.over(w).alias("rn")).show
  }
}
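
Since the HiveContext SQL parser also understands window functions, roughly the same thing can be written as plain SQL (a sketch - tbl is just a hypothetical temp-table name):

df.registerTempTable("tbl")
sqlContext.sql(
  "SELECT k, v, row_number() OVER (PARTITION BY k ORDER BY v) AS rn FROM tbl"
).show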

Solution 3

I totally agree that Window functions for DataFrames are the way to go if you have Spark version >= 1.5. But if you are really stuck with an older version (e.g. 1.4.1), here is a hacky way to solve this:

import org.apache.spark.sql.functions.count

val df = sc.parallelize((1, "2015-09-01") :: (2, "2015-09-01") :: (1, "2015-09-03") :: (1, "2015-09-04") :: (2, "2015-09-04") :: Nil)
           .toDF("id", "date")

// self-join on id: for every (id, date), count how many rows of the same id
// have a date less than or equal to it - that count is the running index
val dfDuplicate = df.selectExpr("id as idDup", "date as dateDup")
val dfWithCounter = df.join(dfDuplicate, $"id" === $"idDup")
                      .where($"dateDup" <= $"date")
                      .groupBy($"id", $"date")
                      .agg($"id", $"date", count($"idDup").as("counter"))
                      .select($"id", $"date", $"counter")

Now if you do dfWithCounter.show you will get:

+---+----------+-------+                                                        
| id|      date|counter|
+---+----------+-------+
|  1|2015-09-01|      1|
|  1|2015-09-04|      3|
|  1|2015-09-03|      2|
|  2|2015-09-01|      1|
|  2|2015-09-04|      2|
+---+----------+-------+

Note that date is not sorted, but the counter is correct. Also, you can reverse the ordering of the counter by changing the <= to >= in the where statement.
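
For example, a quick sketch (my own addition, following the same pattern) of sorting the result for display, and of the reversed counter:

// sort the output for display
dfWithCounter.orderBy($"id", $"date").show

// flipping the comparison counts dates that are greater than or equal instead,
// so the counter decreases as the date increases
val dfWithReverseCounter = df.join(dfDuplicate, $"id" === $"idDup")
                             .where($"dateDup" >= $"date")
                             .groupBy($"id", $"date")
                             .agg($"id", $"date", count($"idDup").as("counter"))
                             .select($"id", $"date", $"counter")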


Comments

  • Martin Senne almost 2 years

    Description

    Given a dataframe df

    id |       date
    ---------------
     1 | 2015-09-01
     2 | 2015-09-01
     1 | 2015-09-03
     1 | 2015-09-04
     2 | 2015-09-04
    

    I want to create a running counter or index,

    • grouped by the same id and
    • sorted by date in that group,

    thus

    id |       date |  counter
    --------------------------
     1 | 2015-09-01 |        1
     1 | 2015-09-03 |        2
     1 | 2015-09-04 |        3
     2 | 2015-09-01 |        1
     2 | 2015-09-04 |        2
    

    This is something I can achieve with window function, e.g.

    val w = Window.partitionBy("id").orderBy("date")
    val resultDF = df.select( df("id"), rowNumber().over(w) )
    

    Unfortunately, Spark 1.4.1 does not support window functions for regular dataframes:

    org.apache.spark.sql.AnalysisException: Could not resolve window function 'row_number'. Note that, using window functions currently requires a HiveContext;
    

    Questions

    • How can I achieve the above computation on current Spark 1.4.1 without using window functions?
    • When will window functions for regular dataframes be supported in Spark?

    Thanks!

  • Martin Senne over 8 years
    Many thanks!!! That was the solution I was looking for. Hmm, I just wasn't "brave" enough to perform regular Scala list operations once the groupBy was done ....
  • halil almost 8 years
    What will happen when my "g._2.toList.sortBy" list has millions of elements? I can't collect them.