How can I wrap a synchronous function in an async coroutine?
Solution 1
Eventually I found an answer in this thread. The method I was looking for is run_in_executor. This allows a synchronous function to be run asynchronously without blocking an event loop.
In the sleep
example I posted above, it might look like this:
import asyncio
from time import sleep
async def sleep_async(loop, delay):
# None uses the default executor (ThreadPoolExecutor)
await loop.run_in_executor(None, sleep, delay)
return 'I slept asynchronously'
Also see the following answer -> How do we call a normal function where a coroutine is expected?
Solution 2
You can use a decorator to wrap the sync version to an async version.
import time
from functools import wraps, partial
def wrap(func):
@wraps(func)
async def run(*args, loop=None, executor=None, **kwargs):
if loop is None:
loop = asyncio.get_event_loop()
pfunc = partial(func, *args, **kwargs)
return await loop.run_in_executor(executor, pfunc)
return run
@wrap
def sleep_async(delay):
time.sleep(delay)
return 'I slept asynchronously'
or use the aioify library
% pip install aioify
then
@aioify
def sleep_async(delay):
pass
Solution 3
The decorator would be useful for this case and run your blocking function in another thread.
import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import wraps, partial
from typing import Union
class to_async:
def __init__(self, *, executor: Optional[ThreadPoolExecutor]=None):
self.executor = executor
def __call__(self, blocking):
@wraps(blocking)
async def wrapper(*args, **kwargs):
loop = asyncio.get_event_loop()
if not self.executor:
self.executor = ThreadPoolExecutor()
func = partial(blocking, *args, **kwargs)
return await loop.run_in_executor(self.executor,func)
return wrapper
@to_async(executor=None)
def sync(*args, **kwargs):
print(args, kwargs)
asyncio.run(sync("hello", "world", result=True))
Solution 4
Maybe someone will need my solution to this problem. I wrote my own library to solve this, which allows you to make any function asynchronous using a decorator.
To install the library, run this command:
$ pip install awaits
To make any of your functions asynchronous, just add the @awaitable decorator to it, like this:
import time
import asyncio
from awaits.awaitable import awaitable
@awaitable
def sum(a, b):
# heavy load simulation
time.sleep(10)
return a + b
Now you can make sure that your function is really asynchronous coroutine:
print(asyncio.run(sum(2, 2)))
"Under the hood" your function will be executed in the thread pool. This thread pool will not be recreated every time your function is called. A thread pool is created once and accepts new tasks via a queue. This will make your program run faster than using other solutions, because the creation of additional threads is an additional overhead.
Zac Delventhal
Full stack developer about town. Advocate for open source software, FP, blockchains, JavaScript, and Rust. Will grudgingly write Python upon request.
Updated on May 16, 2021Comments
-
Zac Delventhal almost 3 years
I'm using aiohttp to build an API server that sends TCP requests off to a seperate server. The module that sends the TCP requests is synchronous and a black box for my purposes. So my problem is that these requests are blocking the entire API. I need a way to wrap the module requests in an asynchronous coroutine that won't block the rest of the API.
So, just using
sleep
as a simple example, is there any way to somehow wrap time-consuming synchronous code in a non-blocking coroutine, something like this:async def sleep_async(delay): # After calling sleep, loop should be released until sleep is done yield sleep(delay) return 'I slept asynchronously'
-
Oleg almost 7 years
ProcessPoolExecutor
has a high cost because it launches an entire new python interpreter. It is used when you have a CPU-intensive task that needs to use multiple processors. Consider usingThreadPoolExecutor
instead, which uses threading. -
Zac Delventhal almost 7 yearsThank you for the additional info. Although the original example used process pool,
ThreadPoolExecutor
is what I ended up using after a little more research. Still seems a little jenky, but so far it's all holding together. -
Amit Kotlovski over 6 yearsJust a note, instead of creating a new executor, it might be simpler to use the default executor by calling
loop.run_in_executor(executor=None, func, *args)
(see documentation). -
WBAR about 5 yearsgood advise to use
aioify
it makes now so easy to write async functions and modules :) -
Admin about 4 yearsTo get the event loop, one can do
loop = asyncio.get_event_loop()
-
CMCDragonkai over 3 yearsEven when using
aiofy
when using the function, it may still block the event loop if the function itself is a long-running blocking operation. In that case, instead of concurrency we would need parallelism. Unless there was a way to yield back to the loop within the long-running operation. -
Sima about 3 yearsUnion[None, ...] - union of None and something is Optional[...]
-
Sima about 3 yearsnot Optional[None,ThreadPoolExecutor], just Optional[ThreadPoolExecutor] =)