Difference between apply() and apply_async() in Python multiprocessing module


You linked to the Python 2.7 documentation, so I'm going to base my answer on the Python 2.7 multiprocessing implementation. Python 3.x may differ in details, but it should not be very different.

Difference between apply and apply_async

The difference between the two is largely self-explanatory once you see how they are implemented underneath. Here I'm going to copy/paste the code from multiprocessing/pool.py for both functions.

def apply(self, func, args=(), kwds={}):
    '''
    Equivalent of `apply()` builtin
    '''
    assert self._state == RUN
    return self.apply_async(func, args, kwds).get()

As you can see, apply actually calls apply_async, but calls get on the result just before returning. This makes the call block until the result is available.
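The equivalence can be checked directly. The following is a minimal sketch written for Python 3; `square` and `compare` are hypothetical names introduced only for illustration and are not part of multiprocessing.

```python
from multiprocessing import Pool


def square(x):
    """Hypothetical task, used only for illustration."""
    return x * x


def compare(pool):
    """Run the same task through apply and through apply_async(...).get()."""
    blocking = pool.apply(square, (3,))      # returns the value itself
    handle = pool.apply_async(square, (3,))  # returns a result handle right away
    return blocking, handle.get()            # get() is the call that blocks


if __name__ == '__main__':
    pool = Pool(2)
    print(compare(pool))  # both entries hold the same value
    pool.close()
    pool.join()
```

Both paths go through the same task queue; the only difference visible to the caller is when the blocking happens.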

def apply_async(self, func, args=(), kwds={}, callback=None):
    '''
    Asynchronous equivalent of `apply()` builtin
    '''
    assert self._state == RUN
    result = ApplyResult(self._cache, callback)
    self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
    return result

apply_async enqueues the task in the task queue and returns a handle for the submitted task. With that handle you can call get to fetch the result, or wait to block until the task finishes. Once the task has finished, its return value is passed as the argument to the callback function.
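As a small sketch of working with that handle (Python 3; `slow_double` and `submit_and_inspect` are hypothetical names, and the 0.2 second sleep is an arbitrary stand-in for real work):

```python
from multiprocessing import Pool
from time import sleep


def slow_double(x):
    """Hypothetical task: pretend to work, then return a value."""
    sleep(0.2)
    return 2 * x


def submit_and_inspect(pool):
    """Submit one task and exercise the handle that apply_async returns."""
    seen = []  # filled by the callback, which runs in the submitting process
    handle = pool.apply_async(slow_double, (21,), callback=seen.append)
    handle.wait()                  # block until the task has finished
    finished = handle.ready()      # True once a result is available
    value = handle.get(timeout=1)  # fetch the return value
    return finished, value, seen


if __name__ == '__main__':
    pool = Pool(2)
    print(submit_and_inspect(pool))
    pool.close()
    pool.join()
```

Because the callback is invoked in the parent process, it can append to an ordinary list; no shared memory is needed for that.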

Example:

from multiprocessing import Pool
from time import sleep


def callback(a):
    print a


def worker(i, n):
    print 'Entering worker ', i
    sleep(n)
    print 'Exiting worker'
    return 'worker_response'


if __name__ == '__main__':
    pool = Pool(4)
    a = [pool.apply_async(worker, (i, 4), callback=callback) for i in range(8)]
    for i in a:
        i.wait()

Results:

Entering worker  0
Entering worker  1
Entering worker  2
Entering worker  3
Exiting worker
Exiting worker
Exiting worker
Exiting worker
Entering worker  4
Entering worker  5
worker_response
Entering worker  6
worker_response
Entering worker  7
worker_response
worker_response
Exiting worker
Exiting worker
Exiting worker
Exiting worker
worker_response
worker_response
worker_response
worker_response

Note that when using apply_async you have to fetch the results or wait for the tasks to finish. If you do not, e.g. if you comment out the last two lines of my example, your script will finish immediately after you run it.
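Waiting on every handle is one option. Another common one is to close the pool and join it, which also blocks until the queued tasks are done. A minimal sketch, assuming Python 3 and the hypothetical helpers `cube` and `run_all`:

```python
from multiprocessing import Pool


def cube(x):
    """Hypothetical task, used only for illustration."""
    return x ** 3


def run_all(pool, values):
    """Submit every value, then wait for the whole pool instead of each handle."""
    results = []
    for v in values:
        pool.apply_async(cube, (v,), callback=results.append)
    pool.close()  # no further tasks may be submitted
    pool.join()   # returns only after the queued tasks are done and workers exit
    return sorted(results)  # completion order is not guaranteed, so sort


if __name__ == '__main__':
    print(run_all(Pool(4), range(5)))
```

The trade-off is that the pool cannot be reused after close(), so this fits scripts that submit one batch and then exit.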

Why apply_async may use more processes

This follows from how apply is described and how it works. apply sends one task to an available process in the pool and blocks until it is done, so tasks run one at a time. apply_async only adds tasks to the queue, and the task-queue thread then sends them to available processes in the pool. This is why more than one process may be running when you use apply_async.
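A rough way to see the difference is to time both styles on tasks that just sleep. This is only a sketch for Python 3; `nap`, `time_serial` and `time_async` are hypothetical names, and the actual timings depend on the machine and on how many workers are idle.

```python
from multiprocessing import Pool
from time import sleep, time


def nap(seconds):
    """Hypothetical task: occupy a worker for the given time."""
    sleep(seconds)
    return seconds


def time_serial(pool, n, seconds):
    """Submit n tasks with apply; each call blocks before the next is sent."""
    start = time()
    for _ in range(n):
        pool.apply(nap, (seconds,))
    return time() - start


def time_async(pool, n, seconds):
    """Submit n tasks with apply_async, then collect all results."""
    start = time()
    handles = [pool.apply_async(nap, (seconds,)) for _ in range(n)]
    for handle in handles:
        handle.get()
    return time() - start


if __name__ == '__main__':
    pool = Pool(4)
    print('apply:       %.2f s' % time_serial(pool, 4, 0.5))
    print('apply_async: %.2f s' % time_async(pool, 4, 0.5))
    pool.close()
    pool.join()
```

With apply the elapsed time grows with the number of tasks, since only one task is in flight at a time; with apply_async it can approach the duration of a single task when enough workers are free.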

I went through this section a couple of times to better understand what the author was trying to convey. Let's look at it here:

# evaluate "os.getpid()" asynchronously
res = pool.apply_async(os.getpid, ()) # runs in *only* one process
print res.get(timeout=1)              # prints the PID of that process

# launching multiple evaluations asynchronously *may* use more processes
multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
print [res.get(timeout=1) for res in multiple_results]

Reading this last example in light of the previous one: when you make multiple successive calls to apply_async, several of them may well run at the same time. This probably depends on how many processes in the pool are busy at that moment. That is why they say "may".
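To observe this, one can count the distinct PIDs that served a batch of tasks. The sketch below targets Python 3; `report_pid` and `pids_used` are hypothetical helpers, and the short sleep is my own assumption to keep each worker busy long enough for the others to pick up work. With a bare os.getpid the tasks are so short that one worker may take several of them.

```python
import os
from multiprocessing import Pool
from time import sleep


def report_pid(delay):
    """Hypothetical task: hold the worker briefly, then report its PID."""
    sleep(delay)
    return os.getpid()


def pids_used(pool, n, delay=0.1):
    """Submit n tasks asynchronously and return the set of PIDs that ran them."""
    handles = [pool.apply_async(report_pid, (delay,)) for _ in range(n)]
    return set(handle.get() for handle in handles)


if __name__ == '__main__':
    pool = Pool(4)
    print(len(pids_used(pool, 1)))  # a single task runs in exactly one process
    print(len(pids_used(pool, 4)))  # between 1 and 4, depending on which workers were free
    pool.close()
    pool.join()
```

The second count is not fixed: it reflects whichever workers happened to take tasks off the queue, which matches the "may" in the documentation.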

Author by

Pietro Marchesi

Updated on July 04, 2022

Comments

  • Pietro Marchesi over 1 year

    I currently have a piece of code which spawns multiple processes as follows:

    pool = Pool(processes=None)
    results = [pool.apply(f, args=(arg1, arg2, arg3)) for arg3 in arg_list]
    

    My idea was that this would divide the work across cores, using all cores available since processes=None. However, the documentation for the Pool.apply() method in the multiprocessing module docs reads:

    Equivalent of the apply() built-in function. It blocks until the result is ready, so apply_async() is better suited for performing work in parallel. Additionally, func is only executed in one of the workers of the pool.

    First question: I don't clearly understand this. How does apply distribute the work across workers, and in what way is it different from what apply_async does? If the tasks get distributed across workers, how is it possible that func is only executed in one of the workers?

    My guess: my guess would be that the apply, in my current implementation, is giving a task to a worker with a certain set of arguments, then waiting for that worker to be done, and then giving the next set of arguments to another worker. In this way I am sending work to different processes, yet no parallelism is taking place. This seems to be the case since apply is in fact just:

    def apply(self, func, args=(), kwds={}):
        '''
        Equivalent of `func(*args, **kwds)`.
        Pool must be running.
        '''
        return self.apply_async(func, args, kwds).get()
    

    Second question: I would also like to understand better why, in the introduction of the docs, section 16.6.1.5. ('Using a pool of workers'), they say that even a construction with apply_async such as [pool.apply_async(os.getpid, ()) for i in range(4)] may use more processes, but it isn't sure that it will. What decides whether multiple processes will be used?

  • Sean.H over 4 years
    Why is apply_async().get() a little faster than apply()? [pool.apply(os.getpid, ()) for i in range(4)] uses the same process number.
  • user3673 over 2 years
    It should say: "launching multiple evaluations asynchronously may use more than one of the pool's processes"