SPARK SQL Equivalent of Qualify + Row_number statements


There is no qualify in Spark SQL (it is usually useful to check the parser source), but you can use a subquery like this:

SELECT * FROM (
    SELECT *, row_number() OVER (
        PARTITION BY acct_id ORDER BY statement_date DESC
    ) rank FROM df
) tmp WHERE rank <= 3

See also SPARK : failure: ``union'' expected but `(' found
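
If you prefer to stay in the DataFrame API rather than write the subquery by hand, the same pattern can be expressed with a window specification. The following is only a minimal sketch (Java, Spark 1.4+) using the question's column names (acct_id, statement_date) and an assumed DataFrame df; on Spark 1.4/1.5 the DataFrame must come from a HiveContext, as discussed in the comments below.

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.rowNumber;

// Number rows within each account, newest statement first
WindowSpec byAccount = Window.partitionBy("acct_id")
                             .orderBy(col("statement_date").desc());

// Keep the three most recent statements per account
DataFrame top3 = df.withColumn("rank", rowNumber().over(byAccount))
                   .where(col("rank").leq(3));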

Author by Brian Correro
Updated on February 09, 2022

Comments

  • Brian Correro about 2 years

    Does anyone know the best way for Apache Spark SQL to achieve the same results as the standard SQL qualify() + rnk or row_number statements?

    For example:

    • I have a Spark Dataframe called statement_data with 12 monthly records each for 100 unique account_numbers, therefore 1200 records in total
    • Each monthly record has a field called "statement_date" that can be used for determining the most recent record

    I want my final result to be a new Spark Dataframe with the 3 most recent records (as determined by statement_date descending) for each of the 100 unique account_numbers, therefore 300 final records in total.

    In standard Teradata SQL, I can do the following:

    select * from statement_data
    qualify row_number ()
    over(partition by acct_id order by statement_date desc) <= 3
    

    Apache Spark SQL does not have a standalone qualify function that I'm aware of; maybe I'm getting the syntax wrong, or I can't find documentation showing that qualify exists.

    It is fine if I need to do this in two steps as long as those two steps are:

    • A select query or alternative method to assign rank/row numbering for each account_number's records
    • A select query where I'm selecting all records with rank <= 3 (i.e. choose the 1st, 2nd, and 3rd most recent records); a rough sketch of what I have in mind follows this list.
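
    To make the two steps concrete, here is a rough sketch in Java against Spark SQL 1.x, assuming the usual org.apache.spark.sql imports; the variable names (statementData, hiveContext) and the temporary table names are just placeholders, and the column names follow the example above:

    // Step 1: assign a row number per account, newest statement first
    statementData.registerTempTable("statement_data");
    DataFrame ranked = hiveContext.sql(
        "SELECT *, row_number() OVER (PARTITION BY acct_id ORDER BY statement_date DESC) AS rank_num "
      + "FROM statement_data");

    // Step 2: keep only the three most recent statements per account
    ranked.registerTempTable("ranked_statements");
    DataFrame mostRecent = hiveContext.sql(
        "SELECT * FROM ranked_statements WHERE rank_num <= 3");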

    EDIT 1 - 7/23 2:09pm: The initial solution provided by zero323 was not working for me in Spark 1.4.1 with the Spark SQL 1.4.1 dependency installed.

    EDIT 2 - 7/23 3:24pm: It turns out the error was related to using SQL Context objects for my query instead of Hive Context. I am now able to run the below solution correctly after adding the following code to create and use a Hive Context:

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.hive.HiveContext;
    import org.apache.spark.sql.hive.test.TestHive$;

    final JavaSparkContext sc2;
    final HiveContext hc2;
    DataFrame df;
    // TestHive$.MODULE$ is Spark's test HiveContext singleton; it provides a
    // Hive-backed context so that window functions are available
    hc2 = TestHive$.MODULE$;
    sc2 = new JavaSparkContext(hc2.sparkContext());
    ....
    // Initial Spark/SQL contexts to set up Dataframes
    SparkConf conf = new SparkConf().setAppName("Statement Test");
    ...
    // Rank statements per account and keep the top 3 by current balance
    DataFrame stmtSummary =
        hc2.sql("SELECT * FROM (SELECT acct_id, stmt_end_dt, stmt_curr_bal, row_number() over (partition by acct_id order by stmt_curr_bal DESC) rank_num FROM stmt_data) tmp WHERE rank_num <= 3");
    
  • Brian Correro almost 9 years
    This was not working for me on Spark 1.3.1. I read that window functions (partition by / order by / desc) were added in 1.4.0; I'm working on installing that, and then I will give this a shot.
  • Brian Correro almost 9 years
    See my edits above for additional details on getting window functions to work using a Hive Context. I am accepting this answer, but feel free to add notes about Hive Context vs. SQL Context for clarity.
  • ashK almost 8 years
    But I'm using Cassandra NoSQL.