How to calculate cumulative sum using sqlContext

Solution 1

Not sure if this is what you are looking for, but here are two examples of how to use sqlContext to calculate a cumulative sum.

First, when you want to partition by some category:

from pyspark.sql.types import StructType, StructField, StringType, LongType
from pyspark.sql import SQLContext

# `sc` is assumed to be an existing SparkContext (e.g. from the pyspark shell)
sqlContext = SQLContext(sc)

rdd = sc.parallelize([
    ("Tablet", 6500), 
    ("Tablet", 5500), 
    ("Cell Phone", 6000), 
    ("Cell Phone", 6500), 
    ("Cell Phone", 5500)
    ])

schema = StructType([
    StructField("category", StringType(), False),
    StructField("revenue", LongType(), False)
    ])

df = sqlContext.createDataFrame(rdd, schema)

df.registerTempTable("test_table")

df2 = sqlContext.sql("""
SELECT
    category,
    revenue,
    sum(revenue) OVER (PARTITION BY category ORDER BY revenue) AS cumsum
FROM
    test_table
""")

Output (from df2.collect()):

[Row(category='Tablet', revenue=5500, cumsum=5500),
 Row(category='Tablet', revenue=6500, cumsum=12000),
 Row(category='Cell Phone', revenue=5500, cumsum=5500),
 Row(category='Cell Phone', revenue=6000, cumsum=11500),
 Row(category='Cell Phone', revenue=6500, cumsum=18000)]

Second, when you want a single running total over the whole dataset rather than one per category, change df2 to this:

df2 = sqlContext.sql("""
SELECT
    category,
    revenue,
    sum(revenue) OVER (ORDER BY revenue, category) AS cumsum
FROM
    test_table
""")

Output (from df2.collect()):

[Row(category='Cell Phone', revenue=5500, cumsum=5500),
 Row(category='Tablet', revenue=5500, cumsum=11000),
 Row(category='Cell Phone', revenue=6000, cumsum=17000),
 Row(category='Cell Phone', revenue=6500, cumsum=23500),
 Row(category='Tablet', revenue=6500, cumsum=30000)]

Hope this helps. Using np.cumsum after collecting the data is not very efficient, especially if the dataset is large. Another approach you could explore is to use simple RDD transformations: groupByKey(), then a map that calculates the cumulative sum within each group, as sketched below.
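Here is a minimal sketch of that RDD approach, reusing the rdd of (category, revenue) pairs from above; the helper name and the ascending sort are assumptions, not part of the original answer:

def running_totals(values):
    # Hypothetical helper: turn one group's values into (value, running_total)
    # pairs. Each group is materialized in memory, so every group must fit
    # on a single executor.
    total = 0
    result = []
    for v in sorted(values):  # ascending revenue, as in the SQL example
        total += v
        result.append((v, total))
    return result

cumsum_rdd = rdd.groupByKey().flatMapValues(running_totals)
# e.g. [('Tablet', (5500, 5500)), ('Tablet', (6500, 12000)), ...]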

Solution 2

Here is a simple example:

import pyspark
from pyspark.sql import window, SQLContext
import pyspark.sql.functions as sf


sc = pyspark.SparkContext(appName="test")
sqlcontext = SQLContext(sc)

data = sqlcontext.createDataFrame(
    [("Bob", "M", "Boston", 1, 20),
     ("Cam", "F", "Cambridge", 1, 25),
     ("Lin", "F", "Cambridge", 1, 25),
     ("Cat", "M", "Boston", 1, 20),
     ("Sara", "F", "Cambridge", 1, 15),
     ("Jeff", "M", "Cambridge", 1, 25),
     ("Bean", "M", "Cambridge", 1, 26),
     ("Dave", "M", "Cambridge", 1, 21)],
    ["name", "gender", "city", "donation", "age"])


data.show()

gives this output:

+----+------+---------+--------+---+
|name|gender|     city|donation|age|
+----+------+---------+--------+---+
| Bob|     M|   Boston|       1| 20|
| Cam|     F|Cambridge|       1| 25|
| Lin|     F|Cambridge|       1| 25|
| Cat|     M|   Boston|       1| 20|
|Sara|     F|Cambridge|       1| 15|
|Jeff|     M|Cambridge|       1| 25|
|Bean|     M|Cambridge|       1| 26|
|Dave|     M|Cambridge|       1| 21|
+----+------+---------+--------+---+

Define a window:

win_spec = (window.Window
                  .partitionBy(['gender', 'city'])
                  .orderBy('age')
                  .rowsBetween(window.Window.unboundedPreceding, 0))

# .orderBy('age') gives the running sum a deterministic order (the output
#   below is ordered by age within each partition)
# window.Window.unboundedPreceding -- first row of the group
# .rowsBetween(..., 0) -- 0 refers to the current row; if -2 were specified
#   instead, the frame would start 2 rows before the current row
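As an illustration of that -2 variant (not part of the original answer), a sliding frame covering the current row and the two rows before it would look like this:

sliding_spec = (window.Window
                      .partitionBy(['gender', 'city'])
                      .orderBy('age')
                      .rowsBetween(-2, 0))

# Sum of donations over the current row and up to 2 preceding rows
data.withColumn('last3', sf.sum(data.donation).over(sliding_spec)).show()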

Now, here is a trap:

temp = data.withColumn('cumsum',sum(data.donation).over(win_spec))

which fails with this error:

TypeError                          Traceback (most recent call last)
<ipython-input-9-b467d24b05cd> in <module>()
----> 1 temp = data.withColumn('cumsum',sum(data.donation).over(win_spec))

/Users/mupadhye/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/column.pyc in __iter__(self)
    238 
    239     def __iter__(self):
--> 240         raise TypeError("Column is not iterable")
    241 
    242     # string methods

TypeError: Column is not iterable

This happens because Python's built-in sum was used instead of PySpark's. The fix is to use the sum function from pyspark.sql.functions (imported above as sf):

temp = data.withColumn('CumSumDonation', sf.sum(data.donation).over(win_spec))
temp.show()

will give:

+----+------+---------+--------+---+--------------+
|name|gender|     city|donation|age|CumSumDonation|
+----+------+---------+--------+---+--------------+
|Sara|     F|Cambridge|       1| 15|             1|
| Cam|     F|Cambridge|       1| 25|             2|
| Lin|     F|Cambridge|       1| 25|             3|
| Bob|     M|   Boston|       1| 20|             1|
| Cat|     M|   Boston|       1| 20|             2|
|Dave|     M|Cambridge|       1| 21|             1|
|Jeff|     M|Cambridge|       1| 25|             2|
|Bean|     M|Cambridge|       1| 26|             3|
+----+------+---------+--------+---+--------------+

Solution 3

After landing on this thread while trying to solve a similar problem, I solved my issue using this code. I am not sure whether I am missing part of what the OP asked, but this is a way to sum a SQLContext column:

from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql.context import SQLContext

# Build the configuration before creating the context; settings applied
# after the SparkContext exists have no effect.
conf = SparkConf()
conf.setAppName('Sum SQLContext Column')
conf.set("spark.executor.memory", "2g")

sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")
sqlContext = SQLContext(sc)

def sum_column(table, column):
    sc_table = sqlContext.table(table)
    return sc_table.agg({column: "sum"})

sum_column("db.tablename", "column").show()
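For reference, a more explicit equivalent (a sketch, not from the original answer) that names the result column via pyspark.sql.functions:

import pyspark.sql.functions as sf

def sum_column_explicit(table, column):
    # Same aggregation as above, with an explicit alias on the result column
    return sqlContext.table(table).agg(sf.sum(column).alias("total"))

sum_column_explicit("db.tablename", "column").show()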

Comments

  • Michael
    Michael almost 2 years

    I know we can use window functions in pyspark to calculate a cumulative sum, but Window is only supported in HiveContext, not SQLContext. I need to use SQLContext, as HiveContext cannot be run in multiple processes.

    Is there any efficient way to calculate a cumulative sum using SQLContext? A simple way is to load the data into the driver's memory and use numpy.cumsum, but the drawback is that the data has to fit into memory.

  • Michael
    Michael over 8 years
    Thanks, but your solution works with HiveContext, not SQLContext. Can you print your sqlContext? It should show that it is actually a HiveContext.
  • Daniel de Paula
    Daniel de Paula over 7 years
    Only on Spark 2.0+ can one use window functions with SQLContext. For Spark versions 1.4 through 1.6, it is necessary to use HiveContext.
  • Abhishek Kgsk
    Abhishek Kgsk over 7 years
    No, they were introduced in Spark version 1.4.
  • Daniel de Paula
    Daniel de Paula over 7 years
    They have existed since 1.4, but before Spark 2 it was necessary to use a HiveContext. However, in many distributions the default class of the "sqlContext" instance in both spark-shell and pyspark is, in fact, HiveContext, which can cause confusion: people think it is possible to use window functions with the plain SQLContext. You can refer to this question for more info: stackoverflow.com/questions/36171349/…
  • Mike
    Mike about 6 years
    win_spec is not defined in your example; could you add it? It would be most helpful for understanding your great example.
  • muon
    muon about 6 years
    oops my bad @Mike will try to dig up my codebase ;) fingers crossed