Apply a transformation to multiple columns of a PySpark DataFrame


I found an answer on this Medium post.

First make a column for the row total (as in the question below), then use the * operator to unpack a list comprehension over the labels inside select():

import pyspark.sql.functions as f

labels = ['color', 'feeling']

# Row-wise total: Python's builtin sum() adds the Column objects together
sample_df = sample_df.withColumn('total', sum([f.col(x) for x in labels]))

# Unpack a list comprehension of Column expressions into select()
sample_df.select(
    'word',
    *[(f.col(col_name) / f.col('total')).alias(col_name) for col_name in labels]
).show()
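This produces the normalized counts (matching the desired output shown in the question below):

+-----+-----+-------+
| word|color|feeling|
+-----+-----+-------+
|  red|  1.0|    0.0|
|happy|  0.0|    1.0|
| blue|  0.5|    0.5|
+-----+-----+-------+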

The linked post shows how to generalize this approach to arbitrary transformations.
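For instance, a minimal sketch of one such generalization, with the per-column expression factored into a helper (the transform function here is a hypothetical stand-in, not taken from the linked post):

import pyspark.sql.functions as f

labels = ['color', 'feeling']

def transform(col_name):
    # Any Column expression can go here; it may reference other columns of the same row.
    return (f.col(col_name) / f.col('total')).alias(col_name)

sample_df.withColumn('total', sum([f.col(x) for x in labels])) \
    .select('word', *[transform(c) for c in labels]) \
    .show()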

Author: pault

Updated on June 24, 2022

Comments

  • pault (almost 2 years ago)

    Suppose I have the following spark-dataframe:

    +-----+-------+
    | word|  label|
    +-----+-------+
    |  red|  color|
    |  red|  color|
    | blue|  color|
    | blue|feeling|
    |happy|feeling|
    +-----+-------+
    

    Which can be created using the following code:

    sample_df = spark.createDataFrame([
            ('red', 'color'),
            ('red', 'color'),
            ('blue', 'color'),
            ('blue', 'feeling'),
            ('happy', 'feeling')
        ],
        ('word', 'label')
    )
    

    I can perform a groupBy() to get the counts of each word-label pair:

    sample_df = sample_df.groupBy('word', 'label').count()
    #+-----+-------+-----+
    #| word|  label|count|
    #+-----+-------+-----+
    #| blue|  color|    1|
    #| blue|feeling|    1|
    #|  red|  color|    2|
    #|happy|feeling|    1|
    #+-----+-------+-----+
    

    And then pivot() and sum() to get the label counts as columns:

    import pyspark.sql.functions as f
    sample_df = sample_df.groupBy('word').pivot('label').agg(f.sum('count')).na.fill(0)
    #+-----+-----+-------+
    #| word|color|feeling|
    #+-----+-----+-------+
    #|  red|    2|      0|
    #|happy|    0|      1|
    #| blue|    1|      1|
    #+-----+-----+-------+
    

    What is the best way to transform this dataframe so that each value in a row is divided by that row's total?

    # Desired output
    +-----+-----+-------+
    | word|color|feeling|
    +-----+-----+-------+
    |  red|  1.0|    0.0|
    |happy|  0.0|    1.0|
    | blue|  0.5|    0.5|
    +-----+-----+-------+
    

    One way to achieve this result is to use Python's built-in sum (NOT pyspark.sql.functions.sum) to get the row-wise total and then call withColumn() for each label:

    labels = ['color', 'feeling']
    
    sample_df.withColumn('total', sum([f.col(x) for x in labels]))\
        .withColumn('color', f.col('color')/f.col('total'))\
        .withColumn('feeling', f.col('feeling')/f.col('total'))\
        .select('word', 'color', 'feeling')\
        .show()
    

    But there has to be a better way than enumerating each of the possible columns.

    More generally, my question is:

    How can I apply an arbitrary transformation, that is a function of the current row, to multiple columns simultaneously?
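
    One possible sketch (an assumption about a common idiom, not taken from the answer above) is to fold the label list into chained withColumn() calls with functools.reduce, so no column has to be enumerated by hand:

    from functools import reduce
    import pyspark.sql.functions as f

    labels = ['color', 'feeling']

    # Hypothetical per-column transformation; any function of the current row could go here.
    def normalize(df, col_name):
        return df.withColumn(col_name, f.col(col_name) / f.col('total'))

    result = reduce(
        normalize,
        labels,
        sample_df.withColumn('total', sum([f.col(x) for x in labels]))
    ).select('word', *labels)

    result.show()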