How to limit the amount of concurrent async I/O operations?

Solution 1

You can definitely do this with the async support in .NET, using .NET 4.5 Beta. The previous post from 'usr' points to a good article written by Stephen Toub, but the less-publicized news is that the async semaphore actually made it into the Beta release of .NET 4.5.

If you look at our beloved SemaphoreSlim class (which you should be using, since it's more performant than the original Semaphore), it now boasts a WaitAsync(...) series of overloads with all of the expected arguments: timeout intervals, cancellation tokens, all of your usual scheduling friends :)
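As a quick illustration, here's a minimal sketch of those WaitAsync overloads in action (the timeout and token values are arbitrary, chosen just for the demo):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class WaitAsyncDemo
{
    static async Task Main()
    {
        var semaphore = new SemaphoreSlim(initialCount: 2, maxCount: 2);

        // plain async wait: completes when a slot becomes available
        await semaphore.WaitAsync();
        semaphore.Release();

        // wait with a timeout: returns false if no slot was acquired in time
        bool entered = await semaphore.WaitAsync(TimeSpan.FromSeconds(1));
        if (entered) semaphore.Release();

        // wait with a cancellation token: throws OperationCanceledException
        // if the token fires before a slot is acquired
        using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)))
        {
            await semaphore.WaitAsync(cts.Token);
            semaphore.Release();
        }

        Console.WriteLine("acquired and released all slots");
    }
}
```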

Stephen has also written a more recent blog post about the new .NET 4.5 goodies that came out with the Beta; see What’s New for Parallelism in .NET 4.5 Beta.

Here's some sample code showing how to use SemaphoreSlim for async method throttling:

public async Task MyOuterMethod()
{
    // let's say there is a list of 1000+ URLs
    string[] urls = { "http://google.com", "http://yahoo.com", /*...*/ };

    // now let's send HTTP requests to each of these URLs in parallel
    var allTasks = new List<Task>();
    var throttler = new SemaphoreSlim(initialCount: 20);
    foreach (var url in urls)
    {
        // do an async wait until we can schedule again
        await throttler.WaitAsync();

        // using Task.Run(...) to run the lambda in its own parallel
        // flow on the threadpool
        allTasks.Add(
            Task.Run(async () =>
            {
                try
                {
                    // note (see comments): in production code, reuse a single
                    // shared HttpClient instead of creating one per request
                    var client = new HttpClient();
                    var html = await client.GetStringAsync(url);
                }
                finally
                {
                    throttler.Release();
                }
            }));
    }

    // won't get here until all urls have been put into tasks
    await Task.WhenAll(allTasks);

    // won't get here until all tasks have completed in some way
    // (either success or exception)
}

Last, but probably a worthy mention, is a solution that uses TPL-based scheduling. You can create delegate-bound tasks on the TPL that have not yet been started, and allow a custom task scheduler to limit the concurrency. In fact, there's an MSDN sample for it; see also the TaskScheduler documentation.
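A rough sketch of the scheduler-based approach, using the built-in ConcurrentExclusiveSchedulerPair (available since .NET 4.5) instead of the MSDN sample's custom scheduler. Note the caveat: a scheduler throttles concurrently *executing* work on threads, not awaited async operations in flight, so for true async I/O the semaphore approach above is a better fit:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class SchedulerDemo
{
    static void Main()
    {
        // the ConcurrentScheduler of this pair runs at most
        // maxConcurrencyLevel tasks at a time on the thread pool
        var pair = new ConcurrentExclusiveSchedulerPair(
            TaskScheduler.Default, maxConcurrencyLevel: 4);
        var factory = new TaskFactory(pair.ConcurrentScheduler);

        var tasks = new Task[20];
        for (int i = 0; i < tasks.Length; i++)
        {
            tasks[i] = factory.StartNew(() =>
            {
                // simulate synchronous work; at most 4 of these run at once
                Thread.Sleep(100);
            });
        }

        Task.WaitAll(tasks);
        Console.WriteLine("all tasks completed");
    }
}
```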

Solution 2

If you have an IEnumerable (i.e. a collection of URL strings) and you want to run an I/O-bound operation on each of them (i.e. make an async HTTP request) concurrently, AND you optionally also want to set the maximum number of concurrent I/O requests at runtime, here is how you can do that. This way you do not use the thread pool et al.; the method uses SemaphoreSlim to control the maximum number of concurrent I/O requests, similar to a sliding-window pattern: one request completes, leaves the semaphore, and the next one gets in.

usage:

await ForEachAsync(urlStrings, YourAsyncFunc, optionalMaxDegreeOfConcurrency);

implementation:

// the original snippet references this constant without defining it;
// pick whatever default limit suits your workload
private const int DefaultMaxDegreeOfParallelism = 10;

public static Task ForEachAsync<TIn>(
    IEnumerable<TIn> inputEnumerable,
    Func<TIn, Task> asyncProcessor,
    int? maxDegreeOfParallelism = null)
{
    int maxAsyncThreadCount = maxDegreeOfParallelism ?? DefaultMaxDegreeOfParallelism;
    SemaphoreSlim throttler = new SemaphoreSlim(maxAsyncThreadCount, maxAsyncThreadCount);

    IEnumerable<Task> tasks = inputEnumerable.Select(async input =>
    {
        await throttler.WaitAsync().ConfigureAwait(false);
        try
        {
            await asyncProcessor(input).ConfigureAwait(false);
        }
        finally
        {
            throttler.Release();
        }
    });

    return Task.WhenAll(tasks);
}
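If you also need the results back (as asked in the comments below), the same pattern extends naturally. The extension method and its name here are illustrative, not part of the original answer; Task.WhenAll returns results in input order, so no locking or concurrent collection is needed:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class AsyncExtensions
{
    public static async Task<TOut[]> SelectAsync<TIn, TOut>(
        this IEnumerable<TIn> inputEnumerable,
        Func<TIn, Task<TOut>> asyncProcessor,
        int maxDegreeOfParallelism)
    {
        var throttler = new SemaphoreSlim(maxDegreeOfParallelism, maxDegreeOfParallelism);

        IEnumerable<Task<TOut>> tasks = inputEnumerable.Select(async input =>
        {
            await throttler.WaitAsync().ConfigureAwait(false);
            try
            {
                return await asyncProcessor(input).ConfigureAwait(false);
            }
            finally
            {
                throttler.Release();
            }
        });

        // results come back in the same order as the inputs
        return await Task.WhenAll(tasks).ConfigureAwait(false);
    }
}
```

Usage: `var htmls = await urls.SelectAsync(url => client.GetStringAsync(url), 20);`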

Solution 3

There are a lot of pitfalls, and direct use of a semaphore can be tricky in error cases, so I would suggest using the AsyncEnumerator NuGet package instead of re-inventing the wheel:

// let's say there is a list of 1000+ URLs
string[] urls = { "http://google.com", "http://yahoo.com", ... };

// now let's send HTTP requests to each of these URLs in parallel
await urls.ParallelForEachAsync(async (url) => {
    var client = new HttpClient();
    var html = await client.GetStringAsync(url);
}, maxDegreeOfParalellism: 20);

Solution 4

Unfortunately, the .NET Framework is missing the most important combinators for orchestrating parallel async tasks. There is no such thing built in.

What you want is called a semaphore, and you need an async version of it. Look at the AsyncSemaphore class built by the most respectable Stephen Toub.

Solution 5

After the release of .NET 6 (in November 2021), the recommended way to limit the number of concurrent asynchronous I/O operations is the Parallel.ForEachAsync API, with the MaxDegreeOfParallelism configuration. Here is how it can be used in practice:

// let's say there is a list of 1000+ URLs
string[] urls = { "http://google.com", "http://yahoo.com", /*...*/ };
var client = new HttpClient();
var options = new ParallelOptions() { MaxDegreeOfParallelism = 20 };

// now let's send HTTP requests to each of these URLs in parallel
await Parallel.ForEachAsync(urls, options, async (url, cancellationToken) =>
{
    var html = await client.GetStringAsync(url, cancellationToken);
});

In the above example the Parallel.ForEachAsync task is awaited asynchronously. You can also Wait it synchronously if you need to, which will block the current thread until all of the asynchronous operations have completed. The synchronous Wait has the advantage that, in case of errors, all exceptions will be propagated, whereas the await operator propagates by design only the first exception. In case this is a problem, you can find solutions here.
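The difference can be demonstrated with two deliberately faulted tasks (no network involved; the exception messages are arbitrary):

```csharp
using System;
using System.Threading.Tasks;

class ExceptionDemo
{
    static async Task Main()
    {
        Task[] MakeFaultedTasks() => new[]
        {
            Task.FromException(new InvalidOperationException("first")),
            Task.FromException(new TimeoutException("second")),
        };

        // synchronous Wait: the AggregateException carries both exceptions
        try
        {
            Task.WhenAll(MakeFaultedTasks()).Wait();
        }
        catch (AggregateException ex)
        {
            Console.WriteLine(ex.InnerExceptions.Count); // 2
        }

        // await: only the first exception is rethrown
        try
        {
            await Task.WhenAll(MakeFaultedTasks());
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message); // first
        }
    }
}
```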

(Note: an idiomatic implementation of a ForEachAsync extension method that also propagates the results, can be found in the 4th revision of this answer)


Updated on February 16, 2022

Comments

  • Grief Coder
    Grief Coder about 2 years
    // let's say there is a list of 1000+ URLs
    string[] urls = { "http://google.com", "http://yahoo.com", ... };
    
    // now let's send HTTP requests to each of these URLs in parallel
    urls.AsParallel().ForAll(async (url) => {
        var client = new HttpClient();
        var html = await client.GetStringAsync(url);
    });
    

    Here is the problem: it starts 1000+ simultaneous web requests. Is there an easy way to limit the number of concurrent async HTTP requests, so that no more than 20 web pages are downloaded at any given time? How to do it in the most efficient manner?

    • svick
      svick almost 12 years
      How is this different from your previous question?
    • Chris Disley
      Chris Disley almost 12 years
      stackoverflow.com/questions/9290498/… With a ParallelOptions parameter.
    • spender
      spender almost 12 years
      @ChrisDisley, this will only parallelize the launching of the requests.
    • eglasius
      eglasius about 10 years
      @svick is right, how is it different? btw, I love the answer there stackoverflow.com/a/10802883/66372
    • Shimmy Weitzhandler
      Shimmy Weitzhandler over 8 years
      Besides HttpClient is IDisposable, and you should dispose it, especially when you're going to use 1000+ of them. HttpClient can be used as a singleton for multiple requests.
    • avs099
      avs099 almost 6 years
      @Shimmy you should never dispose HttpClient: stackoverflow.com/a/15708633/1246870
    • morgwai
      morgwai over 2 years
      as google gives this as the first result for similar problem in java, see this: stackoverflow.com/a/69234939/1220560
  • Grief Coder
    Grief Coder almost 12 years
    Yep, but that doesn't relate to async I/O operations. The code above will fire up 1000+ simultaneous downloads even if it is running on a single thread.
  • scottm
    scottm almost 12 years
    Didn't see the await keyword in there. Removing that should solve the problem, correct?
  • Grief Coder
    Grief Coder almost 12 years
    Good point! Though each task here will contain async and sync code (page downloaded asynchronously, then processed in a sync manner). I am trying to distribute the sync portion of the code across CPUs and at the same time limit the amount of concurrent async I/O operations.
  • svick
    svick almost 12 years
    The library certainly can handle more tasks running (with the Running status) concurrently than the number of cores. This will especially be the case with I/O-bound Tasks.
  • spender
    spender almost 12 years
    Why? Because launching 1000+ http requests simultaneously might not be a task well suited to the user's network capacity.
  • Grief Coder
    Grief Coder almost 12 years
    @svick: yep. Do you know how to efficiently control the max concurrent TPL tasks (not threads)?
  • Sean U
    Sean U almost 12 years
    Parallel extensions can also be used as a way to multiplex I/O operations without having to manually implement a pure async solution. Which I agree could be considered sloppy, but as long as you keep a tight limit on the number of concurrent operations it probably won't strain the threadpool too much.
  • spender
    spender almost 12 years
    Don't run long running/blocking operations in the ThreadPool. @SeanU Your suggestion is bad practice and can cause many unintended and nasty side-effects.
  • usr
    usr almost 12 years
    I don't think this answer is providing an answer. Being purely async is not enough here: We really want to throttle the physical IOs in a non-blocking manner.
  • Sean U
    Sean U almost 12 years
    @spender Aside from consuming the entire pool if you don't keep a limit on how many threads you consume, what other unintended or nasty side-effects are there I should be worried about?
  • spender
    spender almost 12 years
    Well, in ideal circumstances, the "entire pool" should really only represent the # processors in the system. Anything larger represents a strained ThreadPool. Because the ThreadPool is reluctant to spin up extra threads and will only do so under sustained stress, other operations that rely on a fluid ThreadPool will now be affected by this implicit latency. For instance: System.Threading.Timer fires its callbacks on the ThreadPool. Now, with only a few long-lived tasks in the ThreadPool, they're not coming in on time.
  • Sean U
    Sean U almost 12 years
    How bad can that really get? For example, is it going to introduce worse delays than a collection of generation 2? I ask because I learned the idiom from Microsoft sample code on how to use TPL, which would seem to imply that it's not the worst practice in the world.
  • spender
    spender almost 12 years
  • Sean U
    Sean U almost 12 years
    Thread pool starvation is rather more extreme than what you originally describe. And was addressed in my original comment.
  • Theo Yaung
    Theo Yaung almost 12 years
    Note that "Unfortunately, the .NET Framework is missing most important combinators for orchestrating parallel async tasks. There is no such thing built-in." is no longer correct as of .NET 4.5 Beta. SemaphoreSlim now offers WaitAsync(...) functionality :)
  • spender
    spender almost 12 years
    Hmm.. not sure I agree... when working on a large project, if one too many developers takes this view, you'll get starvation even though each developer's contribution in isolation is not enough to tip things over the edge. Given that there is only one ThreadPool, even if you're treating it semi-respectfully... if everyone else is doing the same, trouble can follow. As such I always advise against running long stuff in the ThreadPool.
  • GreyCloud
    GreyCloud about 11 years
    Doesn't this code end up creating a list containing as many task objects as there are urls? Is there any way to avoid this?
  • GreyCloud
    GreyCloud about 11 years
    isn't a parallel.foreach with a limited degree of parallelism a nicer approach? msdn.microsoft.com/en-us/library/…
  • Todd Menier
    Todd Menier about 11 years
    Should SemaphoreSlim (with its new async methods) be preferred over AsyncSemaphore, or does Toub's implementation still have some advantage?
  • usr
    usr about 11 years
    In my opinion, the built-in type should be preferred because it is likely to be well-tested and well-designed.
  • jdasilva
    jdasilva almost 11 years
    Stephen added a comment in response to a question on his blog post confirming that using SemaphoreSlim for .NET 4.5 would generally be the way to go.
  • GameScripting
    GameScripting over 10 years
    Please note that WaitAsync implicitly decrements the internal counter. I ran into an issue when not starting a task for every element in the source collection, only for some of them. Make sure you only call WaitAsync when you're actually scheduling a task.
  • Neal Ehardt
    Neal Ehardt about 9 years
    I don't think this works. GetStringAsync(url) is meant to be called with await. If you inspect the type of var html, it is a Task<string>, not the result string.
  • Shimmy Weitzhandler
    Shimmy Weitzhandler over 8 years
    Why don't you dispose your HttpClient?
  • Theo Yaung
    Theo Yaung almost 8 years
    @NealEhardt is correct. Parallel.ForEach(...) is intended for running blocks of synchronous code in parallel (e.g. on different threads).
  • Josh Noe
    Josh Noe almost 8 years
    @GreyCloud: Parallel.ForEach works with synchronous code. This allows you to call asynchronous code.
  • TheMonarch
    TheMonarch over 6 years
    @Shimmy, although HttpClient technically inherits from IDisposable, it's not actually doing anything. There is actually no benefit to disposing HttpClient whatsoever.
  • Shimmy Weitzhandler
    Shimmy Weitzhandler over 6 years
    @TheMonarch you're wrong. Besides it's always a good habit to wrap all IDisposables in using or try-finally statements, and assure their disposal.
  • Rupert Rawnsley
    Rupert Rawnsley over 6 years
    Given how popular this answer is, it's worth pointing out that HttpClient can and should be a single common instance rather than an instance per request.
  • AgentFire
    AgentFire over 6 years
  • Dogu Arslan
    Dogu Arslan over 6 years
    No, you should not need to explicitly dispose the SemaphoreSlim in this implementation and usage, as it is used internally inside the method and the method does not access its AvailableWaitHandle property, in which case we would have needed to either dispose it or wrap it in a using block.
  • AgentFire
    AgentFire over 6 years
    Just thinking of the best practices and lessons we teach other people. A using would be nice.
  • Jay Shah
    Jay Shah almost 6 years
    I think you are just specifying initialCount for SemaphoreSlim and you need to specify 2nd parameter i.e. maxCount in the constructor of SemaphoreSlim.
  • avs099
    avs099 almost 6 years
    @RupertRawnsley +1, and of course there is a proof for that on our beloved SO: stackoverflow.com/a/15708633/1246870
  • Dinerdo
    Dinerdo over 5 years
    What is the benefit of using Task.Run here? I know it's usually used to avoid blocking the UI thread, but here it's hard for me to see the difference from running this without Task.Run, since it seems to do the same type of thing.
  • Slothario
    Slothario almost 5 years
    @Dinerdo I was wondering the same thing. As far as I understand, you should only use Task.Run when you have a CPU-intensive task. It seems like you should simply await these calls, so I think you're right. However I'd like someone to triple check my logic.
  • Nick
    Nick over 4 years
    Task.Run() is necessary here because if you await normally then the requests will be processed one at a time (since it's waiting for the request to finish before continuing the rest of the loop) instead of in parallel. However, if you don't await the request then you will release the semaphore as soon as the task is scheduled (allowing all requests to run at the same time), which defeats the purpose of using it in the first place. The context created by Task.Run is just a place to hold onto the semaphore resource.
  • Bouke
    Bouke over 4 years
    Task.Run() is needed here because the code doesn't create all Tasks at once, but rather schedules 20 active tasks at once. After the loop has completed, there can be 0 to 20 tasks still active (created / running / waiting to run).
  • venkat
    venkat over 4 years
    I want each response from each task processed into a List. How can I get return Result or response
  • Theodor Zoulias
    Theodor Zoulias about 4 years
    @Dinerdo there is hardly any benefit by using Task.Run here, but there is hardly any harm using it either (because the Task.Run method understands async delegates). The alternative would be to use a local function that accepts a url and returns a Task, but local functions were not available at the time this answer was written (C# 7 was released at March 2017).
  • Seabizkit
    Seabizkit about 4 years
    Well, this example I can follow, but I'm trying to work out the best way to do this: basically I have a throttler, but my Func would return a list, which I ultimately want collected into a final list of everything completed when done... which may require a lock on the list. Do you have suggestions?
  • Dogu Arslan
    Dogu Arslan about 4 years
    you can slightly update the method so it returns the list of actual tasks and you await Task.WhenAll from inside your calling code. Once Task.WhenAll is complete, you can enumerate over each task in the list and add its list to the final list. Change method signature to 'public static IEnumerable<Task<TOut>> ForEachAsync<TIn, TOut>( IEnumerable<TIn> inputEnumerable, Func<TIn, Task<TOut>> asyncProcessor, int? maxDegreeOfParallelism = null)'
  • Chris DaMour
    Chris DaMour almost 4 years
    would the task.run be necessary if it was an async lambda in urls.select()?
  • CajunCoding
    CajunCoding about 3 years
    As noted in prior posts you should not be creating new HttpClients in any kind of loop unless you actually enjoy socket exhaustion issues in production.
  • aruno
    aruno about 3 years
    Is there still nothing built into the framework that does this?
  • aruno
    aruno about 3 years
    Did you ever make a SelectAsyncConcurrent version of this?
  • Jay Shah
    Jay Shah about 3 years
    @Simon_Weaver I don't think framework has any built-in mechanism for this as of now.
  • Jay Shah
    Jay Shah about 3 years
    @Simon_Weaver No, I have not built SelectAsyncConcurrent version, but that would be an interesting implementation.
  • aruno
    aruno about 3 years
    I just made a very clumsy one that simply calls ForEachAsyncConcurrent. I only needed it in one place so it was fine. I just created a ConcurrentStack and added items to it inside a call to your function. The ordering wasn't important for me, but if anyone else attempts it don't use a List because a) it's not thread safe and b) the results may not come back in the same order anyway.
  • Theodor Zoulias
    Theodor Zoulias about 2 years
    A Parallel.ForEachAsync-based implementation that returns a Task<TResult[]> can be found here.
  • PJ7
    PJ7 about 2 years
    This is the correct answer now but it's at the bottom here. Yes this question is very old but it's the first Google search hit. Wish we could improve this.