Conditional aggregation Spark DataFrame
You can write the conditional aggregation using when/otherwise as
df2.groupBy("name").agg(sum(when(lit(filterType) === "MIN" && $"logDate" < filterDate, $"acc").otherwise(when(lit(filterType) === "MAX" && $"logDate" > filterDate, $"acc"))).as("sum"))
.filter($"sum".isNotNull)
which gives the desired output according to filterType.
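The trailing .filter($"sum".isNotNull) matters because a when with no matching branch yields null, sum skips nulls, and a group whose values are all null sums to null. Here is a minimal plain-Scala model of that behavior, using Option in place of SQL null. The names NullSumModel, condAcc and nullableSum are hypothetical and this is not Spark code, only a sketch of the semantics:

```scala
object NullSumModel {
  // Models when(cond, acc) without an otherwise: a value when cond holds, else null (None).
  def condAcc(cond: Boolean, acc: Int): Option[Int] =
    if (cond) Some(acc) else None

  // Models SQL sum: nulls are skipped, and a group containing only nulls sums to null (None).
  def nullableSum(values: Seq[Option[Int]]): Option[Int] = {
    val present = values.flatten
    if (present.isEmpty) None else Some(present.sum)
  }
}
```

A name whose rows all fail the condition therefore ends up with a null sum, which is the row the filter removes.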
However, you will probably need both aggregated results eventually, so I would suggest dropping the filterType field and instead adding an extra grouping column with the when/otherwise function. That way you get both aggregated values in one dataframe:
df2.withColumn("additionalGrouping", when($"logDate" < filterDate, "less").otherwise("more"))
.groupBy("name", "additionalGrouping").agg(sum($"acc"))
.drop("additionalGrouping")
.show(false)
which outputs
+-------+--------+
|name   |sum(acc)|
+-------+--------+
|Diego  |10      |
|Giorgio|60      |
+-------+--------+
Updated
Since the question was updated with changed logic, here is the idea and solution for the new scenario:
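import org.apache.spark.sql.expressions._
// Order each name's rows by logDate so that first("logDate") is that name's earliest date
def windowSpec = Window.partitionBy("name").orderBy($"logDate".asc)
// MIN scenario: keep only the rows at the earliest logDate per name, then sum acc
val minDF = df2.withColumn("minLogDate", first("logDate").over(windowSpec))
  .filter($"minLogDate" === $"logDate")
  .groupBy("name")
  .agg(sum($"acc").as("sum"))
val finalDF =
  if (filterType == "MIN") {
    minDF
  }
  else if (filterType == "MAX") {
    // sum is null for names that have no row after filterDate
    val tempMaxDF = df2
      .groupBy("name")
      .agg(sum(when($"logDate" > filterDate, $"acc")).as("sum"))
    // those names fall back to the MIN result; the others keep their MAX sum
    tempMaxDF.filter($"sum".isNull).drop("sum").join(minDF, Seq("name"), "left")
      .union(tempMaxDF.filter($"sum".isNotNull))
  }
  else {
    df2
  }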
So for filterType = MIN you should have
+-------+---+
|name   |sum|
+-------+---+
|Diego  |30 |
|Giorgio|30 |
+-------+---+
and for filterType = MAX you should have
+-------+---+
|name   |sum|
+-------+---+
|Diego  |30 |
|Giorgio|60 |
+-------+---+
If filterType is neither MAX nor MIN, the original dataframe is returned.
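As a cross-check of the rule itself, here is a minimal plain-Scala sketch (ordinary collections, not Spark) that applies the MIN rule and the MAX rule with its fallback to the sample rows from the question. Rec, FilterRule and the fixed filterDate of 2018-01-01 are assumptions made for illustration; the question uses the current time as filterDate.

```scala
import java.time.LocalDate

// Hypothetical plain-Scala stand-in for one row of df2.
final case class Rec(name: String, acc: Int, logDate: LocalDate)

object FilterRule {
  // MIN rule: sum acc over the rows that share the earliest logDate.
  def minSum(rows: Seq[Rec]): Int = {
    val earliest = rows.map(_.logDate.toEpochDay).min
    rows.filter(_.logDate.toEpochDay == earliest).map(_.acc).sum
  }

  // MAX rule: sum the rows after filterDate; fall back to the MIN rule when none qualify.
  def maxSum(rows: Seq[Rec], filterDate: LocalDate): Int = {
    val later = rows.filter(_.logDate.isAfter(filterDate))
    if (later.nonEmpty) later.map(_.acc).sum else minSum(rows)
  }

  // Group by name and apply the rule selected by filterType.
  def aggregate(rows: Seq[Rec], filterType: String, filterDate: LocalDate): Map[String, Int] =
    rows.groupBy(_.name).map { case (name, group) =>
      val total = filterType match {
        case "MIN" => minSum(group)
        case "MAX" => maxSum(group, filterDate)
        case other => throw new IllegalArgumentException(s"unknown filterType: $other")
      }
      name -> total
    }

  // Sample rows copied from the question.
  val sample: Seq[Rec] = Seq(
    Rec("Giorgio", 20, LocalDate.of(9999, 12, 31)),
    Rec("Giorgio", 30, LocalDate.of(2009, 10, 12)),
    Rec("Diego", 10, LocalDate.of(2010, 10, 12)),
    Rec("Diego", 20, LocalDate.of(2010, 10, 12)),
    Rec("Diego", 30, LocalDate.of(2011, 11, 22)),
    Rec("Giorgio", 10, LocalDate.of(9999, 12, 31)),
    Rec("Giorgio", 30, LocalDate.of(9999, 12, 31))
  )

  // Assumed filterDate, chosen only for this illustration.
  val assumedFilterDate: LocalDate = LocalDate.of(2018, 1, 1)
}
```

With these rows, FilterRule.aggregate(FilterRule.sample, "MIN", FilterRule.assumedFilterDate) gives Diego 30 and Giorgio 30, and the same call with "MAX" gives Diego 30 and Giorgio 60, matching the tables above.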
I hope the answer is helpful
Giorgio
Updated on June 04, 2022
Comments
-
Giorgio almost 2 years
I would like to understand the best way to do an aggregation in Spark in this scenario:
import java.sql.Timestamp
import sqlContext.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.TimestampType
case class Person(name: String, acc: Int, logDate: String)
val dateFormat = "dd/MM/yyyy"
val filterType: String = ??? // "MIN" or "MAX", depending on a run parameter
val filterDate = new Timestamp(System.currentTimeMillis)
val df = sc.parallelize(List(
  Person("Giorgio", 20, "31/12/9999"),
  Person("Giorgio", 30, "12/10/2009"),
  Person("Diego", 10, "12/10/2010"),
  Person("Diego", 20, "12/10/2010"),
  Person("Diego", 30, "22/11/2011"),
  Person("Giorgio", 10, "31/12/9999"),
  Person("Giorgio", 30, "31/12/9999"))).toDF()
val df2 = df.withColumn("logDate", unix_timestamp($"logDate", dateFormat).cast(TimestampType))
val df3 = df2.groupBy("name").agg(/*conditional aggregation*/)
df3.show /* Expected output shown below */
Basically I want to group all records by the name column and then, based on the filterType parameter, filter all valid records for a Person. After filtering, I want to sum all acc values, obtaining a final DataFrame with name and totalAcc columns. For example:
- filterType = MIN: I want to take all records having min(logDate) (there could be many of them), so in this case I completely ignore the filterDate param:
Diego,10,12/10/2010
Diego,20,12/10/2010
Giorgio,30,12/10/2009
The final result expected from the aggregation is: (Diego,30),(Giorgio,30)
- filterType = MAX: I want to take all records with logDate > filterDate. If for a key I don't have any records satisfying this condition, I need to take the records with min(logDate), as done in the MIN scenario, so:
Diego,10,12/10/2010
Diego,20,12/10/2010
Giorgio,20,31/12/9999
Giorgio,10,31/12/9999
Giorgio,30,31/12/9999
The final result expected from the aggregation is: (Diego,30),(Giorgio,60). In this case Diego has no records with logDate > filterDate, so I fall back to the MIN scenario, taking for Diego only the records with the min logDate.
-
Ramesh Maharjan over 5 years
What if you have two records for Diego, both earlier than filterDate? Can you clarify what the output would be?
-
Giorgio over 5 years
If Diego only has records earlier than filterDate, then for filterType MIN I will take the records with the min logDate; if both of them have the min logDate, I'll sum them. For the MAX scenario, if Diego has no records with logDate > filterDate, then I will look for the records with the min logDate, exactly as in the MIN scenario.
-
Giorgio over 5 years
I updated the question again to cover the scenario you mentioned.
-
Giorgio over 5 years
The 2011 record for Diego must be excluded in both scenarios: for MIN it doesn't have the min logDate, and for MAX it fails logDate > filterDate.
-
Ramesh Maharjan over 5 years
I have updated my answer below
-
Giorgio over 5 years
Why did you choose to implement the solution with a window function? Does it give some kind of performance boost, or is there another reason?
-
Ramesh Maharjan over 5 years
That's mainly because the computation has to happen within a group while still outputting all the rows, and a window function is best for that. Do you have anything else in mind? @Giorgio
-
Giorgio over 5 years
I'm quite new to DataFrames, so maybe you can help me clarify the following point. I'm thinking in terms of performance: in production I will be dealing with a DataFrame of 30-35 million records, and I am wondering whether the window and the join (in the MAX filterType case) could become a bottleneck.
-
Ramesh Maharjan over 5 years
Well, your requirement says you need to identify the minimum within a group, hence the window function, and your second (MAX) requirement says you need both the minimum and the max, hence the join. You can always search for better alternatives, but the solution I presented is the best one I know of. If you find a better option, please let me know by posting an answer; if not, you are always welcome to upvote and accept the answer. Thanks.
-
Giorgio over 5 years
I didn't mean to be polemical. I just wanted to share my doubts with you because you seem very experienced. I'm just wondering whether, in terms of performance, it would be better to do all the computations in one DataFrame (if possible, perhaps by adapting your first solution to the new scenario) or to build several DataFrames and join them. Thanks.
-
Ramesh Maharjan over 5 years
The MIN requirement is straightforward and is complete in one dataframe, but your MAX requirement has
I didn't have any records with logDate > logFilter, so I fallback to apply MIN scenario, taking just for Diego all records with min logDate.
which means checking the whole dataframe for names that have no record meeting the MAX condition, and then applying the MIN condition to those names. Since the MIN logic is already implemented and calculated, why not reuse it for the MAX condition through a join? That's the logic I used.
-
Giorgio over 5 years
Last question: for the sake of performance, would it be better for my scenario to switch to RDDs, or to use a DataFrame with a window function as you explained? Regards
-
Ramesh Maharjan over 5 years
DataFrames are an optimized form of RDDs, optimized for both speed and memory usage. But if you have a faster option with RDDs, you can always use that. Test and confirm. Thanks.
-
jack over 3 years
@RameshMaharjan Hi, thanks for the answer. Just one question: if I filter first and then aggregate, like this answer, what do you think the performance difference is? What's the benefit of using conditional aggregation inside .agg(sum(when(...? Thanks