Multi-processing with Spark (PySpark)


Indeed, as @user6910411 commented, when I changed the Pool to a ThreadPool (the multiprocessing.pool.ThreadPool class), everything worked as expected and these errors were gone.
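
A minimal sketch of that change, reusing the pool variables from the code sample in the comments below, would look something like this:

    from multiprocessing.pool import ThreadPool  # threads instead of separate processes

    # Only the pool class changes; the work now runs in threads of the driver
    # process rather than in forked child processes.
    users_pool = ThreadPool(processes=len(my_users))
    users_pool.map(users_worker_wrapper, users_worker_args)
    users_pool.close()
    users_pool.join()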

The root causes of the errors are also clear to me now; if you want me to share them, please comment below.



Author: Mike, updated on June 04, 2022

Comments

  • Mike, almost 2 years ago

    The use case is the following:

    I have a large DataFrame with a 'user_id' column (each user_id can appear in many rows), and a list of users, my_users, which I need to analyse.

    Groupby, filter and aggregate could be a good approach, but the aggregation functions built into pyspark did not fit my needs. In the pyspark version I'm using, user-defined aggregation functions are still not fully supported, so I decided to leave that path for now (a rough sketch of that route appears after the code sample below).

    Instead, I simply iterate over the my_users list, filter the DataFrame for each user, and analyse the result. To speed this up, I decided to use a Python multiprocessing Pool with one task per user in my_users.

    The function that does the analysis (and is passed to the pool) takes two arguments: the user_id and a path to the main DataFrame (Parquet format) on which I perform all the computations. In that function I load the DataFrame and work on it (a DataFrame cannot be passed as an argument itself).

    I get all sorts of weird errors in some of the processes (different ones in each run) that look like:

    • PythonUtils does not exist in the JVM (when reading the 'parquet' dataframe)

    [screenshot of the error message]

    • KeyError: 'c' not found (also when reading the Parquet DataFrame; what is 'c' anyway??)

    When I run it without any multiprocessing, everything runs smoothly, but slowly.

    Any ideas where these errors are coming from?

    I'll include a code sample just to make things clearer:

    import os
    from multiprocessing import Pool

    os.environ['PYSPARK_SUBMIT_ARGS'] = '--driver-memory 4g --conf spark.driver.maxResultSize=3g --master local[*] pyspark-shell'  # if it's relevant

    # .... (the SparkSession `spark` is created here, in the driver)

    def users_worker(df_path, user_id):
        df = spark.read.parquet(df_path)  # The problem is here!
        ## the analysis of user_id in df is here

    def user_worker_wrapper(args):
        users_worker(*args)

    def analyse():
        # ...
        users_worker_args = [(df_path, user_id) for user_id in my_users]
        users_pool = Pool(processes=len(my_users))  # one worker process per user
        users_pool.map(users_worker_wrapper, users_worker_args)
        users_pool.close()
        users_pool.join()
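
    For reference, here is a rough, hypothetical sketch of that groupby/filter/aggregate route using pyspark's built-in aggregations; df, my_users and 'user_id' come from the description above, while 'some_metric' is a made-up column name for illustration:

    from pyspark.sql import functions as F

    # Hypothetical sketch: keep only the users of interest, then aggregate
    # per user with built-in functions (which were not expressive enough here).
    per_user_stats = (
        df.filter(F.col('user_id').isin(my_users))
          .groupBy('user_id')
          .agg(F.count('*').alias('n_rows'),
               F.avg('some_metric').alias('avg_metric'))
    )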
    
  • Asish M., over 6 years ago
    Would like to know the reasons for the errors. We recently got access to a clustered environment, courtesy of another team in the company, and I was thinking along the same lines, but would like to learn about the pitfalls as I keep learning the pyspark framework.
  • Mike, over 6 years ago
    Hi @AsishM. Spark offers a "Scheduling Within an Application" option, which you can find described in more detail at the link. Your use case involves a cluster configuration, so the considerations are a bit different.
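
For context, "Scheduling Within an Application" refers to Spark's job-scheduling documentation, where the fair scheduler lets concurrent jobs (for example, one per thread) share executors instead of running FIFO. A minimal, hypothetical sketch of enabling it (the app name and pool name are made up for illustration):

    from pyspark.sql import SparkSession

    # Enable fair scheduling within the application.
    spark = (SparkSession.builder
             .appName('users-analysis')                 # assumed app name
             .config('spark.scheduler.mode', 'FAIR')
             .getOrCreate())

    # Inside a worker thread, jobs can be assigned to a scheduler pool
    # (the pool name here is hypothetical).
    spark.sparkContext.setLocalProperty('spark.scheduler.pool', 'users_pool')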