Divide PySpark DataFrame Column by Column in Other PySpark DataFrame When ID Matches

Solution 1

Code:

import pyspark.sql.functions as F

# Join df1 with the per-customer sums in df2, compute the ratio,
# and drop the helper column afterwards.
df1 = df1\
    .join(df2, "CustomerID")\
    .withColumn("NormalizedCustomerValue", (F.col("CustomerValue") / F.col("CustomerValueSum")))\
    .drop("CustomerValueSum")

Output:

df1.show()

+----------+-------------+-----------------------+
|CustomerID|CustomerValue|NormalizedCustomerValue|
+----------+-------------+-----------------------+
|        17|          0.5|     0.5813953488372093|
|        17|         0.01|   0.011627906976744186|
|        17|         0.35|     0.4069767441860465|
|        12|         0.17|                    1.0|
|        14|         0.15|    0.37499999999999994|
|        14|         0.25|                  0.625|
+----------+-------------+-----------------------+
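If df2 does not already exist, it can be derived from df1 with a groupBy aggregation before the join. A minimal sketch, assuming the column name CustomerValueSum from the question:

import pyspark.sql.functions as F

# Aggregate df1 into one row per customer with the summed CustomerValue
df2 = df1.groupBy("CustomerID").agg(
    F.sum("CustomerValue").alias("CustomerValueSum")
)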

Solution 2

This can also be achieved with a Spark window function, which means you do not need to create a separate dataframe with the aggregated values (df2):

Creating the data for the input dataframe:

from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)  # sc: an existing SparkContext

data = [(12, 0.17), (14, 0.15), (14, 0.25), (17, 0.5), (17, 0.01), (17, 0.35)]
df1 = sqlContext.createDataFrame(data, ['CustomerID', 'CustomerValue'])
df1.show()
+----------+-------------+
|CustomerID|CustomerValue|
+----------+-------------+
|        12|         0.17|
|        14|         0.15|
|        14|         0.25|
|        17|          0.5|
|        17|         0.01|
|        17|         0.35|
+----------+-------------+
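On Spark 2.x and later, HiveContext is deprecated in favour of SparkSession; the same input dataframe can be built this way (a minimal sketch, assuming a session is created or reused locally):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

data = [(12, 0.17), (14, 0.15), (14, 0.25), (17, 0.5), (17, 0.01), (17, 0.35)]
df1 = spark.createDataFrame(data, ['CustomerID', 'CustomerValue'])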

Defining a Window partitioned by CustomerID:

from pyspark.sql import Window
from pyspark.sql.functions import sum  # note: shadows Python's built-in sum

# Partition by CustomerID so the sum is computed per customer
w = Window.partitionBy('CustomerID')

df2 = df1.withColumn(
    'NormalizedCustomerValue',
    df1.CustomerValue / sum(df1.CustomerValue).over(w)
).orderBy('CustomerID')

df2.show()
+----------+-------------+-----------------------+
|CustomerID|CustomerValue|NormalizedCustomerValue|
+----------+-------------+-----------------------+
|        12|         0.17|                    1.0|
|        14|         0.15|    0.37499999999999994|
|        14|         0.25|                  0.625|
|        17|          0.5|     0.5813953488372093|
|        17|         0.01|   0.011627906976744186|
|        17|         0.35|     0.4069767441860465|
+----------+-------------+-----------------------+
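As a side note, importing sum from pyspark.sql.functions shadows Python's built-in sum. An equivalent version that avoids this by using the module alias from Solution 1 (a minimal sketch over the same df1):

import pyspark.sql.functions as F
from pyspark.sql import Window

w = Window.partitionBy('CustomerID')

# F.sum(...).over(w) computes the per-customer total without a separate aggregation step
df2 = df1.withColumn(
    'NormalizedCustomerValue',
    F.col('CustomerValue') / F.sum('CustomerValue').over(w)
).orderBy('CustomerID')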

Question (TrentWoodbury):

    I have a PySpark DataFrame, df1, that looks like:

    CustomerID  CustomerValue
    12          .17
    14          .15
    14          .25
    17          .50
    17          .01
    17          .35
    

    I have a second PySpark DataFrame, df2, that is df1 grouped by CustomerID and aggregated by the sum function. It looks like this:

     CustomerID  CustomerValueSum
     12          .17
     14          .40
     17          .86
    

    I want to add a third column to df1 that is df1['CustomerValue'] divided by df2['CustomerValueSum'] for the same CustomerIDs. This would look like:

    CustomerID  CustomerValue  NormalizedCustomerValue
    12          .17            1.00
    14          .15            .38
    14          .25            .62
    17          .50            .58
    17          .01            .01
    17          .35            .41
    

    In other words, I'm trying to convert this Python/Pandas code to PySpark:

    normalized_list = []
    for idx, row in df1.iterrows():
        (
            normalized_list
            .append(
                row.CustomerValue / df2[df2.CustomerID == row.CustomerID].CustomerValueSum
            )
        )
    df1['NormalizedCustomerValue'] = [val.values[0] for val in normalized_list]
    

    How can I do this?