Difference between two rows in Spark dataframe

15,206

Solution 1

You can use Window function if the calculation is fixed as calculating difference between previous months, or calculating between previous two months ... etc. For that you can use lag and lead function with Window.

But for that you need to change the date column as below so that it can be ordered.

+-------+------+--------------+------+
|Column1|Date  |Date_Converted|Amount|
+-------+------+--------------+------+
|A      |1-jul |2017-07-01    |1000  |
|A      |1-june|2017-06-01    |2000  |
|A      |1-May |2017-05-01    |2000  |
|A      |1-dec |2017-12-01    |3000  |
|A      |1-Nov |2017-11-01    |2000  |
|B      |1-jul |2017-07-01    |100   |
|B      |1-june|2017-06-01    |300   |
|B      |1-May |2017-05-01    |400   |
|B      |1-dec |2017-12-01    |300   |
+-------+------+--------------+------+

You can find the difference between previous month and current month by doing

import org.apache.spark.sql.expressions._
val windowSpec = Window.partitionBy("Column1").orderBy("Date_Converted")
import org.apache.spark.sql.functions._
df.withColumn("diff_Amt_With_Prev_Month", $"Amount" - when((lag("Amount", 1).over(windowSpec)).isNull, 0).otherwise(lag("Amount", 1).over(windowSpec)))
   .show(false)

You should have

+-------+------+--------------+------+------------------------+
|Column1|Date  |Date_Converted|Amount|diff_Amt_With_Prev_Month|
+-------+------+--------------+------+------------------------+
|B      |1-May |2017-05-01    |400   |400.0                   |
|B      |1-june|2017-06-01    |300   |-100.0                  |
|B      |1-jul |2017-07-01    |100   |-200.0                  |
|B      |1-dec |2017-12-01    |300   |200.0                   |
|A      |1-May |2017-05-01    |2000  |2000.0                  |
|A      |1-june|2017-06-01    |2000  |0.0                     |
|A      |1-jul |2017-07-01    |1000  |-1000.0                 |
|A      |1-Nov |2017-11-01    |2000  |1000.0                  |
|A      |1-dec |2017-12-01    |3000  |1000.0                  |
+-------+------+--------------+------+------------------------+

You can increase the lagging position for previous two months as

df.withColumn("diff_Amt_With_Prev_two_Month", $"Amount" - when((lag("Amount", 2).over(windowSpec)).isNull, 0).otherwise(lag("Amount", 2).over(windowSpec)))
  .show(false)

which will give you

+-------+------+--------------+------+----------------------------+
|Column1|Date  |Date_Converted|Amount|diff_Amt_With_Prev_two_Month|
+-------+------+--------------+------+----------------------------+
|B      |1-May |2017-05-01    |400   |400.0                       |
|B      |1-june|2017-06-01    |300   |300.0                       |
|B      |1-jul |2017-07-01    |100   |-300.0                      |
|B      |1-dec |2017-12-01    |300   |0.0                         |
|A      |1-May |2017-05-01    |2000  |2000.0                      |
|A      |1-june|2017-06-01    |2000  |2000.0                      |
|A      |1-jul |2017-07-01    |1000  |-1000.0                     |
|A      |1-Nov |2017-11-01    |2000  |0.0                         |
|A      |1-dec |2017-12-01    |3000  |2000.0                      |
+-------+------+--------------+------+----------------------------+

I hope the answer is helpful

Solution 2

Assumming those two dates belong to each group of your table

my imports :

import org.apache.spark.sql.functions.{concat_ws,collect_list,lit}

Perpare the dataframe

scala> val seqRow = Seq(
 | ("A","1- jul",1000),
 | ("A","1-june",2000),
 | ("A","1-May",2000),
 | ("A","1-dec",3000),
 | ("B","1-jul",100),
 | ("B","1-june",300),
 | ("B","1-May",400),
 | ("B","1-dec",300))

seqRow: Seq[(String, String, Int)] = List((A,1- jul,1000), (A,1-june,2000), (A,1-May,2000), (A,1-dec,3000), (B,1-jul,100), (B,1-june,300), (B,1-May,400), (B,1-dec,300))

scala> val input_df = sc.parallelize(seqRow).toDF("column1","date","amount")
input_df: org.apache.spark.sql.DataFrame = [column1: string, date: string ... 1 more field]

Now write a UDF for your case,

scala> def calc_diff = udf((list : Seq[String],startMonth : String,endMonth : String) => {
     |     //get the month and their values
     |     val monthMap = list.map{str =>
     |     val splitText = str.split("\\$")
     |     val month = splitText(0).split("-")(1).trim
     |
     |         (month.toLowerCase,splitText(1).toInt)
     |     }.toMap
     |
     |     val stMnth = monthMap(startMonth)
     |     val endMnth = monthMap(endMonth)
     |     endMnth - stMnth
     |
     | })
calc_diff: org.apache.spark.sql.expressions.UserDefinedFunction

Now, Preparing the output

scala> val (month1 : String,month2 : String) = ("jul","dec")
month1: String = jul
month2: String = dec

scala> val req_df = group_df.withColumn("diff",calc_diff('collect_val,lit(month1.toLowerCase),lit(month2.toLowerCase)))
req_df: org.apache.spark.sql.DataFrame = [column1: string, sum_amount: bigint ... 2 more fields]

scala> val req_df = group_df.withColumn("diff",calc_diff('collect_val,lit(month1.toLowerCase),lit(month2.toLowerCase))).drop('collect_val)
req_df: org.apache.spark.sql.DataFrame = [column1: string, sum_amount: bigint ... 1 more field]

scala> req_df.orderBy('column1).show
+-------+----------+----+
|column1|sum_amount|diff|
+-------+----------+----+
|      A|      8000|2000|
|      B|      1100| 200|
+-------+----------+----+

Hope, this is what you want.

Share:
15,206
chinkrishna
Author by

chinkrishna

Updated on June 18, 2022

Comments

  • chinkrishna
    chinkrishna almost 2 years

    I created a dataframe in Spark, by groupby column1 and date and calculated the amount.

    val table = df1.groupBy($"column1",$"date").sum("amount")
    
    Column1 |Date   |Amount
    A   |1-jul  |1000
    A   |1-june |2000
    A   |1-May  |2000
    A   |1-dec  |3000
    A   |1-Nov  |2000
    B   |1-jul  |100
    B   |1-june |300    
    B   |1-May  |400
    B   |1-dec  |300
    

    Now, I want to add new column, with difference between amount of any two dates from the table.