Concurrent.futures vs Multiprocessing in Python 3

83,875

Solution 1

I wouldn't call concurrent.futures more "advanced" - it's a simpler interface that works very much the same regardless of whether you use multiple threads or multiple processes as the underlying parallelization gimmick.

So, like virtually all instances of "simpler interface", much the same trade-offs are involved: it has a shallower learning curve, in large part just because there's so much less available to be learned; but, because it offers fewer options, it may eventually frustrate you in ways the richer interfaces won't.

So far as CPU-bound tasks go, that's way too under-specified to say much meaningful. For CPU-bound tasks under CPython, you need multiple processes rather than multiple threads to have any chance of getting a speedup. But how much (if any) of a speedup you get depends on the details of your hardware, your OS, and especially on how much inter-process communication your specific tasks require. Under the covers, all inter-process parallelization gimmicks rely on the same OS primitives - the high-level API you use to get at those isn't a primary factor in bottom-line speed.

Edit: example

Here's the final code shown in the article you referenced, but I'm adding an import statement needed to make it work:

from concurrent.futures import ProcessPoolExecutor
def pool_factorizer_map(nums, nprocs):
    # Let the executor divide the work among processes by using 'map'.
    with ProcessPoolExecutor(max_workers=nprocs) as executor:
        return {num:factors for num, factors in
                                zip(nums,
                                    executor.map(factorize_naive, nums))}

Here's exactly the same thing using multiprocessing instead:

import multiprocessing as mp
def mp_factorizer_map(nums, nprocs):
    with mp.Pool(nprocs) as pool:
        return {num:factors for num, factors in
                                zip(nums,
                                    pool.map(factorize_naive, nums))}

Note that the ability to use multiprocessing.Pool objects as context managers was added in Python 3.3.

As for which one is easier to work with, they're essentially identical.

One difference is that Pool supports so many different ways of doing things that you may not realize how easy it can be until you've climbed quite a way up the learning curve.

Again, all those different ways are both a strength and a weakness. They're a strength because the flexibility may be required in some situations. They're a weakness because of "preferably only one obvious way to do it". A project sticking exclusively (if possible) to concurrent.futures will probably be easier to maintain over the long run, due to the lack of gratuitous novelty in how its minimal API can be used.

Solution 2

Probably for most of the time when you need parallel processing, you will find that either the ProcessPoolExecutor class from the concurrent.futures module or the Pool class from the multiprocessing module will provide equivalent facilities and it boils down to a matter of personal preference. But each does offer some facilities that make certain processing more convenient. I thought I would just point out a couple:

When submitting a batch of tasks, you sometimes want to be get the task results (i.e. return values) as soon as they become available. Both facilities provide for notification that a result from a submitted task is available via callback mechanisms:

Using multiprocessing.Pool:

import multiprocessing as mp

def worker_process(i):
    return i * i # square the argument

def process_result(return_value):
    print(return_value)

def main():
    pool = mp.Pool()
    for i in range(10):
        pool.apply_async(worker_process, args=(i,), callback=process_result)
    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

The same can be done, albeit awkwardly, using a callback with concurrent.futures:

import concurrent.futures

def worker_process(i):
    return i * i # square the argument

def process_result(future):
    print(future.result())

def main():
    executor = concurrent.futures.ProcessPoolExecutor()
    futures = [executor.submit(worker_process, i) for i in range(10)]
    for future in futures:
        future.add_done_callback(process_result)
    executor.shutdown()

if __name__ == '__main__':
    main()

Here each task is individually submitted for which a Future instance is returned. Then the callback must be added to the Future. Finally, when the callback is invoked, the argument passed is the Future instance for the task that has been completed and method result must be called to get the actual return value. But with the concurrent.futures module, there is actually no need to use a callback at all. You can use the as_completed method:

import concurrent.futures

def worker_process(i):
    return i * i # square the argument

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = [executor.submit(worker_process, i) for i in range(10)]
        for future in concurrent.futures.as_completed(futures):
            print(future.result())

if __name__ == '__main__':
    main()

And it is easy to tie the return value back to the original passed argument to worker_process by using a dictionary to hold the Future instances:

import concurrent.futures

def worker_process(i):
    return i * i # square the argument

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = {executor.submit(worker_process, i): i for i in range(10)}
        for future in concurrent.futures.as_completed(futures):
            i = futures[future] # retrieve the value that was squared
            print(i, future.result())

if __name__ == '__main__':
    main()

multiprocessing.Pool has methods imap and imap_unordered, the latter which allows task results to be returned in arbitrary order, but not necessarily in completion order. These methods are considered to be a lazier version of map. With method map, if the passed iterable argument does not have a __len__ attribute, it will first be converted to a list and its length will be used to compute an effective chunksize value if None was supplied as the chunksize argument. Therefore, you cannot achieve any storage optimizations by using a generator or generator expression as the iterable. But with methods imap and imap_unordered, the iterable can be a generator or generator expression; it will be iterated as necessary to produce new tasks for submission. But this necessitates that the default chunksize parameter be 1 since the length of the iterable in general cannot be known. But that doesn't stop you from providing a reasonable value using the same algorithm that the multiprocessing.Pool class uses if you have a good approximation to the length of the iterable (or the exact size as in the example below):

import multiprocessing as mp

def worker_process(i):
    return i * i # square the argument

def compute_chunksize(pool_size, iterable_size):
    if iterable_size == 0:
        return 0
    chunksize, extra = divmod(iterable_size, pool_size * 4)
    if extra:
        chunksize += 1
    return chunksize

def main():
    cpu_count = mp.cpu_count()
    N = 100
    chunksize = compute_chunksize(cpu_count, N)
    with mp.Pool() as pool:
        for result in pool.imap_unordered(worker_process, range(N), chunksize=chunksize):
            print(result)

if __name__ == '__main__':
    main()

But with imap_unordered there is no way to easily tie a result with a submitted job unless the worker process returned the original call arguments along with the return value. On the other hand the ability to specify a chunksize with imap_unordered and imap, for which the results will be in a predictable order, should make these methods more efficient than invoking the apply_async method repeatedly, which is essentially equivalent to using a chunksize of 1. But if you do need to process results in completion order, then to be sure you should use method apply_async with a callback function. It does, however, appear based on experimentation that if you use a chunksize value of 1 with imap_unordered, the results will be returned in completion order.

The map method of the ProcessPoolExecutor class from the concurrent.futures package is similar in one regard to the Pool.imap method from the multiprocessing package. This method will not convert its passed iterable arguments that are generator expressions to lists in order to compute effective chunksize values and that is why the chunksize argument defaults to 1 and why, if you are passing large iterables, you should consider specifying an appropriate chunksize value. However, unlike with Pool.imap, it appears from my expereince that you cannot begin to iterate results until all the iterables being passed to map have been iterated.

The multiprocessing.Pool class has a method apply that submits a task to the pool and blocks until the result is ready. The return value is just the return value from the worker function passed to the apply function. For example:

import multiprocessing as mp

def worker_process(i):
    return i * i # square the argument

def main():
    with mp.Pool() as pool:
        print(pool.apply(worker_process, args=(6,)))
        print(pool.apply(worker_process, args=(4,)))

if __name__ == '__main__':
    main()

The concurrent.futures.ProcessPoolExecutor class has no such equivalent. You have to issue a submit and then a call to result against the returned Future instance. It's not a hardship to have to do this, but the Pool.apply method is more convenient for the use case where a blocking task submission is appropriate. Such a case is when you have processing that calls for threading because most of the work being done in the threads is heavily I/O except for perhaps one function that is very CPU bound. The main program that creates the threads first creates a multiprocessing.Pool instance and passes it as an argument to all the threads. When the threads need to call the heavily CPU-bound function, it now runs the function using the Pool.apply method thereby running the code in another process and freeing the current process to allow the other threads to run.

A big deal has been made of the concurrent.futures module having two classes, ProcessPoolExecutor and ThreadPoolExecutor with identical interfaces. That is a nice feature. But the multiprocessing module also has an undocumented ThreadPool class with an identical interface as Pool:

>>> from multiprocessing.pool import Pool
>>> from multiprocessing.pool import ThreadPool
>>> dir(Pool)
['Process', '__class__', '__del__', '__delattr__', '__dict__', '__dir__', '__doc__', '__enter__', '__eq__', '__exit__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_check_running', '_get_sentinels', '_get_tasks', '_get_worker_sentinels', '_guarded_task_generation', '_handle_results', '_handle_tasks', '_handle_workers', '_help_stuff_finish', '_join_exited_workers', '_maintain_pool', '_map_async', '_repopulate_pool', '_repopulate_pool_static', '_setup_queues', '_terminate_pool', '_wait_for_updates', '_wrap_exception', 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join', 'map', 'map_async', 'starmap', 'starmap_async', 'terminate']
>>> dir(ThreadPool)
['Process', '__class__', '__del__', '__delattr__', '__dict__', '__dir__', '__doc__', '__enter__', '__eq__', '__exit__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_check_running', '_get_sentinels', '_get_tasks', '_get_worker_sentinels', '_guarded_task_generation', '_handle_results', '_handle_tasks', '_handle_workers', '_help_stuff_finish', '_join_exited_workers', '_maintain_pool', '_map_async', '_repopulate_pool', '_repopulate_pool_static', '_setup_queues', '_terminate_pool', '_wait_for_updates', '_wrap_exception', 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join', 'map', 'map_async', 'starmap', 'starmap_async', 'terminate']
>>>

You can submit tasks with either ProcessPoolExecutor.submit, which returns a Future instance, or Pool.apply_async, which returns an AsyncResult instance, and specify a timeout value for retrieving the result:

from concurrent.futures import ProcessPoolExecutor, TimeoutError
from time import sleep


def worker_1():
    while True:
        print('hanging')
        sleep(1)


def main():
    with ProcessPoolExecutor(1) as pool:
        future = pool.submit(worker_1)
        try:
            future.result(3) # kill task after 3 seconds?
        except TimeoutError:
            print('timeout')

if __name__ == '__main__':
    main()
    print("return from main()")

Prints:

hanging
hanging
hanging
timeout
hanging
hanging
hanging
hanging
hanging
hanging
hanging
etc.

The main process when calling future.result(3) will get a TimeoutError exception after 3 seconds because the submitted task has not completed within that time period. But the task is continuing to run, tying up the process and the with ProcessPoolExecutor(1) as pool: block never exits and thus the program does not terminate.

from multiprocessing import Pool, TimeoutError
from time import sleep


def worker_1():
    while True:
        print('hanging')
        sleep(1)

def main():
    with Pool(1) as pool:
        result = pool.apply_async(worker_1, args=())
        try:
            result.get(3) # kill task after 3 seconds?
        except TimeoutError:
            print('timeout')


if __name__ == '__main__':
    main()
    print("return from main()")

Prints:

hanging
hanging
hanging
timeout
return from main()

This time, however, even though the timed-out task is still continuing to run and is tying up the process, the with block is not prevented from exiting and thus the program terminates normally. The reason for this is that the context manager for the Pool instance will execute a call to terminate when the block exits and this results in the immediate termination of all processes in the pool. This is contrasted with the context handler for the ProcessPoolExecutor instance, which executes a call to shutdown(wait=True) to await the termination of all processes in the pool when the block it governs exits. The advantage would seem to go to multiprocessing.Pool if you are using context handlers to handle pool termination and the possibility of a timeout exists. Update: In Python 3.9, a new argument, cancel_futures, has been added to the shutdown method. Consequently, you can terminate all running tasks and any tasks waiting to run if you explicitly call shutdown(wait=False, cancel_futures=True) instead of relying on the default behavior resulting from the implicit call to shutdown when using a context handler.

But since the context handler for multiprocessing.Pool only calls terminate and not close followed by join, you must then ensure that all the jobs you have submitted have completed before exiting the with block, for example by submitting jobs with a blocking, synchronous call such as map or calling get on the AsyncResult object returned by a call to apply_async or iterating the results of the call to imap or by calling close followed by join on the pool instance.

Although there is no way to exit until timed-out tasks complete when using the ProcessPoolExecutor, you can cancel the starting of submitted tasks that are not already running. In the following demo we have a pool of size 1 so that jobs can only run consecutively. We submit 3 jobs one after another where the first two jobs take 3 seconds to run because of calls to time.sleep(3). We immediately try to cancel the first two jobs. The first attempt of canceling fails because the first job is already running. But because the pool only has one process, the second job must wait 3 seconds for the the first job to complete before it can start running and therefore the cancel succeeds. Finally, job 3 will begin and end almost immediately after job 1 completes, which will be approximately 3 seconds after we started the job submissions:

from concurrent.futures import ProcessPoolExecutor
import time

def worker1(i):
    time.sleep(3)
    print('Done', i)

def worker2():
    print('Hello')

def main():
    with ProcessPoolExecutor(max_workers=1) as executor:
        t = time.time()
        future1 = executor.submit(worker1, 1)
        future2 = executor.submit(worker1, 2)
        future3 = executor.submit(worker2)
        # this will fail since this task is already running:
        print(future1.cancel())
        # this will succeed since this task hasn't started (it's waiting for future1 to complete):
        print(future2.cancel())
        future3.result() # wait for completion
        print(time.time() - t)

if __name__ == '__main__':
    main()

Prints:

False
True
Done 1
Hello
3.1249606609344482

Solution 3

In addition to other answers' detailed list of differences, I've personally run into a unfixed (as-of 2020-10-27) indefinite hang that can happen with multiprocess.Pool when one of the workers crashes in certain ways. (In my case, an exception from a cython extension, though others say this can happen when a worker gets a SIGTERM, etc.) According to the documentation for ProcessPoolExecutor, it has been robust to this since python 3.3.

Solution 4

In my experience, I faced a lot of issues with the multiprocessing module as compared to concurrent.futures.(But this was on Windows os)

Two of main differences i could see were:

  1. Frequent Hangs in the multiprocessing module
  2. Concurrent.futures has got a relatively simpler way of execution. Meaning fetching the results, tracking of child processes etc.is very simple.

Example: (Fetching the result)

with concurrent.futures.ProcessPoolExecutor() as executor:
    f1 = executor.submit(some_function, parameter_to_be_passed) 
    print(f1.result())

So if you returning any value from some_function() you can directly catch/store it using f1.result(). The very same thing will need additional steps in the "multiprocessing" module.

If you are running on Linux systems then the hangs might not occur but the execution complexity is still more in the "multiprocessing" module.

Also having said this, it is also important to note my tasks were highly CPU intensive tasks.

On a personal note, I would recommend concurrent.futures.

Solution 5

I love concurrent.futures, mainly because the iterator of multiple function parameters: multiprocessing is somehow hacky when it comes to obtain multiple arguments to a function (there is no istarmap()-equivalent of starmap()):

import multiprocessing as mp

def power_plus_one(x, y):
    return (x**y) + 1

def wrapper(t):
    return power_plus_one(*t)

with mp.Pool() as pool:
    r = list(pool.imap(wrapper, [(0, 1), (2, 2)]))

print(r)

I find imap()/imap_unordered() super helpful for progress bars like tqdm or time estimations for larger computation. In concurrents.futures, this is super handy:

def power_plus_one(x, y):
    return (x**y) + 1

o = dict() # dict for output

with concurrent.futures.ProcessPoolExecutor() as executor:
    futures = {executor.submit(power_plus_one, x, y): (x, y) for x, y in [(0, 1), (2, 2)]}
    for future in concurrent.futures.as_completed(futures):
        i = futures[future]
        o[i] = future.result()
print(o)

I also love the handy result mapping as a dict. :)

With tqdm you can easily:

for future in tqdm(concurrent.futures.as_completed(futures), total=len(futures)):
    ...
Share:
83,875

Related videos on Youtube

GIS-Jonathan
Author by

GIS-Jonathan

Updated on May 08, 2022

Comments

  • GIS-Jonathan
    GIS-Jonathan about 2 years

    Python 3.2 introduced Concurrent Futures, which appear to be some advanced combination of the older threading and multiprocessing modules.

    What are the advantages and disadvantages of using this for CPU bound tasks over the older multiprocessing module?

    This article suggests they're much easier to work with - is that the case?

  • jfs
    jfs over 10 years
    "you need multiple processes rather than multiple threads to have any chance of getting a speedup" is too harsh. If speed is important; the code might already use a C library and therefore it can release GIL e.g., regex, lxml, numpy.
  • Tim Peters
    Tim Peters over 10 years
    @J.F.Sebastian, thanks for adding that - perhaps I should have said "under pure CPython", but I'm afraid there's no short way to explain the truth here without discussing the GIL.
  • kotrfa
    kotrfa about 8 years
    And it worth mentioning that threads might be especially useful and enough when having operation with long IO.
  • max
    max about 7 years
    @TimPeters In some ways ProcessPoolExecutor actually has more options than Pool because ProcessPoolExecutor.submit returns Future instances that allow cancellation (cancel), checking which exception was raised (exception), and dynamically adding a callback to be called upon completion (add_done_callback). None of these features are available with AsyncResult instances returned by Pool.apply_async. In other ways Pool has more options due to initializer / initargs, maxtasksperchild, and context in Pool.__init__, and more methods exposed by Pool instance.
  • Tim Peters
    Tim Peters about 7 years
    @max, sure, but note that the question wasn't about Pool, it was about the modules. Pool is a small part of what's in multiprocessing, and is so far down in the docs it takes a while for people to realize it even exists in multiprocessing. This particular answer focused on Pool because that's all the article the OP linked to used, and that cf is "much easier to work with" simply isn't true about what the article discussed. Beyond that, cf's as_completed() can also be very handy.
  • max
    max about 7 years
    @TimPeters oh yes completely agree
  • ntg
    ntg about 6 years
    "without discussing the GIL": GIL is interesting :) Would it be possible to point to a link with the problems GIL runs into for threads? I seem to remember something but not much....
  • Lith
    Lith almost 3 years
    This is a fantastic answer.
  • Booboo
    Booboo almost 3 years
    Frequent hangs? That's a fairly unspecific statement. Could it be your code? Also multiprocessing.pool requires no "additional" steps: async_result = pool.submit(some_function, args=(parameter1, parameter2, ...)); print(async_result.get())