How to calculate cumulative sum using sqlContext
Solution 1
Not sure if this is what you are looking for, but here are two examples of how to use sqlContext to calculate the cumulative sum:
First, when you want to partition it by some category:
from pyspark.sql.types import StructType, StructField, StringType, LongType
from pyspark.sql import SQLContext
rdd = sc.parallelize([
("Tablet", 6500),
("Tablet", 5500),
("Cell Phone", 6000),
("Cell Phone", 6500),
("Cell Phone", 5500)
])
schema = StructType([
StructField("category", StringType(), False),
StructField("revenue", LongType(), False)
])
df = sqlContext.createDataFrame(rdd, schema)
df.registerTempTable("test_table")
df2 = sqlContext.sql("""
SELECT
category,
revenue,
sum(revenue) OVER (PARTITION BY category ORDER BY revenue) as cumsum
FROM
test_table
""")
Output:
[Row(category='Tablet', revenue=5500, cumsum=5500),
Row(category='Tablet', revenue=6500, cumsum=12000),
Row(category='Cell Phone', revenue=5500, cumsum=5500),
Row(category='Cell Phone', revenue=6000, cumsum=11500),
Row(category='Cell Phone', revenue=6500, cumsum=18000)]
Second, when you only want to take the cumsum of one variable, change df2 to this:
df2 = sqlContext.sql("""
SELECT
category,
revenue,
sum(revenue) OVER (ORDER BY revenue, category) as cumsum
FROM
test_table
""")
Output:
[Row(category='Cell Phone', revenue=5500, cumsum=5500),
Row(category='Tablet', revenue=5500, cumsum=11000),
Row(category='Cell Phone', revenue=6000, cumsum=17000),
Row(category='Cell Phone', revenue=6500, cumsum=23500),
Row(category='Tablet', revenue=6500, cumsum=30000)]
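The extra category in ORDER BY revenue, category matters: when a window has an ORDER BY but no explicit frame, the default frame is RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, so rows that tie on the sort key share one running total. The sketch below is a pure-Python model of that behaviour (no Spark needed; range_cumsum is a hypothetical helper, not a Spark API). It reproduces the output above and shows what the totals would look like without the tiebreaker.

```python
# Pure-Python model of SUM(revenue) OVER (ORDER BY ...) with the default
# RANGE frame: each row sums every row whose sort key is <= its own key.
rows = [
    ("Tablet", 6500),
    ("Tablet", 5500),
    ("Cell Phone", 6000),
    ("Cell Phone", 6500),
    ("Cell Phone", 5500),
]

def range_cumsum(rows, key):
    """Hypothetical helper: return (category, revenue, cumsum) tuples."""
    ordered = sorted(rows, key=key)
    return [
        (cat, rev, sum(r for c, r in ordered if key((c, r)) <= key((cat, rev))))
        for cat, rev in ordered
    ]

# ORDER BY revenue, category: keys are unique, so this is a plain running sum.
with_tiebreak = range_cumsum(rows, key=lambda row: (row[1], row[0]))

# ORDER BY revenue only: the two 5500 rows (and the two 6500 rows) are peers
# and therefore receive the same cumulative value.
without_tiebreak = range_cumsum(rows, key=lambda row: row[1])

for row in with_tiebreak:
    print(row)
```

With the tiebreaker the totals are 5500, 11000, 17000, 23500, 30000; without it, both 5500 rows get 11000 and both 6500 rows get 30000.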
Hope this helps. Collecting the data and then using np.cumsum is not very efficient, especially if the dataset is large. Another way you could explore is to use simple RDD transformations such as groupByKey(), then use map to calculate the cumulative sum within each group, and then reduce at the end.
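The RDD route can be sketched roughly as follows. This is only one possible shape, not the answerer's code: cumsum_within_group and grouped_cumsum are hypothetical names, the sketch uses flatMap rather than map plus reduce, and it assumes a pair RDD of (key, number) tuples such as the rdd built earlier.

```python
from itertools import accumulate

def cumsum_within_group(values):
    """Sort one group's values and pair each with its running total."""
    ordered = sorted(values)
    return list(zip(ordered, accumulate(ordered)))

def grouped_cumsum(pair_rdd):
    """Assumes pair_rdd is an RDD of (key, number) pairs.

    groupByKey() gathers every value for a key in one place, so each
    individual group (not the whole dataset) must fit in memory.
    """
    return pair_rdd.groupByKey().flatMap(
        lambda kv: [(kv[0], value, total)
                    for value, total in cumsum_within_group(kv[1])]
    )

# The per-group step can be checked without a cluster:
print(cumsum_within_group([6500, 5500]))
```

Calling grouped_cumsum(rdd).collect() on a live SparkContext would then return (category, revenue, cumsum) tuples, one per input row.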
Solution 2
Here is a simple example:
import pyspark
from pyspark.sql import window
import pyspark.sql.functions as sf
sc = pyspark.SparkContext(appName="test")
sqlcontext = pyspark.SQLContext(sc)
data = sqlcontext.createDataFrame([("Bob", "M", "Boston", 1, 20),
("Cam", "F", "Cambridge", 1, 25),
("Lin", "F", "Cambridge", 1, 25),
("Cat", "M", "Boston", 1, 20),
("Sara", "F", "Cambridge", 1, 15),
("Jeff", "M", "Cambridge", 1, 25),
("Bean", "M", "Cambridge", 1, 26),
("Dave", "M", "Cambridge", 1, 21),],
["name", 'gender', "city", 'donation', "age"])
data.show()
gives output
+----+------+---------+--------+---+
|name|gender|     city|donation|age|
+----+------+---------+--------+---+
| Bob|     M|   Boston|       1| 20|
| Cam|     F|Cambridge|       1| 25|
| Lin|     F|Cambridge|       1| 25|
| Cat|     M|   Boston|       1| 20|
|Sara|     F|Cambridge|       1| 15|
|Jeff|     M|Cambridge|       1| 25|
|Bean|     M|Cambridge|       1| 26|
|Dave|     M|Cambridge|       1| 21|
+----+------+---------+--------+---+
Define a window
win_spec = (window.Window
.partitionBy(['gender', 'city'])
.rowsBetween(window.Window.unboundedPreceding, 0))
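# window.Window.unboundedPreceding -- first row of the group
# .rowsBetween(..., 0) -- 0 refers to the current row; if -2 is specified instead, then up to 2 rows before the current row
# Note: win_spec has no orderBy, so the order in which rows accumulate within a group is not guaranteed; add .orderBy(...) for a deterministic result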
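To make the frame offsets concrete, here is a small pure-Python model of a row-based frame. It is an illustration only: rows_between_sum is a hypothetical helper, and None stands in for Window.unboundedPreceding.

```python
def rows_between_sum(values, start, end=0):
    """Sum values over a frame of row offsets relative to each row.

    start/end are offsets in the style of rowsBetween: 0 is the current
    row, -2 is two rows before it, None stands in for unboundedPreceding.
    """
    result = []
    for i in range(len(values)):
        lo = 0 if start is None else max(0, i + start)
        hi = min(len(values), i + end + 1)
        result.append(sum(values[lo:hi]))
    return result

donations = [1, 1, 1, 1, 1]
cumulative = rows_between_sum(donations, None)  # frame: first row .. current row
rolling = rows_between_sum(donations, -2)       # frame: up to 2 rows back .. current row
print(cumulative)
print(rolling)
```

The first call behaves like a cumulative sum ([1, 2, 3, 4, 5]); the second is a rolling sum over at most three rows ([1, 2, 3, 3, 3]).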
Now, here is a trap:
temp = data.withColumn('cumsum',sum(data.donation).over(win_spec))
which fails with this error:
TypeError                                 Traceback (most recent call last)
<ipython-input-9-b467d24b05cd> in <module>()
----> 1 temp = data.withColumn('cumsum',sum(data.donation).over(win_spec))
/Users/mupadhye/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/column.pyc in __iter__(self)
238
239 def __iter__(self):
--> 240 raise TypeError("Column is not iterable")
241
242 # string methods
TypeError: Column is not iterable
This is due to using Python's sum function instead of PySpark's. The way to fix this is to use the sum function from pyspark.sql.functions:
temp = data.withColumn('CumSumDonation', sf.sum(data.donation).over(win_spec))
temp.show()
will give:
+----+------+---------+--------+---+--------------+
|name|gender|     city|donation|age|CumSumDonation|
+----+------+---------+--------+---+--------------+
|Sara|     F|Cambridge|       1| 15|             1|
| Cam|     F|Cambridge|       1| 25|             2|
| Lin|     F|Cambridge|       1| 25|             3|
| Bob|     M|   Boston|       1| 20|             1|
| Cat|     M|   Boston|       1| 20|             2|
|Dave|     M|Cambridge|       1| 21|             1|
|Jeff|     M|Cambridge|       1| 25|             2|
|Bean|     M|Cambridge|       1| 26|             3|
+----+------+---------+--------+---+--------------+
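The "Column is not iterable" trap comes from how Python's built-in sum works: it iterates over its argument, and the traceback shows that the column's __iter__ raises TypeError. The sketch below reproduces the mechanism without Spark, using FakeColumn as a hypothetical stand-in for the real Column class.

```python
class FakeColumn:
    """Hypothetical stand-in for a Spark column (not the real class)."""

    def __iter__(self):
        # Mirrors the behaviour visible in the traceback.
        raise TypeError("Column is not iterable")

try:
    sum(FakeColumn())  # built-in sum tries to iterate its argument
    message = None
except TypeError as exc:
    message = str(exc)

print(message)
```

Importing pyspark.sql.functions under an alias, as the answer does with sf, keeps the built-in sum and the Spark aggregate clearly separate.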
Solution 3
After landing on this thread trying to solve a similar problem, I solved my issue using this code. Not sure if I'm missing part of the OP, but this is a way to sum a SQLContext column:
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql.context import SQLContext
conf = SparkConf()
conf.setAppName('Sum SQLContext Column')
conf.set("spark.executor.memory", "2g")
# Pass the configuration when creating the context; a conf built afterwards is never used
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")
sqlContext = SQLContext(sc)
def sum_column(table, column):
sc_table = sqlContext.table(table)
return sc_table.agg({column: "sum"})
sum_column("db.tablename", "column").show()
Michael
Updated on June 25, 2022
Comments
-
Michael almost 2 years
I know we can use a Window function in PySpark to calculate a cumulative sum. But Window is only supported in HiveContext and not in SQLContext. I need to use SQLContext because HiveContext cannot be run in multiple processes.
Is there an efficient way to calculate a cumulative sum using SQLContext? A simple way is to load the data into the driver's memory and use numpy.cumsum, but the drawback is that the data must fit into memory.
-
Michael over 8 years
Thanks, but your solution works on hiveContext, not sqlContext. Can you output your sqlContext? It should show that it is a hiveContext.
-
Daniel de Paula over 7 years
Window functions can be used with SQLContext only on Spark 2.0+. For Spark versions 1.4 to 1.6, it is necessary to use HiveContext.
-
Abhishek Kgsk over 7 years
No, they were introduced in Spark version 1.4.
-
Daniel de Paula over 7 years
They have existed since 1.4, but before Spark 2 it was necessary to use a HiveContext. However, in many distributions, the default class for the instance of "sqlContext" in both spark-shell and pyspark is, in fact, HiveContext, so this may cause some confusion, where people think it is possible to use window functions with the normal SQLContext. You can refer to this question for more info: stackoverflow.com/questions/36171349/…
-
Mike about 6 years
win_spec is not defined in your example, could you add it? It would be most helpful for understanding your great example.
-
muon about 6 years
Oops, my bad @Mike, will try to dig up my codebase ;) fingers crossed