pandas multiprocessing apply

Solution 1

You can use https://github.com/nalepae/pandarallel, as in the following example:

from math import sin

import pandas as pd
from pandarallel import pandarallel

pandarallel.initialize()

df = pd.DataFrame({'a': range(1000)})

def func(row):
    # with axis=1, each call receives one row as a pandas Series
    return sin(row['a'] ** 2)

df.parallel_apply(func, axis=1)
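If needed, initialize accepts optional arguments; per the pandarallel documentation, these include the worker count and a progress bar:

# Optional: choose the number of workers and show a progress bar
pandarallel.initialize(nb_workers=4, progress_bar=True)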

Solution 2

A more generic version, based on the author's solution, that can be applied to any function and dataframe:

from functools import partial
from multiprocessing import Pool

import numpy as np
import pandas as pd

def parallelize(data, func, num_of_processes=8):
    # split the dataframe into one chunk per process
    data_split = np.array_split(data, num_of_processes)
    pool = Pool(num_of_processes)
    # pool.map preserves chunk order, so concat reassembles the rows correctly
    data = pd.concat(pool.map(func, data_split))
    pool.close()
    pool.join()
    return data

def run_on_subset(func, data_subset):
    return data_subset.apply(func, axis=1)

def parallelize_on_rows(data, func, num_of_processes=8):
    return parallelize(data, partial(run_on_subset, func), num_of_processes)

So the following line:

df.apply(some_func, axis=1)

will become:

parallelize_on_rows(df, some_func) 
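If some_func takes extra arguments (as asked in the comments below), functools.partial can bind them before parallelizing. A minimal sketch, where threshold and the 'value' column are hypothetical:

from functools import partial

def some_func(row, threshold):
    # row is a pandas Series; threshold was bound ahead of time via partial
    return row['value'] > threshold

parallelize_on_rows(df, partial(some_func, threshold=10))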

Solution 3

This is some code that I found useful. It automatically splits the dataframe into as many chunks as you have CPU cores.

import pandas as pd
import numpy as np
import multiprocessing as mp

def parallelize_dataframe(df, func):
    num_processes = mp.cpu_count()
    df_split = np.array_split(df, num_processes)
    with mp.Pool(num_processes) as p:
        df = pd.concat(p.map(func, df_split))
    return df

def parallelize_function(df):
    # the column names here are placeholders; substitute your own
    df['column_output'] = df['column_input'].apply(example_function)
    return df

def example_function(x):
    x = x*2
    return x

To run:

df_output = parallelize_dataframe(df, parallelize_function)
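Note that on platforms whose default start method is spawn rather than fork (Windows, and macOS since Python 3.8), the call has to sit under a main guard; otherwise the spawned children re-import the module and fail. A minimal sketch, assuming df is already defined:

if __name__ == '__main__':
    # the guard stops spawned child processes from re-running this on import
    df_output = parallelize_dataframe(df, parallelize_function)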

Solution 4

Since I don't have much of your data script, this is a guess, but I'd suggest using p.map instead of apply_async with the callback.

import multiprocessing as mp
import numpy as np

p = mp.Pool(8)
pool_results = p.map(process, np.array_split(big_df, 8))
p.close()
p.join()
results = []
for result in pool_results:
    results.extend(result)
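If process returns a Series or DataFrame per chunk (as in the asker's code below), the merge step can be a single concat instead of extending a list, since Pool.map returns results in the order the chunks were submitted:

import pandas as pd

# equivalent merge when each pool result is a Series/DataFrame
results = pd.concat(pool_results)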

Solution 5

This worked well for me:

import multiprocessing

rows_iter = (row for _, row in df.iterrows())

with multiprocessing.Pool() as pool:
    df['new_column'] = pool.map(process_apply, rows_iter)
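For completeness, a minimal self-contained version of this pattern; the process_apply body and the column names are placeholders, since the original omits them:

import multiprocessing

import pandas as pd

def process_apply(row):
    # must be a top-level (picklable) function, not a lambda
    return row['a'] * 2

if __name__ == '__main__':
    df = pd.DataFrame({'a': range(100)})
    rows_iter = (row for _, row in df.iterrows())
    with multiprocessing.Pool() as pool:
        # Pool.map preserves input order, so results align with df's rows
        df['new_column'] = pool.map(process_apply, rows_iter)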
Author: yemu

Updated on January 24, 2022

Comments

  • yemu
    yemu over 2 years

    I'm trying to use multiprocessing with a pandas dataframe: split the dataframe into 8 parts and apply some function to each part, with each part processed in a different process.

    EDIT: Here's the solution I finally found:

    import multiprocessing as mp
    import numpy as np
    import pandas as pd
    import pandas.testing as pdt
    
    def process_apply(x):
        # do some stuff to data here
        return x
    
    def process(df):
        res = df.apply(process_apply, axis=1)
        return res
    
    if __name__ == '__main__':
        p = mp.Pool(processes=8)
        split_dfs = np.array_split(big_df, 8)
        pool_results = p.map(process, split_dfs)
        p.close()
        p.join()
    
        # merging parts processed by different processes
        parts = pd.concat(pool_results, axis=0)
    
        # merging newly calculated parts to big_df
        big_df = pd.concat([big_df, parts], axis=1)
    
        # checking if the dfs were merged correctly
        pdt.assert_series_equal(parts['id'], big_df['id'])
    
    
  • yemu
    yemu over 9 years
    I had to put the call inside if __name__ == '__main__':, and with other small changes I managed to make it work. However, I'm not sure if the result dataframes in pool_results are returned in the same order as they were split. I have to check it.
  • ℕʘʘḆḽḘ
    ℕʘʘḆḽḘ almost 8 years
    see here for a solution with dask stackoverflow.com/questions/37979167/…
  • Alaa M.
    Alaa M. over 4 years
    What about some_func with parameters?
  • Tom Raz
    Tom Raz over 4 years
    @AlaaM. - you can use partial for that. docs.python.org/2/library/functools.html#functools.partial
  • curiouscupcake
    curiouscupcake over 4 years
    This answer should get more upvotes. The speed up is terrific.
  • frei
    frei over 4 years
    @TomRaz how do I use a partial in this case when normally I would do something like this? dataframe.apply(lambda row: process(row.attr1, row.attr2, ...))
  • Tom Raz
    Tom Raz over 4 years
    @frei - lambda functions cannot be used with multiprocessing, since they cannot be pickled. See more info here: stackoverflow.com/a/8805244/1781490 Can you use normal functions instead?
  • frei
    frei over 4 years
    I see, ok. That's the piece I needed to know whether I should just refactor the whole method or not.
  • SomeBruh
    SomeBruh about 4 years
    This solution works on Linux and macOS natively. On Windows, Pandaral·lel works only if the Python session is executed from the Windows Subsystem for Linux (WSL).
  • Dcook
    Dcook almost 4 years
    @Tom Raz what if I don't use partial? What is the utility of using partial here?
  • Kailegh
    Kailegh over 3 years
    when I do this, the numpy splits get processed twice; any idea why this may be happening to me?
  • Gianmario Spacagna
    Gianmario Spacagna over 3 years
    great solution, just remember not to use lambda functions and to be careful with shadowed variables. thanks
  • mah65
    mah65 about 3 years
    On windows, I get this error: ValueError: cannot find context for 'fork'