Python 3: How to submit an async function to a threadPool?

10,809

I recommend a careful readthrough of Python 3's asyncio development guide, particularly the "Concurrency and Multithreading" section.

The main conceptual issue in your example that event loops are single-threaded, so it doesn't make sense to execute an async coroutine in a thread pool. There are a few ways for event loops and threads to interact:

  • Event loop per thread. For example:

     async def threadWorkAsync(obj):
         b = do_something()
         if b:
             # Run a and b as concurrent tasks
             task_a = asyncio.create_task(do_a())
             task_b = asyncio.create_task(do_b())
             await task_a
             await task_b
    
     def threadWork(obj):
         # Create run loop for this thread and block until completion
         asyncio.run(threadWorkAsync())
    
     def startLoop():
         while 1:
             arrayOfFutures = []
             for item in clients:
                 arrayOfFutures.append(config.threadPool.submit(threadWork, item))
    
             wait(arrayOfFutures, timeout=None, return_when=ALL_COMPLETED)
    
  • Execute blocking code in an executor. This allows you to use async futures instead of concurrent futures as above.

     async def startLoop():
         while 1:
             arrayOfFutures = []
             for item in clients:
                 arrayOfFutures.append(asyncio.run_in_executor(
                     config.threadPool, threadWork, item))
    
             await asyncio.gather(*arrayOfFutures)
    
  • Use threadsafe functions to submit tasks to event loops across threads. For example, instead of creating a run loop for each thread you could run all async coroutines in the main thread's run loop:

     def threadWork(obj, loop):
         b = do_something()
         if b:
             future_a = asyncio.run_coroutine_threadsafe(do_a(), loop)
             future_b = asyncio.run_coroutine_threadsafe(do_b(), loop)
             concurrent.futures.wait([future_a, future_b])
    
     async def startLoop():
         loop = asyncio.get_running_loop()
         while 1:
             arrayOfFutures = []
             for item in clients:
                 arrayOfFutures.append(asyncio.run_in_executor(
                     config.threadPool, threadWork, item, loop))
    
             await asyncio.gather(*arrayOfFutures)
    

    Note: This example should not be used literally as it will result in all coroutines executing in the main thread while the thread pool workers just block. This is just to show an example of the run_coroutine_threadsafe() method.

Share:
10,809

Related videos on Youtube

Manu Masson
Author by

Manu Masson

[email protected]

Updated on June 04, 2022

Comments

  • Manu Masson
    Manu Masson almost 2 years

    I want to use both ThreadPoolExecutor from concurrent.futures and async functions.

    My program repeatedly submits a function with different input values to a thread pool. The final sequence of tasks that are executed in that larger function can be in any order, and I don't care about the return value, just that they execute at some point in the future.

    So I tried to do this

    async def startLoop():
    
        while 1:
            for item in clients:
                arrayOfFutures.append(await config.threadPool.submit(threadWork, obj))
    
            wait(arrayOfFutures, timeout=None, return_when=ALL_COMPLETED)
    

    where the function submitted is:

    async def threadWork(obj):
       bool = do_something() # needs to execute before next functions
       if bool:
           do_a() # can be executed at any time
           do_b() # ^
    

    where do_b and do_a are async functions.The problem with this is that I get the error: TypeError: object Future can't be used in 'await' expression and if I remove the await, I get another error saying I need to add await.

    I guess I could make everything use threads, but I don't really want to do that.

    • Manu Masson
      Manu Masson about 5 years
      @Sraw Not everything being executed is async, only a couple functions at the end. I don't want to spawn heaps of threads to handle this, so I thought I might be able to get the last two running async within the spawned thread.