pandas multiprocessing apply
Solution 1
You can use https://github.com/nalepae/pandarallel, as in the following example:
from pandarallel import pandarallel
from math import sin

pandarallel.initialize()

def func(x):
    return sin(x**2)

df.parallel_apply(func, axis=1)
Solution 2
A more generic version, based on the author's solution, that allows running it with any function on any dataframe:
from multiprocessing import Pool
from functools import partial

import numpy as np
import pandas as pd

def parallelize(data, func, num_of_processes=8):
    data_split = np.array_split(data, num_of_processes)
    pool = Pool(num_of_processes)
    data = pd.concat(pool.map(func, data_split))
    pool.close()
    pool.join()
    return data

def run_on_subset(func, data_subset):
    return data_subset.apply(func, axis=1)

def parallelize_on_rows(data, func, num_of_processes=8):
    return parallelize(data, partial(run_on_subset, func), num_of_processes)
So the following line:
df.apply(some_func, axis=1)
Will become:
parallelize_on_rows(df, some_func)
Solution 3
This is some code that I found useful. It automatically splits the dataframe across however many CPU cores you have.
import pandas as pd
import numpy as np
import multiprocessing as mp

def parallelize_dataframe(df, func):
    num_processes = mp.cpu_count()
    df_split = np.array_split(df, num_processes)
    with mp.Pool(num_processes) as p:
        df = pd.concat(p.map(func, df_split))
    return df

def parallelize_function(df):
    # column_input / column_output are placeholders for your own column names
    df[column_output] = df[column_input].apply(example_function)
    return df

def example_function(x):
    x = x * 2
    return x
To run:
df_output = parallelize_dataframe(df, parallelize_function)
Solution 4
Since I don't have much of your data script, this is a guess, but I'd suggest using p.map instead of apply_async with the callback.
p = mp.Pool(8)
pool_results = p.map(process, np.array_split(big_df, 8))
p.close()
p.join()

results = []
for result in pool_results:
    results.extend(result)
Solution 5
This worked well for me:
rows_iter = (row for _, row in df.iterrows())

with multiprocessing.Pool() as pool:
    df['new_column'] = pool.map(process_apply, rows_iter)
Author: yemu
Updated on January 24, 2022

Comments
-
yemu over 2 years
I'm trying to use multiprocessing with a pandas dataframe: split the dataframe into 8 parts, then apply some function to each part using apply (with each part processed in a different process).
EDIT: Here's the solution I finally found:
import multiprocessing as mp

import numpy as np
import pandas as pd
import pandas.util.testing as pdt

def process_apply(x):
    # do some stuff to data here
    ...

def process(df):
    res = df.apply(process_apply, axis=1)
    return res

if __name__ == '__main__':
    p = mp.Pool(processes=8)
    split_dfs = np.array_split(big_df, 8)
    pool_results = p.map(process, split_dfs)
    p.close()
    p.join()

    # merging parts processed by different processes
    parts = pd.concat(pool_results, axis=0)

    # merging newly calculated parts to big_df
    big_df = pd.concat([big_df, parts], axis=1)

    # checking if the dfs were merged correctly
    pdt.assert_series_equal(parts['id'], big_df['id'])
-
yemu over 9 years: I had to put the call inside if __name__ == '__main__':, and with other small changes I managed to make it work. However, I'm not sure whether the result dataframes in pool_results are returned in the same order they were split; I have to check it.
-
ℕʘʘḆḽḘ almost 8 years: see here for a solution with dask: stackoverflow.com/questions/37979167/…
-
Alaa M. over 4 years: What about some_func with parameters?
-
Tom Raz over 4 years: @AlaaM. - you can use partial for that: docs.python.org/2/library/functools.html#functools.partial
-
curiouscupcake over 4 years: This answer should get more upvotes. The speed-up is terrific.
-
frei over 4 years: @TomRaz how do I use a partial in this case when normally I would do something like this? dataframe.apply(lambda row: process(row.attr1, row.attr2, ...))
-
Tom Raz over 4 years: @frei - lambda functions cannot be used with multiprocessing, since they cannot be pickled. See more info here: stackoverflow.com/a/8805244/1781490 Can you use normal functions instead?
-
frei over 4 years: I see, ok. That's the piece I needed to know, to decide whether I should just refactor the whole method or not.
-
SomeBruh about 4 years: This solution works on Linux & macOS natively. On Windows, Pandaral·lel will work only if the Python session is executed from the Windows Subsystem for Linux (WSL).
-
Dcook almost 4 years: @Tom Raz what if I don't use partial? What is the utility of using partial here?
-
Kailegh over 3 years: when I do this, the numpy splits get processed twice, any idea why this may be happening to me?
-
Gianmario Spacagna over 3 years: great solution, just remember not to use lambda functions and to be careful with shadowed variables. Thanks!
-
mah65 about 3 years: On Windows, I get this error: ValueError: cannot find context for 'fork'