How to limit concurrency with Python asyncio?


Solution 1

Before reading the rest of this answer, please note that the idiomatic way of limiting the number of parallel tasks with asyncio is to use asyncio.Semaphore, as shown in Mikhail's answer and elegantly abstracted in Andrei's answer. This answer contains working, but somewhat more complicated, ways of achieving the same thing. I am leaving the answer up because in some cases this approach has advantages over a semaphore, specifically when the work to be done is very large or unbounded and you cannot create all the coroutines in advance. In that case the second (queue-based) solution in this answer is what you want. But in most regular situations, such as parallel downloads through aiohttp, you should use a semaphore instead.


You basically need a fixed-size pool of download tasks. asyncio doesn't come with a pre-made task pool, but it is easy to create one: simply keep a set of tasks and don't allow it to grow past the limit. Although the question states your reluctance to go down that route, the code ends up much more elegant:

import asyncio, random

async def download(code):
    wait_time = random.randint(1, 3)
    print('downloading {} will take {} second(s)'.format(code, wait_time))
    await asyncio.sleep(wait_time)  # I/O, context will switch to main function
    print('downloaded {}'.format(code))

async def main(loop):
    no_concurrent = 3
    dltasks = set()
    i = 0
    while i < 9:
        if len(dltasks) >= no_concurrent:
            # Wait for some download to finish before adding a new one
            _done, dltasks = await asyncio.wait(
                dltasks, return_when=asyncio.FIRST_COMPLETED)
        dltasks.add(loop.create_task(download(i)))
        i += 1
    # Wait for the remaining downloads to finish
    await asyncio.wait(dltasks)
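
The same idea can be packaged as a helper that pulls work lazily from any iterable, which is the situation described at the top of this answer (work that is too large to create up front). This is only a sketch: run_limited and fake_download are hypothetical names used for illustration, and it assumes Python 3.7+ for asyncio.run and asyncio.create_task.

```python
import asyncio


async def run_limited(coros, limit):
    """Run coroutines from an iterable, keeping at most `limit` in flight."""
    pending = set()
    results = []
    for coro in coros:  # the iterable is consumed lazily, one item at a time
        if len(pending) >= limit:
            # Wait for at least one running task to finish before adding more
            done, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED)
            results.extend(task.result() for task in done)
        pending.add(asyncio.create_task(coro))
    if pending:
        # Drain whatever is still running
        done, _ = await asyncio.wait(pending)
        results.extend(task.result() for task in done)
    return results  # in completion order, not submission order


async def fake_download(code):
    # Hypothetical stand-in for a real download
    await asyncio.sleep(0.01)
    return code


async def main():
    links = (fake_download(i) for i in range(9))  # generator, not a list
    return await run_limited(links, limit=3)


if __name__ == "__main__":
    print(sorted(asyncio.run(main())))
```

Because task.result() re-raises, a failed download propagates out of run_limited; whether that is the right policy depends on the application.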

An alternative is to create a fixed number of coroutines doing the downloading, much like a fixed-size thread pool, and feed them work using an asyncio.Queue. This removes the need to manually limit the number of downloads, which will be automatically limited by the number of coroutines invoking download():

# download() defined as above

async def download_worker(q):
    while True:
        code = await q.get()
        await download(code)
        q.task_done()

async def main(loop):
    q = asyncio.Queue()
    workers = [loop.create_task(download_worker(q)) for _ in range(3)]
    i = 0
    while i < 9:
        await q.put(i)
        i += 1
    await q.join()  # wait for all tasks to be processed
    for worker in workers:
        worker.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
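
When the producer is much faster than the workers, or the input is unbounded, it may help to give the queue a maximum size so that put() waits until a worker frees a slot. The following is a sketch under that assumption; process_all, worker, and handle are hypothetical names, and it assumes Python 3.7+.

```python
import asyncio


async def worker(queue, handle):
    while True:
        item = await queue.get()
        try:
            await handle(item)
        except Exception as exc:
            # Keep the worker alive so the producer is never left blocked
            print(f"item {item!r} failed: {exc!r}")
        finally:
            queue.task_done()


async def process_all(items, handle, n_workers=3):
    # maxsize makes put() wait while the queue is full (backpressure)
    queue = asyncio.Queue(maxsize=n_workers)
    workers = [asyncio.create_task(worker(queue, handle))
               for _ in range(n_workers)]
    for item in items:  # items may be a lazy or very long iterable
        await queue.put(item)
    await queue.join()  # wait until every queued item is processed
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
```

Concurrency is still bounded by the number of workers; the maxsize only bounds how many pending items sit in memory.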

As for your other question, the obvious choice would be aiohttp.

Solution 2

If I'm not mistaken you're searching for asyncio.Semaphore. Example of usage:

import asyncio
from random import randint


async def download(code):
    wait_time = randint(1, 3)
    print('downloading {} will take {} second(s)'.format(code, wait_time))
    await asyncio.sleep(wait_time)  # I/O, context will switch to main function
    print('downloaded {}'.format(code))


sem = asyncio.Semaphore(3)


async def safe_download(i):
    async with sem:  # semaphore limits num of simultaneous downloads
        return await download(i)


async def main():
    tasks = [
        asyncio.ensure_future(safe_download(i))  # wraps the coroutine in a task and schedules it
        for i
        in range(9)
    ]
    await asyncio.gather(*tasks)  # await moment all downloads done


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()

Output:

downloading 0 will take 3 second(s)
downloading 1 will take 3 second(s)
downloading 2 will take 1 second(s)
downloaded 2
downloading 3 will take 3 second(s)
downloaded 1
downloaded 0
downloading 4 will take 2 second(s)
downloading 5 will take 1 second(s)
downloaded 5
downloaded 3
downloading 6 will take 3 second(s)
downloading 7 will take 1 second(s)
downloaded 4
downloading 8 will take 2 second(s)
downloaded 7
downloaded 8
downloaded 6
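
One way to convince yourself that the semaphore really caps the number of simultaneous downloads is to record the peak number of coroutines inside the guarded section. This is an illustrative sketch, not part of the answer's code: the stats dictionary is a hypothetical helper, and the semaphore is created inside main() rather than at module level, which also works with asyncio.run (Python 3.7+).

```python
import asyncio


async def download(code, sem, stats):
    async with sem:  # at most `limit` coroutines are past this line at once
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.01)  # stand-in for the real I/O
        stats["active"] -= 1
    return code


async def main(limit=3, n=9):
    sem = asyncio.Semaphore(limit)  # created while the loop is running
    stats = {"active": 0, "peak": 0}
    results = await asyncio.gather(
        *(download(i, sem, stats) for i in range(n)))
    return results, stats["peak"]


if __name__ == "__main__":
    results, peak = asyncio.run(main())
    print(results, "peak concurrency:", peak)
```

Note that the limit is on simultaneous downloads, not on downloads per second.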

An example of async downloading with aiohttp can be found here. Note that aiohttp has a Semaphore equivalent built in, which you can see an example of here. It has a default limit of 100 connections.

Solution 3

I used Mikhail's answer and ended up with this little gem:

import asyncio

async def gather_with_concurrency(n, *tasks):
    semaphore = asyncio.Semaphore(n)

    async def sem_task(task):
        # Each awaitable is awaited only while holding the semaphore
        async with semaphore:
            return await task
    return await asyncio.gather(*(sem_task(task) for task in tasks))

You would call it in place of a normal gather:

await gather_with_concurrency(100, *tasks)
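
A usage sketch follows, assuming the arguments are plain coroutine objects rather than already-scheduled Tasks; a Task starts running as soon as it is created, so the semaphore would not limit it. The parameter is renamed to coros here to make that explicit, and fetch is a hypothetical stand-in for real work.

```python
import asyncio


async def gather_with_concurrency(n, *coros):
    semaphore = asyncio.Semaphore(n)

    async def sem_coro(coro):
        async with semaphore:
            return await coro

    # gather preserves argument order in its result list
    return await asyncio.gather(*(sem_coro(c) for c in coros))


async def fetch(i):
    # Hypothetical stand-in for an HTTP request
    await asyncio.sleep(0.01)
    return i * 2


async def main():
    coros = [fetch(i) for i in range(10)]  # coroutine objects, not yet running
    return await gather_with_concurrency(3, *coros)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

All wrapper coroutines are still created up front; only the bodies of the wrapped coroutines are limited to n at a time.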

Solution 4

The asyncio-pool library does exactly what you need.

https://pypi.org/project/asyncio-pool/


from asyncio_pool import AioPool

LIST_OF_URLS = ("http://www.google.com", "......")

# Inside a coroutine:
pool = AioPool(size=3)
await pool.map(your_download_coroutine, LIST_OF_URLS)

Solution 5

Using a semaphore, you can also create a decorator to wrap the function:

import asyncio
from functools import wraps
def request_concurrency_limit_decorator(limit=3):
    # Bind the default event loop 
    sem = asyncio.Semaphore(limit)

    def executor(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            async with sem:
                return await func(*args, **kwargs)

        return wrapper

    return executor

Then, add the decorator to the original download function.

@request_concurrency_limit_decorator(limit=...)
async def download(...):
    ...

Now you can call the download function as before, with the semaphore limiting its concurrency.

await download(...)

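Note that on Python versions before 3.10, the Semaphore is bound to the default event loop at the moment the decorator function is executed, so you cannot then call asyncio.run, which creates a new loop. Instead, call asyncio.get_event_loop().run... to use the default event loop. Since Python 3.10, asyncio synchronization primitives are no longer bound to a loop at construction time, so this restriction applies only to older versions.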
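
If the decorated function has to work under asyncio.run regardless of Python version, one possible workaround is to create the semaphore lazily, once per running event loop. This is a sketch under stated assumptions, not the answer's method: limit_concurrency is a hypothetical name, and the code assumes the standard asyncio event loop, which is hashable and supports weak references.

```python
import asyncio
import weakref
from functools import wraps


def limit_concurrency(limit=3):
    # One semaphore per event loop, created on first call inside that loop
    semaphores = weakref.WeakKeyDictionary()

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            loop = asyncio.get_running_loop()
            sem = semaphores.get(loop)
            if sem is None:
                sem = semaphores[loop] = asyncio.Semaphore(limit)
            async with sem:
                return await func(*args, **kwargs)

        return wrapper

    return decorator


@limit_concurrency(limit=2)
async def download(code):
    # Hypothetical stand-in for a real download
    await asyncio.sleep(0.01)
    return code


async def main():
    return await asyncio.gather(*(download(i) for i in range(6)))


if __name__ == "__main__":
    print(asyncio.run(main()))
    print(asyncio.run(main()))  # a second, fresh loop gets its own semaphore
```

The weak-keyed dictionary lets semaphores for closed loops be garbage-collected together with the loop.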

asyncio.Semaphore RuntimeError: Task got Future attached to a different loop

Author by

Shridharshan


Updated on July 08, 2022

Comments

  • Shridharshan
    Shridharshan almost 2 years

    Let's assume we have a bunch of links to download, and each link may take a different amount of time to download. I'm allowed to use at most 3 connections. I want to do this efficiently using asyncio.

    Here's what I'm trying to achieve: at any point in time, try to ensure that I have at least 3 downloads running.

    Connection 1: 1---------7---9---
    Connection 2: 2---4----6-----
    Connection 3: 3-----5---8-----
    

    The numbers represent the download links, while the hyphens represent waiting for the download.

    Here is the code that I'm using right now:

    from random import randint
    import asyncio
    
    count = 0
    
    
    async def download(code, permit_download, no_concurrent, downloading_event):
        global count
        downloading_event.set()
        wait_time = randint(1, 3)
        print('downloading {} will take {} second(s)'.format(code, wait_time))
        await asyncio.sleep(wait_time)  # I/O, context will switch to main function
        print('downloaded {}'.format(code))
        count -= 1
        if count < no_concurrent and not permit_download.is_set():
            permit_download.set()
    
    
    async def main(loop):
        global count
        permit_download = asyncio.Event()
        permit_download.set()
        downloading_event = asyncio.Event()
        no_concurrent = 3
        i = 0
        while i < 9:
            if permit_download.is_set():
                count += 1
                if count >= no_concurrent:
                    permit_download.clear()
                loop.create_task(download(i, permit_download, no_concurrent, downloading_event))
                await downloading_event.wait()  # To force context to switch to download function
                downloading_event.clear()
                i += 1
            else:
                await permit_download.wait()
        await asyncio.sleep(9)
    
    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        try:
            loop.run_until_complete(main(loop))
        finally:
            loop.close()
    

    And the output is as expected:

    downloading 0 will take 2 second(s)
    downloading 1 will take 3 second(s)
    downloading 2 will take 1 second(s)
    downloaded 2
    downloading 3 will take 2 second(s)
    downloaded 0
    downloading 4 will take 3 second(s)
    downloaded 1
    downloaded 3
    downloading 5 will take 2 second(s)
    downloading 6 will take 2 second(s)
    downloaded 5
    downloaded 6
    downloaded 4
    downloading 7 will take 1 second(s)
    downloading 8 will take 1 second(s)
    downloaded 7
    downloaded 8
    

    But here are my questions:

    1. At the moment, I'm simply waiting for 9 seconds to keep the main function running till the downloads are complete. Is there an efficient way of waiting for the last download to complete before exiting the main function? (I know there's asyncio.wait, but I'll need to store all the task references for it to work)

    2. What's a good library that does this kind of task? I know javascript has a lot of async libraries, but what about Python?

    Edit: 2. What's a good library that takes care of common async patterns? (Something like async)

  • Shridharshan
    Shridharshan over 6 years
    Is there a good Python async library to deal with common async programming patterns? Like the famous async package for JavaScript.
  • Shridharshan
    Shridharshan over 6 years
    The first approach works very well and I need not create and store all the task references in advance (I use a generator to lazily load the download links). I did not know asyncio.wait had a "return_when" parameter.
  • Mikhail Gerasimov
    Mikhail Gerasimov over 6 years
    @Shridharshan from my experience asyncio itself contains all you usually need. Take a look at synchronization primitives and at module's functions in general.
  • user4815162342
    user4815162342 over 6 years
    @Shridharshan In the second solution you only create the three coroutines for downloading in advance, the actual download links can also be generated lazily. But it's a matter of taste - I think I would also prefer the first solution in practice.
  • user4815162342
    user4815162342 almost 6 years
    @OrangeDog That is actually intentional, because the OP's code was using manual while loops. The idea was to adapt their existing code (preserving the non-conventional idiom) to the desired semantics.
  • radzak
    radzak about 5 years
    @MikhailGerasimov calling asyncio.ensure_future() is redundant, as asyncio.gather() calls it internally anyway (source). However, calling the variable tasks would then be "wrong", because these are not tasks yet.
  • Krissh
    Krissh about 4 years
    The Semaphore is deprecated since version 3.8 and will be removed in version 3.10, the official warning reads. Instead they are asking to use loop. But how do I use it? Can anyone provide an example?
  • user4815162342
    user4815162342 about 4 years
    @Krissh Since you don't provide code or the exact error message, it's hard to tell what you're referring to, but rest assured that asyncio.Semaphore is not deprecated. What is deprecated and will be removed is the loop parameter to its constructor, which you can omit and everything will work just fine. (This is not specific to semaphores, the loop parameter is being removed across the board.)
  • Krissh
    Krissh about 4 years
    @user4815162342 sorry my bad. you're right. I got confused in doc.
  • user4815162342
    user4815162342 almost 4 years
    @18augst Would you discuss the edit in a comment? The changes you proposed should not be necessary.
  • user4815162342
    user4815162342 almost 4 years
    This is a nice utility function, +1.
  • help-ukraine-now
    help-ukraine-now over 3 years
    Does asyncio.Semaphore(3) mean you end up with 3 requests per second? Or is it something different?
  • Mikhail Gerasimov
    Mikhail Gerasimov over 3 years
    @politicalscientist it means that not more than 3 requests can be active simultaneously at any given point of time.
  • freethebees
    freethebees over 3 years
    Seeing a function within a function, my mind immediately went to decorators. I had a little play and you can implement this with decorators, either with a fixed semaphore value or dynamic; however, the solution here offers far more flexibility.
  • Dmitry Arkhipenko
    Dmitry Arkhipenko over 3 years
    Such a wonderful clear and short example!
  • AhmetK
    AhmetK almost 3 years
    The second approach seems faster in theory but fails in practice. The first approach is 2-3 times faster than the second.
  • user4815162342
    user4815162342 almost 3 years
    @AhmetK I find such a difference very unlikely, and probably a result of a flaw in the implementation. It's hard to tell without access to the code used to benchmark both cases.
  • AhmetK
    AhmetK almost 3 years
    For me it was async requests; I used httpx, and the first one was making 10 requests at the same time, but the second one doesn't seem to.
  • user4815162342
    user4815162342 almost 3 years
    @AhmetK You should accompany such claims with code (perhaps posting a separate question). It is most likely that your second code has a problem that prevented it from running in parallel.
  • OriolJ
    OriolJ over 2 years
    To get this to work, I had to change "return await task" to "return await asyncio.create_task(task)" and pass a list of coroutines as tasks.
  • Kulasangar
    Kulasangar over 2 years
    @Andrei what could be the Semaphore number that I can give for processing 30k http requests for a min? Is there any hard and fast rule?
  • mark12345
    mark12345 over 2 years
    Wow, this is what I'm looking for. Thank you very much for this. Is this the latest method? I've been using tasks.append(task) followed by asyncio.gather(*tasks); I only just learned about asyncio.wait.
  • bobby
    bobby over 2 years
    The tasks parameter of gather_with_concurrency is a bit misleading, it implies that you can use the function with several Tasks created with asyncio.create_task. However in that case it doesn't work, as create_task is actually executing the coroutine right away in the event loop. As gather_with_concurrency is expecting coroutines, the parameter should rather be named coros.
  • user4815162342
    user4815162342 over 2 years
    I like the brevity of this approach (at least compared to queue-based code). The downside is that it's a bit hard to follow - I had to read it several times to understand what it's doing and convince myself that it's not flawed.
  • songololo
    songololo about 2 years
    I really like this approach, simple and intuitive, easy to unpack done tasks etc. as well.
  • J.T
    J.T almost 2 years
    It would be helpful to see a version of this that works with tasks as well as coroutines.
  • Ognjen
    Ognjen almost 2 years
    I wouldn't say that using semaphores for this use case is the most idiomatic. We just happen to be hypnotized by the beauty of async with semaphore - but as long as you're in control of the loop that schedules tasks, introducing a shared state and creating at once all tasks that all wait on it is actually wasteful. I find the first part of this answer the most effective ( yet with a preference to AioPool for its brevity ). The queue is nice, but it's also an unnecessary shared resource, in that specific case of being given an iterable of coroutines to execute.
  • user4815162342
    user4815162342 almost 2 years
    @vincent It's idiomatic in the sense of it being an idiom that is widely used, universally recognized, and frequently recommended. You can argue that it's not the optimal solution for all circumstances, but that's why there are different answers with different approaches.
  • Ognjen
    Ognjen almost 2 years
    No, it's not, and it's even worse to state it that way: looking familiar doesn't imply it's correct. What I mean is that, for this problem statement, recommending semaphores because they are generally used to solve the more general problem of limiting access to a shared resource is not good advice.
  • user4815162342
    user4815162342 almost 2 years
    @vincent We can agree to disagree about the use of the term idiomatic, I don't care to argue that point. As for whether the approach is correct, it depends on what you're doing. As long as the number of tasks is bounded, there should be no problem in creating them in advance. Asyncio tasks are lightweight, and being able to create many of them was one of the motivators for providing the library.