Python 3: How to submit an async function to a threadPool?
I recommend a careful readthrough of Python 3's asyncio development guide, particularly the "Concurrency and Multithreading" section.
The main conceptual issue in your example that event loops are single-threaded, so it doesn't make sense to execute an async coroutine in a thread pool. There are a few ways for event loops and threads to interact:
-
Event loop per thread. For example:
async def threadWorkAsync(obj): b = do_something() if b: # Run a and b as concurrent tasks task_a = asyncio.create_task(do_a()) task_b = asyncio.create_task(do_b()) await task_a await task_b def threadWork(obj): # Create run loop for this thread and block until completion asyncio.run(threadWorkAsync()) def startLoop(): while 1: arrayOfFutures = [] for item in clients: arrayOfFutures.append(config.threadPool.submit(threadWork, item)) wait(arrayOfFutures, timeout=None, return_when=ALL_COMPLETED)
-
Execute blocking code in an executor. This allows you to use async futures instead of concurrent futures as above.
async def startLoop(): while 1: arrayOfFutures = [] for item in clients: arrayOfFutures.append(asyncio.run_in_executor( config.threadPool, threadWork, item)) await asyncio.gather(*arrayOfFutures)
-
Use threadsafe functions to submit tasks to event loops across threads. For example, instead of creating a run loop for each thread you could run all async coroutines in the main thread's run loop:
def threadWork(obj, loop): b = do_something() if b: future_a = asyncio.run_coroutine_threadsafe(do_a(), loop) future_b = asyncio.run_coroutine_threadsafe(do_b(), loop) concurrent.futures.wait([future_a, future_b]) async def startLoop(): loop = asyncio.get_running_loop() while 1: arrayOfFutures = [] for item in clients: arrayOfFutures.append(asyncio.run_in_executor( config.threadPool, threadWork, item, loop)) await asyncio.gather(*arrayOfFutures)
Note: This example should not be used literally as it will result in all coroutines executing in the main thread while the thread pool workers just block. This is just to show an example of the
run_coroutine_threadsafe()
method.
Related videos on Youtube
Comments
-
Manu Masson almost 2 years
I want to use both
ThreadPoolExecutor
fromconcurrent.futures
and async functions.My program repeatedly submits a function with different input values to a thread pool. The final sequence of tasks that are executed in that larger function can be in any order, and I don't care about the return value, just that they execute at some point in the future.
So I tried to do this
async def startLoop(): while 1: for item in clients: arrayOfFutures.append(await config.threadPool.submit(threadWork, obj)) wait(arrayOfFutures, timeout=None, return_when=ALL_COMPLETED)
where the function submitted is:
async def threadWork(obj): bool = do_something() # needs to execute before next functions if bool: do_a() # can be executed at any time do_b() # ^
where
do_b
anddo_a
are async functions.The problem with this is that I get the error:TypeError: object Future can't be used in 'await' expression
and if I remove the await, I get another error saying I need to addawait
.I guess I could make everything use threads, but I don't really want to do that.
-
Manu Masson about 5 years@Sraw Not everything being executed is async, only a couple functions at the end. I don't want to spawn heaps of threads to handle this, so I thought I might be able to get the last two running async within the spawned thread.
-