Multiprocessing pool 'apply_async' only seems to call function once

Solution 1

apply_async isn't meant to launch multiple processes; it's just meant to call the function with the arguments in one of the processes of the pool. You'll need to make 10 calls if you want the function to be called 10 times.
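
As a rough sketch of what "make 10 calls" could look like (Python 3 syntax; `f` loosely mirrors the function from the question, and `run_ten_times` is a hypothetical helper name, not part of multiprocessing):

```python
import os
from multiprocessing import Pool


def f(a):
    # Return the argument together with the worker's PID so the
    # caller can see which pool process handled each call.
    return a, os.getpid()


def run_ten_times(arg, n_calls=10, n_workers=10):
    # Hypothetical helper: submit n_calls separate tasks, then collect them.
    with Pool(processes=n_workers) as pool:
        pending = [pool.apply_async(f, (arg,)) for _ in range(n_calls)]
        # get() blocks until that particular call has finished.
        return [result.get() for result in pending]


if __name__ == "__main__":
    for value, pid in run_ten_times(1):
        print(value, pid)
```

Nothing here guarantees that ten different processes do the work; when `f` is this fast, a few workers may well pick up all ten tasks.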

First, note the docs on apply() (emphasis added):

apply(func[, args[, kwds]])

Call func with arguments args and keyword arguments kwds. It blocks until the result is ready. Given this blocks, apply_async() is better suited for performing work in parallel. Additionally, func is only executed in one of the workers of the pool.

Now, in the docs for apply_async():

apply_async(func[, args[, kwds[, callback[, error_callback]]]])

A variant of the apply() method which returns a result object.

The difference between the two is just that apply_async returns immediately. You can use map() to call a function multiple times, though if you're calling with the same inputs, then it's a little redundant to create a list of the same argument just to have a sequence of the right length.
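
For the "same input every time" case, a minimal sketch could look like this (Python 3; the body of `f`, the pool size of 4, and the helper name `map_same_argument` are assumptions for illustration):

```python
from multiprocessing import Pool


def f(a):
    # Stand-in for the real work.
    return a * 2


def map_same_argument(arg, n_calls=10):
    with Pool(processes=4) as pool:
        # The list exists only to give map() n_calls items to hand out.
        return pool.map(f, [arg] * n_calls)


if __name__ == "__main__":
    print(map_same_argument(1))
```

map() blocks until every call has finished and returns the results in input order.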

However, if you're calling different functions with the same input, then you're really just calling a higher order function, and you could do it with map or map_async() like this:

pool.map(lambda f: f(1), functions)

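except that lambda functions aren't pickleable, so you'd need to use a defined function (see How to let Pool.map take a lambda function). You might think of the builtin apply() (not the multiprocessing one), but it is deprecated in Python 2, was removed in Python 3, and expects the function and its argument tuple as two separate arguments, whereas map() passes each list item as a single argument:

pool.map(apply, [(f, 1) for f in functions])  # each (f, 1) tuple arrives as one argument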

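It's easy enough to write your own; with Python 3.3+, pool.starmap() unpacks each tuple into positional arguments:

def apply_(f, *args, **kwargs):
    # Call f with whatever arguments were passed along.
    return f(*args, **kwargs)

pool.starmap(apply_, [(f, 1) for f in functions])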
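
A self-contained sketch of the "different functions, same input" idea, assuming Python 3.3+ so that Pool.starmap() is available to unpack each (function, argument) tuple. The functions `double`, `square`, and `negate` and the helper `call_each` are made up for illustration; they are defined at module level so they can be pickled:

```python
from multiprocessing import Pool


def double(x):
    return 2 * x


def square(x):
    return x * x


def negate(x):
    return -x


def apply_(f, *args, **kwargs):
    # Call f with whatever arguments were passed along.
    return f(*args, **kwargs)


def call_each(functions, arg):
    with Pool(processes=3) as pool:
        # starmap() turns each (f, arg) tuple into the call apply_(f, arg).
        return pool.starmap(apply_, [(f, arg) for f in functions])


if __name__ == "__main__":
    print(call_each([double, square, negate], 3))
```

The results come back in the same order as the list of functions, so this call gives [6, 9, -3].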

Solution 2

Each time you write pool.apply_async(...) it will delegate that function call to one of the processes that was started in the pool. If you want to call the function in multiple processes, you need to issue multiple pool.apply_async calls.

Note, there also exists a pool.map (and pool.map_async) function which will take a function and an iterable of inputs:

inputs = range(30)
results = pool.map(f, inputs)

These functions apply f to each input in the inputs iterable. They split the inputs into "batches" (chunks) and hand those to the pool so that the load gets balanced fairly evenly among all the processes in the pool.
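
The batch size can also be set by hand through map()'s chunksize parameter. A small sketch, assuming Python 3, a squaring `f`, and a pool of 4 workers (all chosen just for illustration; `squares` is a hypothetical helper):

```python
from multiprocessing import Pool


def f(x):
    return x * x


def squares(inputs, chunksize=None):
    with Pool(processes=4) as pool:
        # chunksize=None lets the pool choose a batch size itself;
        # an explicit value sends that many inputs to a worker at a time.
        return pool.map(f, inputs, chunksize=chunksize)


if __name__ == "__main__":
    print(squares(range(30)))
    print(squares(range(30), chunksize=5))
```

Both calls return the same list in input order; only the way the work is handed out changes.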

Solution 3

If you want to run a single piece of code in ten processes, each of which then exits, a Pool of ten processes is probably not the right thing to use.

Instead, create ten Processes to run the code:

processes = []

for _ in range(10):
    p = multiprocessing.Process(target=f, args=(1,))
    p.start()
    processes.append(p)

for p in processes:
    p.join()
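
A Process does not hand back the return value of its target, so if the ten results are needed, one option is to have each process put its result on a multiprocessing.Queue. This is only a sketch: the body of `f` and the names `worker` and `run_in_processes` are assumptions for illustration.

```python
import multiprocessing


def f(a):
    # Stand-in for the real work.
    return a + 1


def worker(a, queue):
    # Runs in the child process and ships the result back to the parent.
    queue.put(f(a))


def run_in_processes(arg, n_processes=10):
    queue = multiprocessing.Queue()
    processes = [
        multiprocessing.Process(target=worker, args=(arg, queue))
        for _ in range(n_processes)
    ]
    for p in processes:
        p.start()
    # Read one result per process before joining, so no child is left
    # waiting to flush its queued data.
    results = [queue.get() for _ in processes]
    for p in processes:
        p.join()
    return results


if __name__ == "__main__":
    print(run_in_processes(1))
```

Results arrive in completion order, which need not match the order in which the processes were started.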

The multiprocessing.Pool class is designed to handle situations where the number of processes and the number of jobs are unrelated. Often the number of processes is selected to be the number of CPU cores you have, while the number of jobs is much larger.
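
To illustrate that separation, here is a sketch in which 100 jobs are spread over a small pool (Python 3; `job` and `run_jobs` are hypothetical names, and passing processes=None asks Pool for its default worker count, which is based on the machine's CPU count):

```python
import os
from multiprocessing import Pool


def job(n):
    # Report which worker process handled this job.
    return n, os.getpid()


def run_jobs(n_jobs=100, n_workers=None):
    with Pool(processes=n_workers) as pool:
        return pool.map(job, range(n_jobs))


if __name__ == "__main__":
    results = run_jobs()
    worker_pids = {pid for _, pid in results}
    print(len(results), "jobs handled by", len(worker_pids), "worker processes")
```

However many jobs are submitted, the number of distinct worker PIDs stays at or below the pool size.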

Author by

Juicy

Updated on September 22, 2020

Comments

  • Juicy
    Juicy over 3 years

    I've been following the docs to try to understand multiprocessing pools. I came up with this:

    import time
    from multiprocessing import Pool
    
    def f(a):
        print 'f(' + str(a) + ')'
        return True
    
    t = time.time()
    pool = Pool(processes=10)
    result = pool.apply_async(f, (1,))
    print result.get()
    pool.close()
    print ' [i] Time elapsed ' + str(time.time() - t)
    

    I'm trying to use 10 processes to evaluate the function f(a). I've put a print statement in f.

    This is the output I'm getting:

    $ python pooltest.py 
    f(1)
    True
     [i] Time elapsed 0.0270888805389
    

    It appears to me that the function f is only getting evaluated once.

    I'm likely not using the right method, but the end result I'm looking for is to run f with 10 processes simultaneously and get the result returned by each one of those processes. So I would end up with a list of 10 results (which may or may not be identical).

    The docs on multiprocessing are quite confusing, and it's not trivial to figure out which approach I should be taking. It seems to me that f should be run 10 times in the example I provided above.

  • mgilson
    mgilson about 9 years
    Careful, I don't think that lambda works as lambda functions aren't pickleable (IIRC).
  • Juicy
    Juicy about 9 years
    I understand how apply works now and it makes sense, but you were right the first time. I'm trying to run the same function 10 times with the same argument (although the result may not always be the same for other reasons). I would like to end up with a list of the 10 results. Is the best way to do this with a for loop then?
  • Joshua Taylor
    Joshua Taylor about 9 years
    @Juicy I think map is what you'd want
  • Juicy
    Juicy about 9 years
    But map seems to only take an iterable, so I guess I should just make a list with 10 copies of the same argument then.
  • Joshua Taylor
    Joshua Taylor about 9 years
    @Juicy See my update; you have a list of functions; that's the iterable that you should pass in. The function that you're mapping should call its argument with the arglist
  • tdelaney
    tdelaney about 9 years
    OP wants a returned result which your example doesn't provide.
  • Joshua Taylor
    Joshua Taylor about 9 years
    @mgilson Right; I've updated to say that it'd be nice to use, but doesn't actually work. Using a defined function should, though.