What kind of problems (if any) would there be combining asyncio with multiprocessing?

Solution 1

You should be able to safely combine asyncio and multiprocessing without too much trouble, though you shouldn't be using multiprocessing directly. The cardinal sin of asyncio (and any other event-loop based asynchronous framework) is blocking the event loop. If you try to use multiprocessing directly, any time you block to wait for a child process, you're going to block the event loop. Obviously, this is bad.

The simplest way to avoid this is to use BaseEventLoop.run_in_executor to execute a function in a concurrent.futures.ProcessPoolExecutor. ProcessPoolExecutor is a process pool implemented using multiprocessing.Process, but asyncio has built-in support for executing a function in it without blocking the event loop. Here's a simple example:

import time
import asyncio
from concurrent.futures import ProcessPoolExecutor

def blocking_func(x):
    time.sleep(x)  # Pretend this is an expensive calculation
    return x * 5

@asyncio.coroutine
def main():
    #pool = multiprocessing.Pool()
    #out = pool.apply(blocking_func, args=(10,)) # This blocks the event loop.
    executor = ProcessPoolExecutor()
    out = yield from loop.run_in_executor(executor, blocking_func, 10)  # This does not
    print(out)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
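
On newer Python versions (3.5+ for async/await, 3.7+ for asyncio.run), the same pattern is usually written without the generator-based coroutine syntax. Here is a minimal sketch of the equivalent modern code, reusing the same blocking_func:

import time
import asyncio
from concurrent.futures import ProcessPoolExecutor

def blocking_func(x):
    time.sleep(x)  # Pretend this is an expensive calculation
    return x * 5

async def main():
    loop = asyncio.get_running_loop()
    executor = ProcessPoolExecutor()
    # run_in_executor returns an awaitable, so the event loop is not blocked.
    out = await loop.run_in_executor(executor, blocking_func, 10)
    print(out)
    executor.shutdown()

if __name__ == "__main__":
    asyncio.run(main())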

For the majority of cases, this function alone is good enough. If you find yourself needing other constructs from multiprocessing, like Queue, Event, Manager, etc., there is a third-party library called aioprocessing (full disclosure: I wrote it) that provides asyncio-compatible versions of all the multiprocessing data structures. Here's an example demonstrating that:

import time
import asyncio
import aioprocessing
import multiprocessing

def func(queue, event, lock, items):
    with lock:
        event.set()
        for item in items:
            time.sleep(3)
            queue.put(item+5)
    queue.close()

@asyncio.coroutine
def example(queue, event, lock):
    l = [1,2,3,4,5]
    p = aioprocessing.AioProcess(target=func, args=(queue, event, lock, l)) 
    p.start()
    while True:
        result = yield from queue.coro_get()
        if result is None:
            break
        print("Got result {}".format(result))
    yield from p.coro_join()

@asyncio.coroutine
def example2(queue, event, lock):
    yield from event.coro_wait()
    with (yield from lock):
        yield from queue.coro_put(78)
        yield from queue.coro_put(None) # Shut down the worker

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    queue = aioprocessing.AioQueue()
    lock = aioprocessing.AioLock()
    event = aioprocessing.AioEvent()
    tasks = [
        asyncio.ensure_future(example(queue, event, lock)),
        asyncio.ensure_future(example2(queue, event, lock)),
    ]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
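
For completeness: the @asyncio.coroutine / yield from style used above was removed in Python 3.11. On Python 3.7+ the same aioprocessing example can be written with async def / await and asyncio.run; a rough sketch (the aioprocessing calls are unchanged, only the coroutine syntax and event-loop setup differ):

import time
import asyncio
import aioprocessing

def func(queue, event, lock, items):
    # Runs in the child process and uses ordinary blocking calls.
    with lock:
        event.set()
        for item in items:
            time.sleep(3)
            queue.put(item + 5)
    queue.close()

async def example(queue, event, lock):
    l = [1, 2, 3, 4, 5]
    p = aioprocessing.AioProcess(target=func, args=(queue, event, lock, l))
    p.start()
    while True:
        result = await queue.coro_get()
        if result is None:
            break
        print("Got result {}".format(result))
    await p.coro_join()

async def example2(queue, event, lock):
    await event.coro_wait()
    async with lock:
        await queue.coro_put(78)
        await queue.coro_put(None)  # Shut down the worker

async def main():
    queue = aioprocessing.AioQueue()
    lock = aioprocessing.AioLock()
    event = aioprocessing.AioEvent()
    await asyncio.gather(example(queue, event, lock),
                         example2(queue, event, lock))

if __name__ == "__main__":
    asyncio.run(main())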

Solution 2

Yes, there are quite a few bits that may (or may not) bite you.

  • When you run something like asyncio, it expects to run on a single thread or process. This does not (by itself) work with parallel processing. You somehow have to distribute the work while leaving the IO operations (specifically those on sockets) in a single thread/process.
  • While your idea of handing off individual connections to a different handler process is nice, it is hard to implement. The first obstacle is that you need a way to pull the connection out of asyncio without closing it. The next obstacle is that you cannot simply send a file descriptor to a different process unless you use platform-specific (probably Linux) code from a C extension.
  • Note that the multiprocessing module is known to create a number of helper threads for communication. Most of the time when you use a communication structure (such as a Queue), a thread is spawned. Unfortunately, those threads are not completely invisible: for instance, they can fail to tear down cleanly when you intend to terminate your program, and depending on their number, their resource usage may be noticeable on its own. A short demonstration follows this list.
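
A quick way to observe one of these hidden helper threads (the thread name is a CPython implementation detail, so treat this as illustrative only):

import threading
import multiprocessing

q = multiprocessing.Queue()
q.put("hello")  # the first put() starts the queue's background feeder thread
# On CPython this typically lists a 'QueueFeederThread' alongside the main thread.
print([t.name for t in threading.enumerate()])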

If you really intend to handle individual connections in individual processes, I suggest examining different approaches. For instance, you can put a socket into listen mode and then have multiple worker processes accept connections from it in parallel. Once a worker has finished processing a request, it can go accept the next connection, so you still use fewer resources than forking a process for each connection. SpamAssassin and Apache (mpm_prefork) use this worker model, for instance. Depending on your use case, it may end up easier and more robust. Specifically, you can make your workers die after serving a configured number of requests and be respawned by a master process, thereby eliminating much of the negative effect of memory leaks.
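
A minimal sketch of that shared-listening-socket worker model (the address, worker count, and echo "processing" are placeholders, and it assumes a platform where the listening socket can be inherited by or pickled to the worker processes, e.g. the Unix fork start method):

import socket
import multiprocessing

def worker(listener):
    # Each worker blocks in accept() on the same listening socket; the kernel
    # hands every incoming connection to exactly one of the workers.
    while True:
        conn, addr = listener.accept()
        with conn:
            data = conn.recv(1024)
            conn.sendall(data)  # stand-in for real request processing

if __name__ == "__main__":
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind(("127.0.0.1", 8888))
    listener.listen(128)
    workers = [multiprocessing.Process(target=worker, args=(listener,))
               for _ in range(4)]
    for p in workers:
        p.start()
    for p in workers:
        p.join()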

Solution 3

See PEP 3156, in particular the section on Thread interaction:

http://www.python.org/dev/peps/pep-3156/#thread-interaction

This clearly documents the new asyncio methods you might use, including run_in_executor(). Note that Executor is defined in concurrent.futures; I suggest you also have a look there.
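
For a minimal illustration of run_in_executor() with the default executor (modern syntax; time.sleep stands in for real blocking work):

import time
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    # Passing None as the executor runs the call in the loop's default thread pool,
    # so the event loop itself is never blocked.
    await loop.run_in_executor(None, time.sleep, 1)
    print("blocking call finished")

if __name__ == "__main__":
    asyncio.run(main())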

Comments

  • Wayne Werner, almost 2 years ago

    As almost everyone is aware when they first look at threading in Python, there is the GIL that makes life miserable for people who actually want to do processing in parallel - or at least give it a chance.

    I am currently looking at implementing something like the Reactor pattern. Effectively I want to listen for incoming socket connections on one thread-like, and when someone tries to connect, accept that connection and pass it along to another thread-like for processing.

    I'm not (yet) sure what kind of load I might be facing. I know there is currently a 2 MB cap set on incoming messages. Theoretically we could get thousands per second (though I don't know if we've seen anything like that in practice). The amount of time spent processing a message isn't terribly important, though obviously quicker would be better.

    I was looking into the Reactor pattern, and developed a small example using the multiprocessing library that (at least in testing) seems to work just fine. However, now/soon we'll have the asyncio library available, which would handle the event loop for me.

    Is there anything that could bite me by combining asyncio and multiprocessing?

  • Wayne Werner, over 10 years ago
    I suppose my question was a bit ambiguous - when I mentioned I would send it off to a thread-like, I actually meant that they were separate event loops.
  • jon, over 6 years ago
    The IO event loop is in the main process. If I want to send/recv via a socket in a child process, how can I do that? I found I can't simply call main_proc_loop.ensure_future(send_socket_data...) because they are in different processes. What is the best way to achieve this? Via a queue?
  • DtechNet, over 4 years ago
    @dano, is the package stable? Are there new features or bug fixes? Is it still maintained? GitHub says July 2018, just checking. I'm thinking it's alive and kicking?
  • dano, over 4 years ago
    @DtechNet It should be stable. I don't intend to add any new features unless people start asking for them. If I get a bug report or PR, I intend to fix/merge them. There isn't a lot of activity on the project because there isn't much to do to maintain it. The multiprocessing API is pretty stable, so it generally just keeps working even as new versions of Python come along.