How can I wrap a synchronous function in an async coroutine?

35,089

Solution 1

Eventually I found an answer in this thread. The method I was looking for is run_in_executor. This allows a synchronous function to be run asynchronously without blocking an event loop.

In the sleep example I posted above, it might look like this:

import asyncio
from time import sleep

async def sleep_async(loop, delay):
    # None uses the default executor (ThreadPoolExecutor)
    await loop.run_in_executor(None, sleep, delay)
    return 'I slept asynchronously'

Also see the following answer -> How do we call a normal function where a coroutine is expected?

Solution 2

You can use a decorator to wrap the sync version to an async version.

import time
from functools import wraps, partial


def wrap(func):
    @wraps(func)
    async def run(*args, loop=None, executor=None, **kwargs):
        if loop is None:
            loop = asyncio.get_event_loop()
        pfunc = partial(func, *args, **kwargs)
        return await loop.run_in_executor(executor, pfunc)
    return run

@wrap
def sleep_async(delay):
    time.sleep(delay)
    return 'I slept asynchronously'

or use the aioify library

% pip install aioify

then

@aioify
def sleep_async(delay):
    pass

Solution 3

The decorator would be useful for this case and run your blocking function in another thread.

import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import wraps, partial
from typing import Union

class to_async:

    def __init__(self, *, executor: Optional[ThreadPoolExecutor]=None):
       
        self.executor =  executor
    
    def __call__(self, blocking):
        @wraps(blocking)
        async def wrapper(*args, **kwargs):

            loop = asyncio.get_event_loop()
            if not self.executor:
                self.executor = ThreadPoolExecutor()

            func = partial(blocking, *args, **kwargs)
        
            return await loop.run_in_executor(self.executor,func)

        return wrapper

@to_async(executor=None)
def sync(*args, **kwargs):
    print(args, kwargs)
   
asyncio.run(sync("hello", "world", result=True))

Solution 4

Maybe someone will need my solution to this problem. I wrote my own library to solve this, which allows you to make any function asynchronous using a decorator.

To install the library, run this command:

$ pip install awaits

To make any of your functions asynchronous, just add the @awaitable decorator to it, like this:

import time
import asyncio
from awaits.awaitable import awaitable

@awaitable
def sum(a, b):
  # heavy load simulation
  time.sleep(10)
  return a + b

Now you can make sure that your function is really asynchronous coroutine:

print(asyncio.run(sum(2, 2)))

"Under the hood" your function will be executed in the thread pool. This thread pool will not be recreated every time your function is called. A thread pool is created once and accepts new tasks via a queue. This will make your program run faster than using other solutions, because the creation of additional threads is an additional overhead.

Share:
35,089
Zac Delventhal
Author by

Zac Delventhal

Full stack developer about town. Advocate for open source software, FP, blockchains, JavaScript, and Rust. Will grudgingly write Python upon request.

Updated on May 16, 2021

Comments

  • Zac Delventhal
    Zac Delventhal almost 3 years

    I'm using aiohttp to build an API server that sends TCP requests off to a seperate server. The module that sends the TCP requests is synchronous and a black box for my purposes. So my problem is that these requests are blocking the entire API. I need a way to wrap the module requests in an asynchronous coroutine that won't block the rest of the API.

    So, just using sleep as a simple example, is there any way to somehow wrap time-consuming synchronous code in a non-blocking coroutine, something like this:

    async def sleep_async(delay):
        # After calling sleep, loop should be released until sleep is done
        yield sleep(delay)
        return 'I slept asynchronously'
    
  • Oleg
    Oleg almost 7 years
    ProcessPoolExecutor has a high cost because it launches an entire new python interpreter. It is used when you have a CPU-intensive task that needs to use multiple processors. Consider using ThreadPoolExecutor instead, which uses threading.
  • Zac Delventhal
    Zac Delventhal almost 7 years
    Thank you for the additional info. Although the original example used process pool, ThreadPoolExecutor is what I ended up using after a little more research. Still seems a little jenky, but so far it's all holding together.
  • Amit Kotlovski
    Amit Kotlovski over 6 years
    Just a note, instead of creating a new executor, it might be simpler to use the default executor by calling loop.run_in_executor(executor=None, func, *args) (see documentation).
  • WBAR
    WBAR about 5 years
    good advise to use aioify it makes now so easy to write async functions and modules :)
  • Admin
    Admin about 4 years
    To get the event loop, one can do loop = asyncio.get_event_loop()
  • CMCDragonkai
    CMCDragonkai over 3 years
    Even when using aiofy when using the function, it may still block the event loop if the function itself is a long-running blocking operation. In that case, instead of concurrency we would need parallelism. Unless there was a way to yield back to the loop within the long-running operation.
  • Sima
    Sima about 3 years
    Union[None, ...] - union of None and something is Optional[...]
  • Sima
    Sima about 3 years
    not Optional[None,ThreadPoolExecutor], just Optional[ThreadPoolExecutor] =)