passing kwargs with multiprocessing.pool.map

16,128

Solution 1

If you want to iterate over the other arguments, use @ArcturusB's answer.

If you just want to pass the same keyword arguments to every call, you can bind them with functools.partial:

from functools import partial
pool.map(partial(worker, **kwargs), jobs)

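partial 'binds' arguments to a function, so pool.map only has to supply the job itself. Because multiprocessing pickles the function it sends to the worker processes, this needs a Python version that can pickle partial objects; old versions cannot.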
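A self-contained sketch of the partial approach, assuming a worker that receives each job tuple as its first positional argument. The names worker and run are illustrative, modeled on the question rather than copied from it, and the sketch returns results instead of printing so they are easy to check:

```python
import multiprocessing as mp
from functools import partial


def worker(job, **kwargs):
    # `job` is one item from `jobs`; the keyword arguments are identical on every call
    x, y = job
    if kwargs.get("kwarg_test", False):
        return ("Success", x * y)
    return ("Fail", x * y)


def run(jobs, **kwargs):
    # partial() freezes the keyword arguments, so map() only supplies `job`
    bound = partial(worker, **kwargs)
    with mp.Pool(4) as pool:
        return pool.map(bound, jobs)


if __name__ == "__main__":
    jobs = [(n, n) for n in range(4)]
    print(run(jobs))                   # no kwargs
    print(run(jobs, kwarg_test=True))  # kwargs forwarded to every call
```

Using the pool as a context manager requires Python 3.3 or later; on older versions call pool.close() and pool.join() explicitly.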

Solution 2

The multiprocessing.pool.Pool.map doc states:

A parallel equivalent of the map() built-in function (it supports only one iterable argument though). It blocks until the result is ready.

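So map accepts only one iterable argument. Note also that its third positional parameter is chunksize: pool.map(worker, jobs, kwargs) passes the kwargs dict as the chunk size, which explains the TypeError about // between 'int' and 'dict'. Luckily there is a simple workaround: define a worker_wrapper function that takes a single argument, unpacks it into args and kwargs, and passes them to worker: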

def worker_wrapper(arg):
    args, kwargs = arg
    return worker(*args, **kwargs)

In your wrapper_process, construct this single argument from jobs (or build it directly while constructing jobs) and map worker_wrapper over it:

arg = [(j, kwargs) for j in jobs]
pool.map(worker_wrapper, arg)
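Because each item handed to worker_wrapper is an (args, kwargs) pair, the keyword arguments do not have to be the same for every job; they can be built alongside the jobs. A minimal sketch, where the scale keyword is a hypothetical parameter used only for illustration:

```python
import multiprocessing as mp


def worker(x, y, **kwargs):
    # Positional x, y plus optional kwargs, like the worker in this answer
    scale = kwargs.get("scale", 1)  # hypothetical keyword for illustration
    return x * y * scale


def worker_wrapper(arg):
    # Each item is an (args, kwargs) pair; unpack it into a normal call
    args, kwargs = arg
    return worker(*args, **kwargs)


if __name__ == "__main__":
    # Build the (args, kwargs) pairs while constructing the jobs,
    # so each job can carry its own keyword arguments
    arg = [((n, n + 1), {"scale": 10 if n % 2 else 1}) for n in range(4)]
    with mp.Pool(4) as pool:
        print(pool.map(worker_wrapper, arg))  # [0, 20, 6, 120]
```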

Here is a working implementation, kept as close as possible to your original code:

import multiprocessing as mp

def worker_wrapper(arg):
    args, kwargs = arg
    return worker(*args, **kwargs)

def worker(x, y, **kwargs):
    kwarg_test = kwargs.get('kwarg_test', False)
    # print("kwarg_test = {}".format(kwarg_test))     
    if kwarg_test:
        print("Success")
    else:
        print("Fail")
    return x*y

def wrapper_process(**kwargs):
    jobs = []
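    pool = mp.Pool(4)
    for i, n in enumerate(range(4)):
        jobs.append((n, i))
    # pair each job's positional args with the same kwargs dict
    arg = [(j, kwargs) for j in jobs]
    pool.map(worker_wrapper, arg)
    # release the worker processes once all tasks are done
    pool.close()
    pool.join()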

def main(**kwargs):
    print("=> calling `worker`")
    worker(1, 2, kwarg_test=True)  # accepts kwargs
    print("=> no kwargs")
    wrapper_process()  # no kwargs
    print("=> with `kwarg_test=True`")
    wrapper_process(kwarg_test=True)

if __name__ == "__main__":
    main()

Which passes the test:

=> calling `worker`
Success
=> no kwargs
Fail
Fail
Fail
Fail
=> with `kwarg_test=True`
Success
Success
Success
Success
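On Python 3.3 and later, Pool.starmap unpacks each job tuple into positional arguments, so combining it with functools.partial from Solution 1 removes the need for worker_wrapper. This is a swapped-in alternative rather than part of the answer above; a minimal sketch that returns results instead of printing them:

```python
import multiprocessing as mp
from functools import partial


def worker(x, y, **kwargs):
    # Same (x, y, **kwargs) signature as the worker in this answer
    if kwargs.get("kwarg_test", False):
        return "Success", x * y
    return "Fail", x * y


def wrapper_process(**kwargs):
    jobs = [(n, i) for i, n in enumerate(range(4))]
    with mp.Pool(4) as pool:
        # starmap unpacks each (n, i) tuple into worker's positional
        # parameters; partial() supplies the same kwargs to every call
        return pool.starmap(partial(worker, **kwargs), jobs)


if __name__ == "__main__":
    print(wrapper_process())
    print(wrapper_process(kwarg_test=True))
```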

Solution 3

You don't have to use map at all. Instead, use apply_async and pass your parameters as a dictionary through its kwds argument. In this example, batch_parameters is a list of dictionaries, each holding one set of parameters you want to test, and f is the function being called. future_parameters is a list of (AsyncResult, parameters) tuples that pairs each pending result with the parameters that produced it. The loop that follows waits for each result with get() and prints it together with the parameters used to generate it.

from multiprocessing import Pool

with Pool(parallelism) as pool:
    # apply_async forwards each dict to f as keyword arguments via kwds
    future_parameters = [(pool.apply_async(f, kwds=parameters), parameters) for parameters in batch_parameters]
    for future, parameters in future_parameters:
        result = future.get()
        print(parameters, "=>", result)
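A self-contained version of the snippet above, assuming a hypothetical function f whose parameters can all be passed by keyword, plus made-up values for parallelism and batch_parameters:

```python
from multiprocessing import Pool


def f(a, b=1, scale=1):
    # Hypothetical function under test; every parameter can be passed by keyword
    return (a + b) * scale


if __name__ == "__main__":
    parallelism = 4  # hypothetical pool size
    # Each dict is one set of keyword arguments for f
    batch_parameters = [
        {"a": 1},
        {"a": 1, "b": 2},
        {"a": 2, "b": 3, "scale": 10},
    ]
    with Pool(parallelism) as pool:
        # apply_async schedules one call per dict and returns an AsyncResult immediately
        future_parameters = [
            (pool.apply_async(f, kwds=parameters), parameters)
            for parameters in batch_parameters
        ]
        for future, parameters in future_parameters:
            result = future.get()  # blocks until this call has finished
            print(parameters, "=>", result)
```

Calling get() in submission order keeps each result paired with its parameters, even if the workers finish in a different order.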
Author: Admin

Updated on June 14, 2022

Comments

  • Admin (about 2 years ago)

    I would like to pass keyword arguments to my worker-function with Pool.map(). I can't find a clear example of this when searching forums.

    Example Code:

    import multiprocessing as mp
    
    def worker(xy, **kwargs):
        x, y = xy  # (x, y) tuple parameters are only valid in Python 2
        kwarg_test = kwargs.get('kwarg_test', False)
        print("kwarg_test = {}".format(kwarg_test))     
        if kwarg_test:
            print("Success")
        return x*y
    
    def wrapper_process(**kwargs):
        jobs = []
        pool=mp.Pool(4)
        for i, n in enumerate(range(4)):
            jobs.append((n,i))
        pool.map(worker, jobs) #works
        pool.map(worker, jobs, kwargs) #how to do this?   
    
    def main(**kwargs):
        worker((1,2),kwarg_test=True) #accepts kwargs
        wrapper_process(kwarg_test=True)
    
    if __name__ == "__main__":    
        main()
    

    Output:

    kwarg_test = True
    Success
    kwarg_test = False
    kwarg_test = False
    kwarg_test = False
    kwarg_test = False
    TypeError: unsupported operand type(s) for //: 'int' and 'dict'
    

    The TypeError seems to have to do with how arguments are parsed inside multiprocessing.Pool or Queue. I have tried several other syntaxes, such as making a list of the kwargs ([kwargs, kwargs, kwargs, kwargs]), as well as several attempts to include the kwargs in the jobs list, but with no luck. I traced the code in multiprocessing.pool from map to map_async and got as far as task_batches = Pool._get_tasks(func, iterable, chunksize) in pool.py, where I ran into the generator structure. I'm happy to learn more about this in the future, but for now I am just trying to find out:

    Is there a simple syntax for allowing the passing of kwargs with pool.map?