Divide PySpark DataFrame Column by a Column in Another PySpark DataFrame When IDs Match
Solution 1
Code:
import pyspark.sql.functions as F

# Join df1 to df2 on CustomerID, divide each CustomerValue by the matching
# CustomerValueSum, then drop the helper column.
df1 = df1\
    .join(df2, "CustomerID")\
    .withColumn("NormalizedCustomerValue", F.col("CustomerValue") / F.col("CustomerValueSum"))\
    .drop("CustomerValueSum")
Output:
df1.show()
+----------+-------------+-----------------------+
|CustomerID|CustomerValue|NormalizedCustomerValue|
+----------+-------------+-----------------------+
| 17| 0.5| 0.5813953488372093|
| 17| 0.01| 0.011627906976744186|
| 17| 0.35| 0.4069767441860465|
| 12| 0.17| 1.0|
| 14| 0.15| 0.37499999999999994|
| 14| 0.25| 0.625|
+----------+-------------+-----------------------+
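For reference, df2 with the per-customer sums (described in the question below) can be built from df1 with a groupBy aggregation; a minimal sketch reusing the F alias from the code above:

# One row per CustomerID with the summed CustomerValue; the alias matches
# the CustomerValueSum column used in the join.
df2 = df1.groupBy("CustomerID").agg(F.sum("CustomerValue").alias("CustomerValueSum"))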
Solution 2
This can also be achieved with a Spark window function, so you do not need to create a separate DataFrame with the aggregated values (df2):
Creating the data for the input dataframe:
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)  # pre-Spark-2.0 entry point; a SparkSession works equally well on Spark 2+

data = [(12, 0.17), (14, 0.15), (14, 0.25), (17, 0.5), (17, 0.01), (17, 0.35)]
df1 = sqlContext.createDataFrame(data, ['CustomerID', 'CustomerValue'])
df1.show()
+----------+-------------+
|CustomerID|CustomerValue|
+----------+-------------+
| 12| 0.17|
| 14| 0.15|
| 14| 0.25|
| 17| 0.5|
| 17| 0.01|
| 17| 0.35|
+----------+-------------+
Defining a Window partitioned by CustomerID:
from pyspark.sql import Window
import pyspark.sql.functions as F  # avoids shadowing Python's built-in sum

# Partition by CustomerID so the sum is computed per customer.
w = Window.partitionBy('CustomerID')
df2 = df1.withColumn('NormalizedCustomerValue',
                     df1.CustomerValue / F.sum(df1.CustomerValue).over(w)).orderBy('CustomerID')
df2.show()
+----------+-------------+-----------------------+
|CustomerID|CustomerValue|NormalizedCustomerValue|
+----------+-------------+-----------------------+
| 12| 0.17| 1.0|
| 14| 0.15| 0.37499999999999994|
| 14| 0.25| 0.625|
| 17| 0.5| 0.5813953488372093|
| 17| 0.01| 0.011627906976744186|
| 17| 0.35| 0.4069767441860465|
+----------+-------------+-----------------------+
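If two-decimal output similar to the question's expected result is preferred, the ratio can be rounded afterwards; a small sketch, assuming the F alias used above (floating-point error may shift the last digit slightly):

# Round the normalized value to two decimal places.
df2 = df2.withColumn('NormalizedCustomerValue', F.round('NormalizedCustomerValue', 2))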
Author by
TrentWoodbury
Updated on April 08, 2020
Comments
- TrentWoodbury, about 4 years ago
I have a PySpark DataFrame, df1, that looks like:
CustomerID  CustomerValue
        12            .17
        14            .15
        14            .25
        17            .50
        17            .01
        17            .35
I have a second PySpark DataFrame, df2, that is df1 grouped by CustomerID and aggregated by the sum function. It looks like this:
CustomerID  CustomerValueSum
        12               .17
        14               .40
        17               .86
I want to add a third column to df1 that is df1['CustomerValue'] divided by df2['CustomerValueSum'] for the same CustomerIDs. This would look like:
CustomerID  CustomerValue  NormalizedCustomerValue
        12            .17                     1.00
        14            .15                      .38
        14            .25                      .62
        17            .50                      .58
        17            .01                      .01
        17            .35                      .41
In other words, I'm trying to convert this Python/Pandas code to PySpark:
normalized_list = []
for idx, row in df1.iterrows():
    normalized_list.append(
        row.CustomerValue / df2[df2.CustomerID == row.CustomerID].CustomerValueSum
    )
df1['NormalizedCustomerValue'] = [val.values[0] for val in normalized_list]
How can I do this?