Parallel foreach with asynchronous lambda

175,881

Solution 1

If you just want simple parallelism, you can do this:

var bag = new ConcurrentBag<object>();
var tasks = myCollection.Select(async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
});
await Task.WhenAll(tasks);
var count = bag.Count;

If you need something more complex, check out Stephen Toub's ForEachAsync post.
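The heart of that post is a short partitioner-based extension method. The following is a sketch adapted from it (reconstructed from memory, so treat it as illustrative rather than canonical):

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class EnumerableExtensions
{
    // Split the source into `dop` partitions; each partition is consumed
    // sequentially by its own task, so at most `dop` items run concurrently.
    public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
    {
        return Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(dop)
            select Task.Run(async delegate
            {
                using (partition)
                    while (partition.MoveNext())
                        await body(partition.Current);
            }));
    }
}
```

Unlike the Select/Task.WhenAll approach above, this caps the number of in-flight operations at dop instead of starting everything at once.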

Solution 2

You can use the ParallelForEachAsync extension method from the AsyncEnumerator NuGet package:

using Dasync.Collections;

var bag = new ConcurrentBag<object>();
await myCollection.ParallelForEachAsync(async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
}, maxDegreeOfParallelism: 10);
var count = bag.Count;

Disclaimer: I'm the author of the AsyncEnumerator library, which is open source and licensed under MIT, and I'm posting this message just to help the community.

Solution 3

One of the new .NET 6 APIs is Parallel.ForEachAsync, a way to schedule asynchronous work that allows you to control the degree of parallelism:

var urls = new [] 
{
    "https://dotnet.microsoft.com",
    "https://www.microsoft.com",
    "https://stackoverflow.com"
};

var client = new HttpClient();

var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };
await Parallel.ForEachAsync(urls, options, async (url, token) =>
{
    // Escape the URL so it forms a valid file name, and make sure the cache directory exists.
    var targetDir = Path.Combine(Path.GetTempPath(), "http_cache");
    Directory.CreateDirectory(targetDir);
    var targetPath = Path.Combine(targetDir, Uri.EscapeDataString(url));

    var response = await client.GetAsync(url);

    if (response.IsSuccessStatusCode)
    {
        using var target = File.OpenWrite(targetPath);

        await response.Content.CopyToAsync(target);
    }
});

Another example in Scott Hanselman's blog.

The source, for reference.

Solution 4

You can control the degree of parallelism with SemaphoreSlim:

var bag = new ConcurrentBag<object>();
var maxParallel = 20;
var throttler = new SemaphoreSlim(initialCount: maxParallel);
var tasks = myCollection.Select(async item =>
{
  await throttler.WaitAsync();
  try
  {
     var response = await GetData(item);
     bag.Add(response);
  }
  finally
  {
     throttler.Release();
  }
});
await Task.WhenAll(tasks);
var count = bag.Count;

Solution 5

The simplest possible extension method, compiled from the other answers and the article referenced by the accepted answer:

public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> asyncAction, int maxDegreeOfParallelism)
{
    var throttler = new SemaphoreSlim(initialCount: maxDegreeOfParallelism);
    var tasks = source.Select(async item =>
    {
        await throttler.WaitAsync();
        try
        {
            await asyncAction(item).ConfigureAwait(false);
        }
        finally
        {
            throttler.Release();
        }
    });
    await Task.WhenAll(tasks);
}
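For completeness, here's a runnable end-to-end sketch. The extension method is repeated so the sample compiles on its own, and Task.Delay stands in for real asynchronous work:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class Demo
{
    // Same extension as above, repeated so this sample compiles on its own.
    public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source,
        Func<T, Task> asyncAction, int maxDegreeOfParallelism)
    {
        var throttler = new SemaphoreSlim(initialCount: maxDegreeOfParallelism);
        var tasks = source.Select(async item =>
        {
            await throttler.WaitAsync();
            try { await asyncAction(item).ConfigureAwait(false); }
            finally { throttler.Release(); }
        });
        await Task.WhenAll(tasks);
    }

    public static async Task Main()
    {
        var processed = 0;
        // Task.Delay stands in for a real asynchronous call such as an HTTP request.
        await Enumerable.Range(1, 20).ParallelForEachAsync(async _ =>
        {
            await Task.Delay(10);
            Interlocked.Increment(ref processed);
        }, maxDegreeOfParallelism: 4);
        Console.WriteLine(processed); // prints 20
    }
}
```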

UPDATE: here's a simple modification that also supports a cancellation token, as requested in the comments (untested):

public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, CancellationToken, Task> asyncAction, int maxDegreeOfParallelism, CancellationToken cancellationToken)
{
    var throttler = new SemaphoreSlim(initialCount: maxDegreeOfParallelism);
    var tasks = source.Select(async item =>
    {
        await throttler.WaitAsync(cancellationToken);
        if (cancellationToken.IsCancellationRequested) return;

        try
        {
            await asyncAction(item, cancellationToken).ConfigureAwait(false);
        }
        finally
        {
            throttler.Release();
        }
    });
    await Task.WhenAll(tasks);
}
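Several commenters suggest TPL Dataflow's ActionBlock<T> as a production-grade alternative to the semaphore-based helpers. A minimal sketch, assuming the System.Threading.Tasks.Dataflow package is referenced; setting BoundedCapacity equal to MaxDegreeOfParallelism and feeding the block with SendAsync provides backpressure, so large sources are enumerated lazily:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class ActionBlockDemo
{
    public static async Task Main()
    {
        var processed = 0;

        var block = new ActionBlock<int>(async item =>
        {
            await Task.Delay(10);                 // stand-in for real async work
            Interlocked.Increment(ref processed);
        },
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 4,
            BoundedCapacity = 4                   // producer waits when the buffer is full
        });

        foreach (var item in Enumerable.Range(1, 20))
            await block.SendAsync(item);          // awaits a free slot instead of buffering everything

        block.Complete();
        await block.Completion;                   // also propagates any exceptions

        Console.WriteLine(processed); // prints 20
    }
}
```

Compared to the semaphore approach, the block also gives you completion propagation, cancellation support, and bounded memory for very large inputs.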

Author: clausndk

CTO of Sekoia, which builds an open platform primarily for elderly people in need of care. We specialize in integrating with the world surrounding people who need care, whether that is content, communication, home appliances, or healthcare-related products. The purpose of the platform is to allow people to stay longer in their own homes, to help caregivers assist the elderly, and to provide management tools for more precise, higher-quality reporting.

Updated on April 25, 2022

Comments

  • clausndk
    clausndk about 2 years

    I would like to handle a collection in parallel, but I'm having trouble implementing it and I'm therefore hoping for some help.

    The trouble arises if I want to call a method marked async in C#, within the lambda of the parallel loop. For example:

    var bag = new ConcurrentBag<object>();
    Parallel.ForEach(myCollection, async item =>
    {
      // some pre stuff
      var response = await GetData(item);
      bag.Add(response);
      // some post stuff
    });
    var count = bag.Count;
    

    The problem is that the count ends up 0: the async lambdas become fire-and-forget (async void), so the Parallel.ForEach call doesn't wait for them to complete. If I remove the async keyword, the method looks like this:

    var bag = new ConcurrentBag<object>();
    Parallel.ForEach(myCollection, item =>
    {
      // some pre stuff
      var responseTask = GetData(item);
      responseTask.Wait();
      var response = responseTask.Result;
      bag.Add(response);
      // some post stuff
    });
    var count = bag.Count;
    

    It works, but it completely disables the await cleverness, and I have to do some manual exception handling (removed for brevity).

    How can I implement a Parallel.ForEach loop, that uses the await keyword within the lambda? Is it possible?

    The Parallel.ForEach method takes an Action<T> as its parameter, but I want it to wait for my asynchronous lambda.

    • Josh M.
      Josh M. over 6 years
      I assume you meant to remove await from await GetData(item) in your second code block as it would produce a compilation error as-is.
    • Vitaliy Ulantikov
      Vitaliy Ulantikov over 6 years
      Possible duplicate of Nesting await in Parallel.ForEach
    • Theodor Zoulias
      Theodor Zoulias over 3 years
      As a side note, the ConcurrentBag<T> is a very specialized collection. A ConcurrentQueue<T> would serve you better in this case.
  • usr
    usr about 11 years
    Probably a throttling mechanism is needed. This will immediately create as many tasks as there are items, which might end up in 10k network requests and such.
  • svick
    svick about 11 years
    @usr The last example in Stephen Toub's article addresses that.
  • Piotr Kula
    Piotr Kula about 7 years
    This is your package? I have seen you post this in a few places now? :D Oh wait.. your name is on the package :D +1
  • Serge Semenov
    Serge Semenov about 7 years
    @ppumkin, yes, it's mine. I've seen this problem over and over again, so I decided to solve it in the simplest way possible and free others from struggling as well :)
  • Piotr Kula
    Piotr Kula about 7 years
    Thanks.. it definitely makes sense and helped me out big time!
  • Luke Puplett
    Luke Puplett almost 7 years
    @svick I was puzzling over that last sample. It looks to me like it just batches a load of tasks to create more tasks, but they all get started en masse.
  • svick
    svick almost 7 years
    @LukePuplett It creates dop tasks and each of them then processes some subset of the input collection in series.
  • Akira Yamamoto
    Akira Yamamoto almost 7 years
  • Shiran Dror
    Shiran Dror over 6 years
    you have a typo: maxDegreeOfParallelism > maxDegreeOfParalellism
  • Serge Semenov
    Serge Semenov over 6 years
    @ShiranDror, I'm pretty sure that it's spelled correctly
  • BornToCode
    BornToCode over 6 years
    The correct spelling is indeed maxDegreeOfParallelism, however there's something in @ShiranDror's comment - in your package you called the variable maxDegreeOfParalellism by mistake (and therefore your quoted code won't compile until you change it..)
  • AfshinZavvar
    AfshinZavvar almost 6 years
    @stephen I was testing your code by passing a list of numbers (1-10) and writing each number to the console. All printed numbers were in sequence; however, when I changed await GetData to Task.Run(async () => await GetData(x)) the printed numbers were shuffled. Any idea why it didn't run in parallel on the first try?
  • Stephen Cleary
    Stephen Cleary almost 6 years
    @Afshin_Zavvar: If you call Task.Run without awaiting the result, then that's just throwing fire-and-forget work onto the thread pool. That is almost always a mistake.
  • nicolas2008
    nicolas2008 over 5 years
    'using' will not help. The foreach loop will wait on the semaphore indefinitely. Just try this simple code that reproduces the issue: await Enumerable.Range(1, 4).ForEachAsyncConcurrent(async (i) => { Console.WriteLine(i); throw new Exception("test exception"); }, maxDegreeOfParallelism: 2);
  • askids
    askids over 5 years
    @nicolay.anykienko you are right about #2. That memory problem can be solved by adding tasksWithThrottler.RemoveAll(x => x.IsCompleted);
  • StuartQ
    StuartQ over 4 years
    I've re-edited the code above so it actually compiles, even if the spelling is wrong from an English point of view.
  • Serge Semenov
    Serge Semenov over 4 years
    @StuartQ, you probably use an older version of the library. The typo in the spelling has been fixed so I changed the example back.
  • StuartQ
    StuartQ over 4 years
    @SergeSemenov In that case I think you might want to update the link in this answer, since it points at V1.10. Since you're active on this question, I will leave that to you.
  • Massimo Savazzi
    Massimo Savazzi over 4 years
    I've tried it in my code, and if maxDegreeOfParallelism is not null the code deadlocks. Here you can see all the code to reproduce: stackoverflow.com/questions/58793118/…
  • Zafar
    Zafar almost 4 years
    Works perfectly. I used this when sending emails from an AWS lambda; speed increased 10x.
  • Sal
    Sal over 3 years
    SemaphoreSlim should be wrapped with a using statement because it implements IDisposable
  • Bjorn De Rijcke
    Bjorn De Rijcke over 3 years
    A simple throttling mechanism for this approach is to split your list into small lists of N entries, and perform this task select + Task.WhenAll for each smaller batch. This way you don't spawn thousands of tasks for large data sets.
  • Hocas
    Hocas over 3 years
    tcs.SetResult(null) needs to be replaced with tcs.TrySetResult(null)
  • nicolas2008
    nicolas2008 over 3 years
    @Hocas, why do you think TrySetResult is needed?
  • Hocas
    Hocas over 3 years
    I had a problem with SetResult being called multiple times the last time I used this code. When to use SetResult() vs TrySetResult()
  • nicolas2008
    nicolas2008 over 3 years
    @Hocas, that's interesting. tcs.SetResult(null) is not expected to be executed twice.
  • Theodor Zoulias
    Theodor Zoulias over 3 years
    Using the CurrentCount property of the SemaphoreSlim for controlling the execution flow is not a good idea. In most cases it creates race conditions. Using the Volatile.Read is also shaky (another possible race condition). I wouldn't trust this solution in a production environment.
  • nicolas2008
    nicolas2008 over 3 years
    @Theodor Zoulias, thanks for your feedback. It would be more constructive if you provide evidence or at least links to official documentation proving your doubts.
  • Theodor Zoulias
    Theodor Zoulias over 3 years
    Sure. I would like to be able to point to the documentation, or some other source of reliable information. But I can't find any. The only thing I know is that I am not able to prove the correctness of your solution by simply studying it. It may be correct, or may not be, and I am not confident that I would reach to a definite conclusion by experimentation. And I don't see the point of even trying, because I already know plenty of mechanisms that achieve the same thing correctly, reliably and efficiently, so why bother? I am sorry for not being able to provide a more satisfying answer. :-)
  • nicolas2008
    nicolas2008 over 3 years
    @Theodor Zoulias, I would be glad to see your solution how to achieve the same thing in easier way. Promise to remove my answer once you provide better one :-)
  • Theodor Zoulias
    Theodor Zoulias over 3 years
    Nicolay my preferred solution to this problem for production code would be to use an ActionBlock<T> from the TPL Dataflow library. Easy, efficient, rock solid, lots of configuration options, natively available in .NET Core, what else could I ask for? If this was not an option for some reason, there are some provably correct implementations here (including two of my own).
  • nicolas2008
    nicolas2008 over 3 years
    @Theodor Zoulias, ActionBlock<T> is missing a lazy source enumeration feature.
  • nicolas2008
    nicolas2008 over 3 years
    @Theodor Zoulias, I also checked your solutions. One with worker tasks looks theoretically correct but IMO it's overcomplicated and not very efficient due to complex task hierarchy and usage of lock.
  • Theodor Zoulias
    Theodor Zoulias over 3 years
    You can set the BoundedCapacity of the ActionBlock equal to the MaxDegreeOfParallelism, which creates a situation known as "backpressure" in producer-consumer systems, and is quite similar to the familiar LINQ concept of lazy evaluation. The looping code that feeds the block with await block.SendAsync(item) is forced to wait until an empty slot becomes available in the internal buffer of the block, so the enumeration of the source becomes lazy by necessity.
  • Caleb Holt
    Caleb Holt about 3 years
    My concern with this approach, when I looked at implementing it for my use, was that the 1.7 million rows I was processing would each result in a job in the tasksWithThrottler list, and that didn't seem ideal or really scalable. Posting the solution my teammate and I came up with using ActionBlock as a separate solution.
  • Theodor Zoulias
    Theodor Zoulias about 3 years
    You should probably replace the semaphore.Wait() with await semaphore.WaitAsync(), to avoid blocking the caller. Also be aware that the functionality of the SemaphoreSlim in your solution can be replaced by the BoundedCapacity configuration of the ActionBlock, in combination with the awaitable SendAsync method. Comparatively it is more efficient (memory-wise).
  • Caleb Holt
    Caleb Holt about 3 years
    @TheodorZoulias Thanks so much for the feedback! It's something I'm actively working on for a project so I'll look at those changes and update my solution.
  • Alexei Levenkov
    Alexei Levenkov about 3 years
    stackoverflow.com/a/65251949/477420 answer by @TheodorZoulias shows very similar approach... presumably SendAsync does not wait for operation to finish (which is not clear to me from the docs)
  • Theodor Zoulias
    Theodor Zoulias about 3 years
    Caleb Holt another gotcha that you may want to be aware of is that enumerating the user-supplied enumerable could potentially result to an exception, and in this case your implementation will just propagate immediately this exception, without awaiting the completion of the ActionBlock. This is not an optimal behavior, because it may leave tasks running in the background unobserved (in fire-and-forget fashion). Implementing correctly a ForEachAsync method can be quite tricky. I became aware of the gotcha myself very recently.
  • Theodor Zoulias
    Theodor Zoulias about 3 years
    @AlexeiLevenkov the documentation of the SendAsync method is quite confusing. I doubt that a person smart enough has ever existed on this planet, that could understand what this method does by just reading the docs. One should delve deep into the source code and understand that both Post and SendAsync methods are based on the hidden (explicitly implemented) OfferMessage API, that has 5 possible return values. The SendAsync handles asynchronously the Postponed return value.
  • Tim
    Tim about 3 years
    Also, this line "await throttler.WaitAsync();" should NOT be within the try block because if it throws an exception you would be calling Release when you haven't acquired the lock.
  • Caleb Holt
    Caleb Holt about 3 years
    @TheodorZoulias So, try around the loop and put complete/completion calls in a finally, then allow the exception to indicate it wasn't run to completion? I considered catching ALL exceptions and returning in aggregate, but depending on the cause that might result in an enormous number of exceptions. I suppose I could go down a rabbit hole with some options class and the let the caller decide if exceptions should be aggregated or terminated on first occurrence. That's the most flexible for a generic approach. (I've also wired through cancelationToken at this point.)
  • Theodor Zoulias
    Theodor Zoulias about 3 years
    Caleb ideally you should watch out for exceptions coming from the GetAsyncEnumerator, MoveNextAsync and DisposeAsync methods, and if anyone of these fails, propagate that exception to the ActionBlock by invoking its Fault method. That's a lot of work to do though, so you could take a shortcut and just wrap the loop in a try/catch, risking that a bug in the method's implementation would also be surfaced as a normal operational error. That's what I've done in this implementation.
  • Theodor Zoulias
    Theodor Zoulias about 3 years
    "You can use async lambda as well" <== could you explain this more? Or even better could you give an example of using an async lambda with a Parallel.ForEach loop, that does not result to buggy behavior?
  • Theodor Zoulias
    Theodor Zoulias about 3 years
    Gravity I am sorry that I have to downvote your answer, but passing an async delegate to the Parallel.ForEach method is more than "not a best practice". It is deeply and irrecoverably flawed. The Parallel.ForEach does not understand async delegates, so the lambda is async void. It's not fire-and-forget, it's fire-and-crash. In such a case the Parallel.ForEach will not wait for the completion of the launched operations, it will not enforce a maximum degree of parallelism, and it will not propagate exceptions. Any exception will be unhandled and will crash the process.
  • Gravity API
    Gravity API about 3 years
    If you follow the example here, yes, but not if you implement it in a way to cover all the
  • Gravity API
    Gravity API about 3 years
    I don't think a downvote is justified in this case. This is only an answer to a specific question, NOT the recommended answer (the recommended answer is the one I showed in the first example). Using async inside Parallel.ForEach can work and will not crash if handled, but again IT IS NOT the recommended approach, and it is not related to the original answer, which works flawlessly.
  • Gravity API
    Gravity API about 3 years
    Please run the following code: var options = new ParallelOptions { MaxDegreeOfParallelism = 5 }; var items = new[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }; Task.Run(() => Parallel.ForEach(items, options, async item => await Task.Run(() => Console.WriteLine($"{item}")))); Thread.Sleep(15000); It is an async Parallel.ForEach that works. It does not crash, but it naturally enforces max parallelism in a different way, which can also be handled, but you get the point :)
  • Theodor Zoulias
    Theodor Zoulias about 3 years
    Gravity this is a bad example. Parallelizing the Console.WriteLine method makes no sense, because this method is synchronized. Only one thread can write to the Console at a time. Also notice the ugliness of the Thread.Sleep(15000);. You added this line because otherwise the program would end before the completion of the async void operations launched uncontrollably by the misused Parallel.ForEach loop. This is not the correct way to write software.
  • Gravity API
    Gravity API about 3 years
    This is just an example; THIS IS NOT THE ANSWER to the original question. Please read my original answer, which says that if you use async inside Parallel.ForEach you MUST have a process to hold it, and that I DO NOT recommend it. THIS IS AN EXAMPLE of a bad approach :). You can parallelize whatever you want inside; Console.WriteLine is just an example. Please read the first answer, BEFORE the update; THIS IS my answer and it solves the parallel issue asked about in the first place.
  • Gravity API
    Gravity API about 3 years
    I have commented out the bad practice example to make the answer more clear
  • Theodor Zoulias
    Theodor Zoulias about 3 years
    Well, you can't expect good votes by presenting bad examples and indirectly promoting bad practices, whether you recommend them or not. How about removing all the bad stuff from your answer, and keeping the good stuff?
  • Gravity API
    Gravity API about 3 years
    Agreed :) That is why I removed it
  • Theodor Zoulias
    Theodor Zoulias about 3 years
    Gravity the phrase "You can use async lambda as well", connected with the Parallel.ForEach method, is an indisputable downvote by me. No amount of warnings before or after, or deletion indications like strikethrough, can make the presence of this phrase tolerable. I am talking exclusively about my own voting criteria. Anyone else can vote however they see fit.
  • Gravity API
    Gravity API about 3 years
    Accepted, and makes sense. I didn't think about it that way; I agree with your criteria and have changed my post accordingly.
  • Theodor Zoulias
    Theodor Zoulias about 3 years
    I would upvote it, but I don't like the arbitrary int maxDegreeOfParallelism = 10. Better let the user specify the level of concurrency explicitly. You could also consider adding a .ConfigureAwait(false) after the asyncAction(item) (but not after the throttler.WaitAsync()).
  • Alex from Jitbit
    Alex from Jitbit about 3 years
    @TheodorZoulias both good points, edited. Also working on a variant that supports a cancellationToken; will post it later.
  • Theodor Zoulias
    Theodor Zoulias about 3 years
    It is worth noting that most solutions based on Task.WhenAll are only suitable for a relatively small number of tasks, or for asyncActions that are guaranteed not to throw. Otherwise it can be very frustrating to wait half an hour for the completion of 10,000 tasks, and just receive an exception as a result (that could have been thrown by the very first task).
  • satnhak
    satnhak about 3 years
    This seems to be the current state-of-the-art review: medium.com/@alex.puiu/parallel-foreach-async-in-c-36756f8ebe62
  • Michael Freidgeim
    Michael Freidgeim over 2 years
    Ohad Schneider's answer in Nesting await in Parallel.ForEach adds error handling to Stephen Toub's ForEachAsync extension.
  • Theodor Zoulias
    Theodor Zoulias over 2 years
    There is already an answer mentioning and demonstrating the Parallel.ForEachAsync API, by Majid Shahabfar.
  • Jurion
    Jurion over 2 years
    Hello. Is there a way to simulate "break"? Use case: there is a 30-minute limit on execution. After 25 minutes I would like to stop execution and requeue the remaining items for a new task.
  • zmechanic
    zmechanic over 2 years
    This is NOT a robust solution, for two reasons. First, if an exception is thrown it will not terminate the loop. Second, the throttler is not disposed.
  • Alex from Jitbit
    Alex from Jitbit over 2 years
    @zmechanic I think it's up to the developer whether or not to abort the loop on exception.
  • zmechanic
    zmechanic over 2 years
    @Alex from Jitbit agreed, but it's not stated in your answer, and the LINQ behavior (as you do it) is different from foreach in this aspect. In LINQ, the exception will not terminate enumeration.
  • Seabizkit
    Seabizkit over 2 years
    could you show how to add a cancellationToken... @AlexfromJitbit I did `foreach (var item in enumerable) { ct.ThrowIfCancellationRequested();` but feel like it could maybe also be passed to the Task.Run, and when maxDegreeOfParallelism is 0 it should also be factored in... could you assist in this regard
  • Seabizkit
    Seabizkit over 2 years
    Please add the cancellation token code as well, please.
  • Raymond
    Raymond about 2 years
    @tim assuming your comment is resolved, can you remove it please (and/or @ me if I forget to remove this after you have!)?
  • Ganso Doido
    Ganso Doido about 2 years
    @Tim Thank you Tim. I made the necessary changes.
  • Theodor Zoulias
    Theodor Zoulias about 2 years
    @zmechanic regarding the throttler not being disposed, I agree that it should (by adding a using before the var throttler), but it's not a deal breaker: Do I need to Dispose a SemaphoreSlim.
  • Theodor Zoulias
    Theodor Zoulias about 2 years
    The if (cancellationToken.IsCancellationRequested) return; is not needed. Theoretically it could result in the Task completing successfully instead of being canceled, without doing all the work.
  • Seabizkit
    Seabizkit almost 2 years
    @nicolas2008 what do you mean it will wait indefinitely? If that were true it would never exit the foreach loop, which it only enters if there is more to iterate over, so it definitely gets out? Which means 'using' will help, as it exits the method, no?
  • Michael Freidgeim
    Michael Freidgeim almost 2 years