Conditional aggregation Spark DataFrame


You can write your conditional aggregation using when/otherwise as

// sum only the rows that satisfy the condition matching filterType;
// non-matching rows contribute null, which sum ignores
df2.groupBy("name")
    .agg(sum(
      when(lit(filterType) === "MIN" && $"logDate" < filterDate, $"acc")
        .otherwise(when(lit(filterType) === "MAX" && $"logDate" > filterDate, $"acc"))
    ).as("sum"))
    .filter($"sum".isNotNull) // drop groups where no row matched

which would give you the desired output according to filterType

But

Eventually you will need both aggregated dataframes, so I would suggest dropping the filterType field and simply creating an additional grouping column with the when/otherwise function. That way you get both aggregated values in one dataframe as

df2.withColumn("additionalGrouping", when($"logDate" < filterDate, "less").otherwise("more")) // bucket each row by its side of filterDate
    .groupBy("name", "additionalGrouping").agg(sum($"acc"))
    .drop("additionalGrouping")
    .show(false)

which would output as

+-------+--------+
|name   |sum(acc)|
+-------+--------+
|Diego  |10      |
|Giorgio|60      |
+-------+--------+
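
If you would rather keep one row per name with both sums side by side, the same idea can be written as two conditional sums (just a sketch reusing df2 and filterDate from the question; the column names sumBefore and sumAfter are mine):

// each sum only sees its matching rows: when(...) without otherwise
// yields null for non-matching rows, and sum ignores nulls
df2.groupBy("name")
    .agg(
      sum(when($"logDate" < filterDate, $"acc")).as("sumBefore"),
      sum(when($"logDate" >= filterDate, $"acc")).as("sumAfter")
    )
    .show(false)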

Updated

Since the question was updated with changed logic, here is the idea and a solution for the new scenario

import org.apache.spark.sql.expressions._

// rows are grouped by name and ordered by logDate ascending, so
// first("logDate") over this window is the earliest logDate per name
def windowSpec = Window.partitionBy("name").orderBy($"logDate".asc)

val minDF = df2.withColumn("minLogDate", first("logDate").over(windowSpec))
  .filter($"minLogDate" === $"logDate") // keep only the rows carrying the earliest logDate
  .groupBy("name")
  .agg(sum($"acc").as("sum"))

val finalDF =
  if (filterType == "MIN") {
    minDF
  }
  else if (filterType == "MAX") {
    // sum only the rows with logDate > filterDate;
    // names with no such row end up with a null sum
    val tempMaxDF = df2
      .groupBy("name")
      .agg(sum(when($"logDate" > filterDate, $"acc")).as("sum"))

    // a null sum means no record after filterDate: fall back to the MIN
    // result for those names, then union with the names that satisfied MAX
    tempMaxDF.filter($"sum".isNull).drop("sum").join(minDF, Seq("name"), "left")
      .union(tempMaxDF.filter($"sum".isNotNull))
  }
  else {
    df2
  }

so for filterType = MIN you should have

+-------+---+
|name   |sum|
+-------+---+
|Diego  |30 |
|Giorgio|30 |
+-------+---+

and for filterType = MAX you should have

+-------+---+
|name   |sum|
+-------+---+
|Diego  |30 |
|Giorgio|60 |
+-------+---+

If filterType is neither MAX nor MIN, the original dataframe is returned
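
As a side note, minDF could also be computed without a window function, via a plain aggregation joined back to the data (a sketch; minDates and minDFNoWindow are names I introduce here, and which variant performs better depends on your data):

// per-name minimum logDate, joined back to keep only the earliest rows
val minDates = df2.groupBy("name").agg(min($"logDate").as("minLogDate"))
val minDFNoWindow = df2.join(minDates, "name")
  .filter($"logDate" === $"minLogDate")
  .groupBy("name")
  .agg(sum($"acc").as("sum"))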

I hope the answer is helpful


Author: Giorgio (updated on June 04, 2022)

Comments

  • Giorgio
    Giorgio almost 2 years

    I would like to understand the best way to do an aggregation in Spark in this scenario:

    import java.sql.Timestamp
    import sqlContext.implicits._
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types.TimestampType

    case class Person(name:String, acc:Int, logDate:String)
    val dateFormat = "dd/MM/yyyy"
    val filterType = "MIN" // could be "MIN" or "MAX" depending on a run parameter
    val filterDate = new Timestamp(System.currentTimeMillis)

    val df = sc.parallelize(List(Person("Giorgio",20,"31/12/9999"),
                                 Person("Giorgio",30,"12/10/2009"),
                                 Person("Diego",  10,"12/10/2010"),
                                 Person("Diego",  20,"12/10/2010"),
                                 Person("Diego",  30,"22/11/2011"),
                                 Person("Giorgio",10,"31/12/9999"),
                                 Person("Giorgio",30,"31/12/9999"))).toDF()

    val df2 = df.withColumn("logDate",unix_timestamp($"logDate",dateFormat).cast(TimestampType))

    val df3 = df2.groupBy("name").agg(/*conditional aggregation*/)
    df3.show /*Expected output shown below */

    Basically I want to group all records by the name column and then, based on the filterType parameter, filter the valid records for each Person; after filtering, I want to sum all acc values, obtaining a final DataFrame with the name and totalAcc columns.

    For example:

    • filterType = MIN: I want to take all records having min(logDate) (there could be many of them), so in this case I completely ignore the filterDate param:

      Diego,10,12/10/2010
      Diego,20,12/10/2010
      Giorgio,30,12/10/2009

    Final result expected from aggregation is: (Diego, 30),(Giorgio,30)

    • filterType = MAX: I want to take all records with logDate > filterDate; if for a key I don't have any records respecting this condition, I need to take the records with min(logDate) as done in the MIN scenario, so:

      Diego, 10, 12/10/2010
      Diego, 20, 12/10/2010
      Giorgio, 20, 31/12/9999
      Giorgio, 10, 31/12/9999
      Giorgio, 30, 31/12/9999

    Final result expected from aggregation is: (Diego,30),(Giorgio,60). In this case for Diego I didn't have any records with logDate > logFilter, so I fall back to the MIN scenario, taking for Diego just the records with min logDate.

    • Ramesh Maharjan
      Ramesh Maharjan over 5 years
      What if you have two records for Diego, both smaller than filterDate? Can you clarify what the output would be?
    • Giorgio
      Giorgio over 5 years
      If for Diego I have just records smaller than logDate, for filterType MIN I will take the records with min logDate; if both of them have the min logDate, I'll sum them. For the MAX scenario, if for Diego I don't have any records with logDate > logFilter, then I will look for records with min logDate, exactly as done for the MIN scenario
    • Giorgio
      Giorgio over 5 years
      I updated it again, also for the scenario you mentioned before.
    • Giorgio
      Giorgio over 5 years
      The record of 2011 for Diego must be excluded from both scenarios, because for MIN it doesn't have the min logDate and for the MAX scenario it fails logDate > logFilter
    • Ramesh Maharjan
      Ramesh Maharjan over 5 years
      I have updated my answer below
  • Giorgio
    Giorgio over 5 years
    Why did you choose to implement the solution using a window function? Does it give some kind of performance boost, or something else?
  • Ramesh Maharjan
    Ramesh Maharjan over 5 years
    That's mainly because there was a need for computation within a group while outputting all the rows, and a window function is best for that. Do you have anything else in mind? @Giorgio
  • Giorgio
    Giorgio over 5 years
    I'm quite a newbie with DataFrames; maybe you can help me clarify the following point. I'm thinking in terms of performance, because in production I will face a DataFrame of 30-35 million records, and I was asking myself whether the window and the join, in the MAX filterType case, could be a bottleneck.
  • Ramesh Maharjan
    Ramesh Maharjan over 5 years
    Well, your requirement suggests that you need to identify the minimum within a group, which is why there is a window function; and your second, MAX requirement suggests that you need both the minimum and the max, hence a join. You can always search for better alternatives, but the solution I presented is the best one I know of. If you find a better option, please do let me know by answering; and if not, you are always welcome to upvote and accept the answer. Thanks
  • Giorgio
    Giorgio over 5 years
    I didn't mean to be polemic. I just wanted to share my doubts with you because you seem very expert. I was just asking myself whether, in terms of performance, it would be better to do all computations in one dataframe (if possible, maybe adapting your first solution to the new scenario) or to build more dataframes and join them. Thanks
  • Ramesh Maharjan
    Ramesh Maharjan over 5 years
    The MIN requirement is straightforward and is complete in one dataframe, but your MAX requirement has "I didn't have any records with logDate > logFilter, so I fall back to the MIN scenario", which requires checking through the whole dataframe to see whether a record meets the MAX condition; if it doesn't, you need to apply the MIN condition on the records that don't meet the MAX condition. So, since the MIN logic is already implemented and calculated, why not use it for the MAX condition via a join? That's the logic I used
  • Giorgio
    Giorgio over 5 years
    Last question: for the sake of performance, in order to accomplish my scenario, would it be better to drop down to RDDs or to use DataFrames with window functions as you explained? Regards
  • Ramesh Maharjan
    Ramesh Maharjan over 5 years
    Dataframes are optimized forms of RDDs, optimized for both speed and memory usage, but if you have faster options with RDDs you can always use them. Test and confirm. Thanks
  • jack
    jack over 3 years
    @RameshMaharjan Hi, thanks for the answer. Just one question: if I do the filter first and then the aggregation, unlike this answer, what do you think of the performance difference? What's the benefit of using conditional aggregation inside .agg(sum(when(...? Thanks
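
For reference, a filter-first version of the MAX aggregation (a sketch under the question's assumptions; filteredFirst is a name introduced here) would look like:

// filter first, then aggregate: names with no row after filterDate
// disappear from the output entirely
val filteredFirst = df2.filter($"logDate" > filterDate)
  .groupBy("name")
  .agg(sum($"acc").as("sum"))

Catalyst often compiles the two forms to similar plans, but they are not equivalent here: the conditional aggregation keeps every name in the output (with a null sum when no row matches), which is exactly what the MAX branch above relies on to detect the names that need the MIN fallback; filtering first would silently drop them.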