passing kwargs with multiprocessing.pool.map
Solution 1
If you want to iterate over the other arguments, use @ArcturusB's answer.
If you just want to pass them, having the same value for each iteration, then you can do this:
from functools import partial
pool.map(partial(worker, **kwargs), jobs)
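partial binds arguments to a function, returning a new callable that only needs the remaining arguments. Old versions of Python cannot pickle partial objects, though, so there this approach fails when the function is sent to the worker processes.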
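Here is a minimal, self-contained sketch of the partial approach. The scale function, its factor and offset keywords, the map_with_kwargs helper, and the pool size are all made up for illustration and are not taken from the question:

```python
import multiprocessing as mp
from functools import partial


def scale(x, factor=1, offset=0):
    # Hypothetical worker: the keyword arguments are identical for every job.
    return x * factor + offset


def map_with_kwargs(pool, func, jobs, **kwargs):
    # Bind the fixed keyword arguments once, then map over the jobs as usual.
    return pool.map(partial(func, **kwargs), jobs)


if __name__ == "__main__":
    with mp.Pool(2) as pool:
        print(map_with_kwargs(pool, scale, [1, 2, 3], factor=10, offset=1))
```

The worker has to be defined at module level so that it can be pickled and sent to the worker processes.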
Solution 2
The multiprocessing.pool.Pool.map doc states:
A parallel equivalent of the map() built-in function (it supports only one iterable argument though). It blocks until the result is ready.
We can only pass one iterable argument, end of story. Luckily, there is a workaround: define a worker_wrapper function that takes a single argument, unpacks it into args and kwargs, and passes them to worker:
def worker_wrapper(arg):
    args, kwargs = arg
    return worker(*args, **kwargs)
In your wrapper_process, you need to construct this single argument from jobs (or even directly when constructing jobs) and call worker_wrapper:
arg = [(j, kwargs) for j in jobs]
pool.map(worker_wrapper, arg)
Here is a working implementation, kept as close as possible to your original code:
import multiprocessing as mp
def worker_wrapper(arg):
    args, kwargs = arg
    return worker(*args, **kwargs)
def worker(x, y, **kwargs):
    kwarg_test = kwargs.get('kwarg_test', False)
    # print("kwarg_test = {}".format(kwarg_test))
    if kwarg_test:
        print("Success")
    else:
        print("Fail")
    return x*y
def wrapper_process(**kwargs):
    jobs = []
    pool=mp.Pool(4)
    for i, n in enumerate(range(4)):
        jobs.append((n,i))
    arg = [(j, kwargs) for j in jobs]
    pool.map(worker_wrapper, arg)
def main(**kwargs):
    print("=> calling `worker`")
    worker(1, 2,kwarg_test=True) #accepts kwargs
    print("=> no kwargs")
    wrapper_process() # no kwargs
    print("=> with `kwar_test=True`")
    wrapper_process(kwarg_test=True)
if __name__ == "__main__":
    main()
Which passes the test:
=> calling `worker`
Success
=> no kwargs
Fail
Fail
Fail
Fail
=> with `kwar_test=True`
Success
Success
Success
Success
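The same wrapper also covers the case where the keyword arguments differ from job to job, since each element of the iterable carries its own kwargs. A minimal sketch, assuming a made-up worker with a hypothetical scale keyword and a hypothetical run_jobs helper:

```python
import multiprocessing as mp


def worker(x, y, scale=1):
    # Hypothetical worker with a single keyword argument.
    return x * y * scale


def worker_wrapper(arg):
    # Unpack the single argument into positional and keyword arguments.
    args, kwargs = arg
    return worker(*args, **kwargs)


def run_jobs(pool, jobs):
    # Each job is an (args, kwargs) pair, so the kwargs may differ per job.
    return pool.map(worker_wrapper, jobs)


if __name__ == "__main__":
    jobs = [((1, 2), {}), ((3, 4), {"scale": 10}), ((5, 6), {"scale": 0})]
    with mp.Pool(2) as pool:
        print(run_jobs(pool, jobs))
```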
Solution 3
You don't need to force yourself to use map. Just use apply_async and pass in your parameters as a dictionary. In this example, batch_parameters is a list of dictionaries containing the parameters you want to test, and future_parameters is a list of tuples pairing each future with the parameters used to obtain it. In the loop that follows, we wait for each future to produce its result and print that result together with the parameters that generated it.
with Pool(parallelism) as pool:
    future_parameters = [(pool.apply_async(f, kwds=parameters), parameters) for parameters in batch_parameters]
    for future, parameters in future_parameters:
        result = future.get()
        print(parameters, "=>", result)
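The snippet above leaves f, parallelism, and batch_parameters undefined. A runnable sketch of the same idea might look like this; the function f, its parameters a and b, the run_batch helper, and the pool size are assumptions for illustration only:

```python
from multiprocessing import Pool


def f(a, b=1):
    # Hypothetical function under test.
    return a ** b


def run_batch(pool, func, batch_parameters):
    # Submit every parameter set first so the calls can run concurrently,
    # keeping each AsyncResult next to the parameters that produced it.
    future_parameters = [
        (pool.apply_async(func, kwds=parameters), parameters)
        for parameters in batch_parameters
    ]
    # get() blocks until the corresponding call has finished.
    return [(parameters, future.get()) for future, parameters in future_parameters]


if __name__ == "__main__":
    batch_parameters = [{"a": 2, "b": 3}, {"a": 5}, {"a": 3, "b": 2}]
    with Pool(2) as pool:
        for parameters, result in run_batch(pool, f, batch_parameters):
            print(parameters, "=>", result)
```

Collecting the results inside the with block matters: leaving the block terminates the pool, so any get() call has to happen before that.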
Updated on June 14, 2022
I would like to pass keyword arguments to my worker-function with Pool.map(). I can't find a clear example of this when searching forums.
Example Code:
import multiprocessing as mp
def worker((x,y), **kwargs):
    kwarg_test = kwargs.get('kwarg_test', False)
    print("kwarg_test = {}".format(kwarg_test))
    if kwarg_test:
        print("Success")
    return x*y
def wrapper_process(**kwargs):
    jobs = []
    pool=mp.Pool(4)
    for i, n in enumerate(range(4)):
        jobs.append((n,i))
    pool.map(worker, jobs) #works
    pool.map(worker, jobs, kwargs) #how to do this?
def main(**kwargs):
    worker((1,2),kwarg_test=True) #accepts kwargs
    wrapper_process(kwarg_test=True)
if __name__ == "__main__":
    main()
Output:
kwarg_test = True
Success
kwarg_test = False
kwarg_test = False
kwarg_test = False
kwarg_test = False
TypeError: unsupported operand type(s) for //: 'int' and 'dict'
The type error has to do with parsing arguments inside multiprocessing.Pool or Queue. I have tried several other syntaxes, such as passing a list of the kwargs, [kwargs, kwargs, kwargs, kwargs], and I have made several attempts to include the kwargs in the jobs list, but with no luck. I traced the code in multiprocessing.pool from map to map_async and got as far as
task_batches = Pool._get_tasks(func, iterable, chunksize)
in pool.py when I encountered the generator structure. I'm happy to learn more about this in the future, but for now I am just trying to find out: is there a simple syntax for passing kwargs with pool.map?
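A likely explanation for the TypeError in the question is that the third positional parameter of Pool.map is chunksize, so the kwargs dict ends up being used as a chunk size. The short check below only inspects the method signature; the link to the exact error message is my reading, not something confirmed by the traceback:

```python
import inspect
from multiprocessing.pool import Pool

# Pool.map is a plain method, so its parameters can be inspected directly.
params = list(inspect.signature(Pool.map).parameters)
print(params)
```

If chunksize is indeed the third name listed, then pool.map(worker, jobs, kwargs) is read as chunksize=kwargs, which would fit a failure at an integer division involving an int and a dict.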