Multiprocess Pools with different functions

24,723

Solution 1

To run different functions, you can simply call map_async multiple times on the same pool.

Here is an example to illustrate that:

from multiprocessing import Pool
from time import sleep

def square(x):
    return x * x

def cube(y):
    return y * y * y

pool = Pool(processes=20)

result_squares = pool.map_async(square, range(10))
result_cubes = pool.map_async(cube, range(10))

The result will be:

>>> print(result_squares.get(timeout=1))
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

>>> print(result_cubes.get(timeout=1))
[0, 1, 8, 27, 64, 125, 216, 343, 512, 729]
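If you prefer to mix individual calls to different functions in one pool, apply_async submits one task at a time; a minimal sketch reusing the square and cube functions above (the pool size and timeout are arbitrary choices):

```python
from multiprocessing import Pool

def square(x):
    return x * x

def cube(y):
    return y * y * y

if __name__ == '__main__':
    with Pool(processes=4) as pool:
        # Each apply_async call queues one task and returns an
        # AsyncResult immediately, so the two functions can run
        # concurrently on different workers.
        r1 = pool.apply_async(square, (5,))
        r2 = pool.apply_async(cube, (3,))
        print(r1.get(timeout=5))
        print(r2.get(timeout=5))
```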

Solution 2

You can use a simple wrapper function passed to the pool (note: a lambda won't work here, because lambdas can't be pickled for multiprocessing):

import multiprocessing

def smap(f, *args):
    return f(*args)

pool = multiprocessing.Pool(processes=30)
# Pool.map takes only a single iterable, so pair each function with its
# arguments via zip and use starmap instead (further argument lists can
# be appended to the zip call as needed):
res = pool.starmap(smap, zip(function_list, args_list1, args_list2))

Pool.map accepts only a single iterable of arguments, which is inconvenient when each function needs its own parameters; zipping the lists together works around this.
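Here is a runnable version of this idea; the names function_list, args_list1, and args_list2 from the snippet above are filled in with hypothetical sample data:

```python
from multiprocessing import Pool

def smap(f, *args):
    return f(*args)

def add(x, y):
    return x + y

def mul(x, y):
    return x * y

if __name__ == '__main__':
    function_list = [add, mul]
    args_list1 = [1, 3]
    args_list2 = [2, 4]
    with Pool(processes=2) as pool:
        # zip pairs each function with its own arguments:
        # (add, 1, 2) and (mul, 3, 4)
        res = pool.starmap(smap, zip(function_list, args_list1, args_list2))
    print(res)  # [3, 12]
```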

Solution 3

They will not run in parallel. See the following code:

import multiprocessing
from functools import partial

def updater1(q, i):
    print("UPDATER 1:", i)
    return

def updater2(q, i):
    print("UPDATER2:", i)
    return

if __name__ == '__main__':
    a = range(10)
    b = ["abc", "def", "ghi", "jkl", "mno", "pqr", "vas", "dqfq", "grea", "qfwqa", "qwfsa", "qdqs"]

    # q was undefined in the original snippet; a Manager queue can be
    # passed to pool workers (a plain multiprocessing.Queue cannot).
    q = multiprocessing.Manager().Queue()

    pool = multiprocessing.Pool()

    func1 = partial(updater1, q)
    func2 = partial(updater2, q)
    pool.map_async(func1, a)
    pool.map_async(func2, b)

    pool.close()
    pool.join()

The above code yields the following printout:

UPDATER 1: 1
UPDATER 1: 0
UPDATER 1: 2
UPDATER 1: 3
UPDATER 1: 4
UPDATER 1: 5
UPDATER 1: 6
UPDATER 1: 7
UPDATER 1: 8
UPDATER 1: 9
UPDATER2: abc
UPDATER2: def
UPDATER2: ghi
UPDATER2: jkl
UPDATER2: mno
UPDATER2: pqr
UPDATER2: vas
UPDATER2: dqfq
UPDATER2: grea
UPDATER2: qfwqa
UPDATER2: qwfsa
UPDATER2: qdqs
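One way to get the two functions to interleave is to submit each item as its own task with apply_async instead of map_async; a hedged sketch (the updater functions are simplified here to return strings rather than print, and the unused queue argument is dropped):

```python
from multiprocessing import Pool

def updater1(i):
    return "UPDATER 1: %s" % i

def updater2(i):
    return "UPDATER2: %s" % i

if __name__ == '__main__':
    a = range(10)
    b = ["abc", "def", "ghi"]
    with Pool() as pool:
        # Each call is queued as an independent task, so idle workers
        # can pick up tasks from either function in any order.
        results = [pool.apply_async(updater1, (i,)) for i in a]
        results += [pool.apply_async(updater2, (s,)) for s in b]
        for r in results:
            print(r.get())
```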

Solution 4

Here is a working example of the idea shared by @Rayamon:

import functools

from multiprocessing import Pool


def a(param1, param2, param3):
    return param1 + param2 + param3


def b(param1, param2):
    return param1 + param2


def smap(f):
    return f()


if __name__ == '__main__':
    func1 = functools.partial(a, 1, 2, 3)
    func2 = functools.partial(b, 1, 2)

    pool = Pool(processes=2)
    res = pool.map(smap, [func1, func2])
    pool.close()
    pool.join()
    print(res)

Solution 5

Multiple Functions in one Pool

The following example shows how to run the three functions inc, dec, and add in a pool.

from multiprocessing import Pool
import functools

# -------------------------------------

def inc(x):
    return x + 1

def dec(x):
    return x - 1

def add(x, y):
    return x + y

# -------------------------------------

def smap(f):
    return f()

def main():
    f_inc = functools.partial(inc, 4)
    f_dec = functools.partial(dec, 2)
    f_add = functools.partial(add, 3, 4)
    with Pool() as pool:
        res = pool.map(smap, [f_inc, f_dec, f_add])
        print(res)

# -------------------------------------

if __name__ == '__main__':
    main()

We have three functions, which run independently in a pool. We use functools.partial to bind each function to its parameters before it is executed.

Source: https://zetcode.com/python/multiprocessing/
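The same pattern also works without functools.partial by shipping (function, arguments) tuples through starmap; a sketch assuming all functions take positional arguments only:

```python
from multiprocessing import Pool

def inc(x):
    return x + 1

def dec(x):
    return x - 1

def add(x, y):
    return x + y

def call(f, args):
    # Apply a function to its tuple of positional arguments.
    return f(*args)

if __name__ == '__main__':
    tasks = [(inc, (4,)), (dec, (2,)), (add, (3, 4))]
    with Pool() as pool:
        res = pool.starmap(call, tasks)
    print(res)  # [5, 1, 7]
```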

Author: dorvak

Updated on January 13, 2022

Comments

  • dorvak
    dorvak over 2 years

    Most examples of multiprocessing worker pools execute a single function in different processes, e.g.

    def foo(args):
       pass
    
    if __name__ == '__main__':
       pool = multiprocessing.Pool(processes=30)
       res = pool.map_async(foo, args)
    

    Is there a way to handle two different and independent functions within the pool? So that you could assign, e.g., 15 processes for foo() and 15 processes for bar()? Or is a pool bound to a single function? Or do you have to create different processes for different functions manually with

     p = Process(target=foo, args=(whatever,))
     q = Process(target=bar, args=(whatever,))
     q.start()
     p.start()
    

    and forget about the worker pool?

  • dorvak
    dorvak almost 13 years
    And will they be executed in parallel or "in a row"?
  • Ocaj Nires
    Ocaj Nires almost 13 years
    The map_async call returns immediately. As long as there are enough free processes in the pool, new tasks will run without having to wait. In the example above, they will run in parallel. @mad_scientist
  • dorvak
    dorvak almost 13 years
    Thanks! But there is no way to assign a specific number of workers/processes, I guess?
  • Ocaj Nires
    Ocaj Nires almost 13 years
    The multiprocessing Pool API does not provide a mechanism to assign a specific number of workers within the same pool. If you really want a specific number of workers per task, create different pools. Having only a single pool is recommended, though; it makes sense for the Pool to manage that transparently, without you worrying about it.
  • Zhubarb
    Zhubarb over 4 years
    Thanks for your answer. Are you positive that adding map_async() calls one after the other will run them in parallel? I have actually tried this, and as the answer by @Sam indicates, they seem to run sequentially.
  • ARA1307
    ARA1307 over 4 years
    This should be accepted as the right answer, because the accepted answer runs in a quasi-parallel mode (with an awful planner).
  • Madan Raj
    Madan Raj almost 3 years
    How do I pass a list of values as arguments so that each works individually in threads? It works fine for a single function, but not for multiple functions.
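As noted in the comments, the Pool API itself cannot reserve workers per function, but separate pools can; a minimal sketch of the 15/15 split from the question (scaled down to 2 workers each, with hypothetical foo and bar bodies):

```python
from multiprocessing import Pool

def foo(x):
    return x * 2

def bar(x):
    return x + 1

if __name__ == '__main__':
    # Two independent pools: each function gets its own fixed number
    # of workers, analogous to the 15/15 split asked about above.
    with Pool(processes=2) as pool_foo, Pool(processes=2) as pool_bar:
        r_foo = pool_foo.map_async(foo, range(5))
        r_bar = pool_bar.map_async(bar, range(5))
        print(r_foo.get())  # [0, 2, 4, 6, 8]
        print(r_bar.get())  # [1, 2, 3, 4, 5]
```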