Iterating over each row of a DataFrame using pySpark


Instead of using df_1 = df.map(funcRowIter).collect(), try a UDF. Hope this helps.

from pyspark.sql.functions import udf, struct
from pyspark.sql.types import StringType

def funcRowIter(row):
    # row is a single Row holding every column, packed by struct() below
    # note: print output from inside a UDF goes to the executor logs, not the driver
    if row is not None and row.id is not None:
        if row.id == "1":
            return "1"
    return None

# register the function as a UDF; the declared return type must match what it returns
A = udf(funcRowIter, StringType())

# apply the UDF row by row (df here must be the DataFrame, not tab.rdd)
z = df.withColumn("data_id", A(struct([df[x] for x in df.columns])))
z.show()

collect() is never a good option for very big data, i.e. millions of records.
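
If the goal is just to find the row whose id is "1", a plain filter keeps the work on the executors and avoids pulling the whole table back with collect(). A minimal sketch, assuming tab is the DataFrame loaded from update_poc.test_table_a as in the question:

from pyspark.sql.functions import col

# the filter runs distributed on the executors; take(1) brings back at most
# one row, so the driver never receives the full table
match_rows = tab.filter(col("id") == "1").take(1)

if match_rows:
    print(match_rows[0])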

Comments

  • Ashay Dhavale almost 2 years

    I need to iterate over a dataframe using pySpark, just like we can iterate over a set of values using a for loop. Below is the code I have written. The problems with this code are:

    1. I have to use collect which breaks the parallelism
    2. I am not able to print any values from the DataFrame in the function funcRowIter
    3. I cannot break the loop once a match is found.

    I have to do it in pySpark and cannot use pandas for this:

    from pyspark.sql.functions import *
    from pyspark.sql import HiveContext
    from pyspark.sql import functions
    from pyspark.sql import DataFrameWriter
    from pyspark.sql.readwriter import DataFrameWriter
    from pyspark import SparkContext
    
    sc = SparkContext()
    hive_context = HiveContext(sc)
    
    tab = hive_context.sql("select * from update_poc.test_table_a")
    
    tab.registerTempTable("tab")
    print type(tab)
    
    df = tab.rdd
    
    def funcRowIter(rows):
        print type(rows)
        if(rows.id == "1"):
            return 1
    
    df_1 = df.map(funcRowIter).collect()
    print df_1