Add column sum as new column in PySpark dataframe


Solution 1

This was not obvious. The Spark DataFrames API does not define a row-wise sum across columns.

Version 2

This can be done in a fairly simple way:

newdf = df.withColumn('total', sum(df[col] for col in df.columns))

df.columns is supplied by PySpark as a list of strings giving all of the column names in the Spark DataFrame. For a different sum, you can supply any other list of column names instead.
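
For example, a minimal sketch that sums only a subset of columns (the names 'a' and 'b' here are just illustrative):

subset_cols = ['a', 'b']  # hypothetical column names
newdf = df.withColumn('ab_total', sum(df[col] for col in subset_cols))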

I did not try this as my first solution because I wasn't certain how it would behave. But it works.

Version 1

This is overly complicated, but works as well.

You can do this:

  1. use df.columns to get a list of the names of the columns
  2. use that names list to make a list of the columns
  3. pass that list to something that will invoke the column's overloaded add function in a fold-type functional manner

With Python's reduce, some knowledge of how operator overloading works, and the pyspark Column code, that becomes:

from functools import reduce  # in Python 3, reduce lives in functools

def column_add(a, b):
    return a.__add__(b)

newdf = df.withColumn('total_col',
        reduce(column_add, (df[col] for col in df.columns)))

Note this is a Python reduce, not a Spark RDD reduce, and the parentheses around the second argument to reduce are required because it is a generator expression.

Tested, Works!

$ pyspark
>>> df = sc.parallelize([{'a': 1, 'b':2, 'c':3}, {'a':8, 'b':5, 'c':6}, {'a':3, 'b':1, 'c':0}]).toDF().cache()
>>> df
DataFrame[a: bigint, b: bigint, c: bigint]
>>> df.columns
['a', 'b', 'c']
>>> def column_add(a,b):
...     return a.__add__(b)
...
>>> df.withColumn('total', reduce(column_add, ( df[col] for col in df.columns ) )).collect()
[Row(a=1, b=2, c=3, total=6), Row(a=8, b=5, c=6, total=19), Row(a=3, b=1, c=0, total=4)]
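
As a side note, operator.add from the standard library can stand in for the explicit __add__ wrapper; a minimal equivalent sketch, assuming the same df as above:

import operator
from functools import reduce

# operator.add(a, b) evaluates a + b, which calls Column.__add__
newdf = df.withColumn('total_col', reduce(operator.add, (df[col] for col in df.columns)))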

Solution 2

The most straightforward way of doing it is to use the expr function:

# Importing just expr avoids shadowing Python's builtin sum (see Solution 3 below)
from pyspark.sql.functions import expr

data = data.withColumn('total', expr("col1 + col2 + col3 + col4"))
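
If the column names are not known ahead of time, the same expression string can be built from df.columns (Solution 4 below expands on this); a quick sketch, assuming every column is numeric:

from pyspark.sql.functions import expr

# Build "col1 + col2 + ..." from whatever columns the DataFrame happens to have
data = data.withColumn('total', expr(' + '.join(data.columns)))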

Solution 3

The solution

newdf = df.withColumn('total', sum(df[col] for col in df.columns))

posted by @Paul works. Nevertheless I was getting the error that many others have seen:

TypeError: 'Column' object is not callable

After some time I found the problem (at least in my case). The problem is that I previously imported some pyspark functions with the line

from pyspark.sql.functions import udf, col, count, sum, when, avg, mean, min

so that line imported the PySpark sum function, while df.withColumn('total', sum(df[col] for col in df.columns)) is supposed to use the normal Python built-in sum.

You can delete the reference to the PySpark function with del sum.

Otherwise in my case I changed the import to

import pyspark.sql.functions as F

and then referenced the functions as F.sum.
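
A minimal sketch of how the two sums then coexist after the namespaced import, with Python's builtin sum doing the row-wise addition and F.sum doing an ordinary column aggregation (column names as in the earlier examples):

import pyspark.sql.functions as F

# Row-wise total: Python's builtin sum folds the Column objects with +
df = df.withColumn('total', sum(df[col] for col in df.columns))

# Column aggregate: PySpark's sum, referenced unambiguously as F.sum
df.agg(F.sum('total')).show()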

Solution 4

Summing multiple columns from a list into one column

PySpark's sum function is an aggregate and doesn't add columns row-wise. That can instead be achieved using the expr function.

from pyspark.sql.functions import expr

cols_list = ['a', 'b', 'c']

# Creating an addition expression using `join`
expression = '+'.join(cols_list)

df = df.withColumn('sum_cols', expr(expression))

This gives us the desired sum of columns.
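
If any of the column names contain spaces or other special characters, wrapping each name in backticks keeps the generated expression valid Spark SQL; a small sketch under that assumption (the names are hypothetical):

from pyspark.sql.functions import expr

cols_list = ['col a', 'col b', 'col c']  # hypothetical names containing spaces
expression = ' + '.join('`{}`'.format(c) for c in cols_list)  # "`col a` + `col b` + `col c`"
df = df.withColumn('sum_cols', expr(expression))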

Solution 5

My problem was similar to the above (a bit more complex), as I had to add consecutive column sums as new columns in a PySpark dataframe. This approach uses code from Paul's Version 2 above:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('addColAsCumulativeSUM').getOrCreate()
df = spark.createDataFrame(
    data=[(1, 2, 3), (4, 5, 6), (3, 2, 1),
          (6, 1, -4), (0, 2, -2), (6, 4, 1),
          (4, 5, 2), (5, -3, -5), (6, 4, -1)],
    schema=['x1', 'x2', 'x3'])
df.show()

+---+---+---+
| x1| x2| x3|
+---+---+---+
|  1|  2|  3|
|  4|  5|  6|
|  3|  2|  1|
|  6|  1| -4|
|  0|  2| -2|
|  6|  4|  1|
|  4|  5|  2|
|  5| -3| -5|
|  6|  4| -1|
+---+---+---+

colnames = df.columns

Add new columns that are cumulative sums (consecutive):

for i in range(len(colnames)):
    colnameLst = colnames[0:i+1]
    colname = 'cm' + str(i+1)
    df = df.withColumn(colname, sum(df[col] for col in colnameLst))

df.show()

+---+---+---+---+---+---+
| x1| x2| x3|cm1|cm2|cm3|
+---+---+---+---+---+---+
|  1|  2|  3|  1|  3|  6|
|  4|  5|  6|  4|  9| 15|
|  3|  2|  1|  3|  5|  6|
|  6|  1| -4|  6|  7|  3|
|  0|  2| -2|  0|  2|  0|
|  6|  4|  1|  6| 10| 11|
|  4|  5|  2|  4|  9| 11|
|  5| -3| -5|  5|  2| -3|
|  6|  4| -1|  6| 10|  9|
+---+---+---+---+---+---+

The 'cumulative sum' columns added are as follows:

cm1 = x1
cm2 = x1 + x2
cm3 = x1 + x2 + x3
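
Equivalently, the same consecutive sums can be added in a single select instead of a loop; a minimal sketch, assuming the original three-column df and the colnames list from above:

import operator
from functools import reduce

df = df.select(
    '*',
    *[reduce(operator.add, [df[c] for c in colnames[:i + 1]]).alias('cm' + str(i + 1))
      for i in range(len(colnames))]
)
df.show()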

Author: plam

Updated on July 09, 2022

Comments

  • plam
    plam almost 2 years

    I'm using PySpark and I have a Spark dataframe with a bunch of numeric columns. I want to add a column that is the sum of all the other columns.

    Suppose my dataframe had columns "a", "b", and "c". I know I can do this:

    df.withColumn('total_col', df.a + df.b + df.c)
    

    The problem is that I don't want to type out each column individually and add them, especially if I have a lot of columns. I want to be able to do this automatically or by specifying a list of column names that I want to add. Is there another way to do this?

    • Paul
      Paul over 8 years
      This is much easier with RDDs than dataframes e.g. if data is an array representing a row, then you can do RDD.map(lambda data: (data, sum(data))). The main reason this is more difficult with a spark dataframe is figuring out what is allowed as a column expression in withColumn. It doesn't seem to be very well documented.
    • Abhinav Sood
      Abhinav Sood over 6 years
      This doesn't seem to work either (PySpark 1.6.3): dftest.withColumn("times", sum((dftest[c] > 2).cast("int") for c in dftest.columns[1:])), and neither does dftest.select('a', 'b', 'c', 'd').rdd.map(lambda x: (x, sum(x))).take(2).
  • Paul
    Paul over 8 years
    @Salmonerd Thanks. It helps sometimes to remember the spark dataframe class is immutable, and so to make any changes in the data you have to call something that returns a new dataframe.
  • Hemant
    Hemant almost 8 years
    Version 2 is not working with Spark 1.5.0 and CDH-5.5.2 and Python version 3.4. It is throwing an error: "AttributeError: 'generator' object has no attribute '_get_object_id'"
  • FairyOnIce
    FairyOnIce over 7 years
    Both of your solutions are nice and neat. I am wondering why you did not use user defined functions for this?
  • Elsa Li
    Elsa Li almost 6 years
    @Paul I used the VERSION 2 method to add up several columns with "bigint" type. Somehow, I got this error: 'generator' object has no attribute '_get_object_id'. Do you know why this would happen? Thank you!
  • Paul
    Paul almost 6 years
    @QianLi No idea.
  • Paul
    Paul almost 6 years
    @QianLi try writing up a reproducible example and post a new question
  • Rahul Chawla
    Rahul Chawla almost 6 years
    @QianLi were you able to solve the error: 'generator' object has no attribute '_get_object_id' error?
  • Elsa Li
    Elsa Li almost 6 years
    @RahulChawla Unfortunately, this error is not solved yet. Do you have any suggestions?
  • Rahul Chawla
    Rahul Chawla almost 6 years
    @QianLi still looking for it.
  • Augmented Jacob
    Augmented Jacob over 5 years
    Version 2 doesn't work. Throws a TypeError: 'Column' object is not callable
  • Vincent Chalmel
    Vincent Chalmel over 5 years
    Version 1 doesn't work for me, get an error "Column is not iterable"
  • Paul
    Paul over 5 years
    I suggest posting a new question if you are having trouble. Obviously there have been newer versions of spark since this was posted in 2015, back when it did solve the OPs problem.
  • Paul
    Paul over 5 years
    @VincentChalmel No errors in version 1, after initializing pyspark.SparkContext and pyspark.sql.SparkSession to make sure .toDF() is available on RDD, and also importing reduce from functools. This is with pyspark 2.3.1. To duplicate my jupyter+spark environment, use this docker command: docker run -it -p 8888:8888 jupyter/all-spark-notebook:latest
  • Paul
    Paul over 5 years
    @AugmentedJacob No idea why you're getting that. Confirmed still works for me in 2.3.1. See comment above for docker command to duplicate environment.
  • absolutelydevastated
    absolutelydevastated over 5 years
    @Paul: Mine throws the error AssertionError: col should be Column (Spark 2.2)
  • Vamshidhar H.K.
    Vamshidhar H.K. about 5 years
    Thank you @Francesco. I got this same issue and resolved it using 'del sum' as you mentioned.
  • Francesco Boi
    Francesco Boi about 5 years
    Glad I am not alone in this mistake