Parallel foreach with asynchronous lambda
Solution 1
If you just want simple parallelism, you can do this:
var bag = new ConcurrentBag<object>();
var tasks = myCollection.Select(async item =>
{
    // some pre stuff
    var response = await GetData(item);
    bag.Add(response);
    // some post stuff
});
await Task.WhenAll(tasks);
var count = bag.Count;
If you need something more complex, check out Stephen Toub's ForEachAsync post.
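For reference, the core of that post is a partitioner-based extension method along these lines (a sketch from memory, not Toub's exact code):

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ForEachAsyncExtensions
{
    // Starts dop worker tasks; each worker drains one partition of the
    // source serially, awaiting the async body for every element.
    public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
    {
        return Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(dop)
            select Task.Run(async () =>
            {
                using (partition)
                {
                    while (partition.MoveNext())
                        await body(partition.Current);
                }
            }));
    }
}
```

Unlike the Select/WhenAll approach above, this caps the number of concurrent operations at dop instead of starting one task per item.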
Solution 2
You can use the ParallelForEachAsync extension method from the AsyncEnumerator NuGet package:
using Dasync.Collections;
using Dasync.Collections;

var bag = new ConcurrentBag<object>();
await myCollection.ParallelForEachAsync(async item =>
{
    // some pre stuff
    var response = await GetData(item);
    bag.Add(response);
    // some post stuff
}, maxDegreeOfParallelism: 10);
var count = bag.Count;
Disclaimer: I'm the author of the AsyncEnumerator library, which is open source and licensed under MIT, and I'm posting this message just to help the community.
Solution 3
One of the new .NET 6 APIs is Parallel.ForEachAsync, a way to schedule asynchronous work that allows you to control the degree of parallelism:
var urls = new[]
{
    "https://dotnet.microsoft.com",
    "https://www.microsoft.com",
    "https://stackoverflow.com"
};
var client = new HttpClient();
var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };

// The cache directory must exist, and a raw URL contains characters
// that are invalid in a file name, so use the host as the file name.
var cacheDir = Path.Combine(Path.GetTempPath(), "http_cache");
Directory.CreateDirectory(cacheDir);

await Parallel.ForEachAsync(urls, options, async (url, token) =>
{
    var targetPath = Path.Combine(cacheDir, new Uri(url).Host);
    var response = await client.GetAsync(url, token);
    if (response.IsSuccessStatusCode)
    {
        using var target = File.OpenWrite(targetPath);
        await response.Content.CopyToAsync(target, token);
    }
});
Another example in Scott Hanselman's blog.
The source, for reference.
Solution 4
With SemaphoreSlim you can control the degree of parallelism.
var bag = new ConcurrentBag<object>();
var maxParallel = 20;
var throttler = new SemaphoreSlim(initialCount: maxParallel);
var tasks = myCollection.Select(async item =>
{
    await throttler.WaitAsync();
    try
    {
        var response = await GetData(item);
        bag.Add(response);
    }
    finally
    {
        throttler.Release();
    }
});
await Task.WhenAll(tasks);
var count = bag.Count;
Solution 5
Simplest possible extension method, compiled from other answers and the article referenced by the accepted answer:
public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> asyncAction, int maxDegreeOfParallelism)
{
    var throttler = new SemaphoreSlim(initialCount: maxDegreeOfParallelism);
    var tasks = source.Select(async item =>
    {
        await throttler.WaitAsync();
        try
        {
            await asyncAction(item).ConfigureAwait(false);
        }
        finally
        {
            throttler.Release();
        }
    });
    await Task.WhenAll(tasks);
}
UPDATE: here's a simple modification that also supports a cancellation token, as requested in the comments (untested):
public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, CancellationToken, Task> asyncAction, int maxDegreeOfParallelism, CancellationToken cancellationToken)
{
    var throttler = new SemaphoreSlim(initialCount: maxDegreeOfParallelism);
    var tasks = source.Select(async item =>
    {
        await throttler.WaitAsync(cancellationToken);
        if (cancellationToken.IsCancellationRequested) return;
        try
        {
            await asyncAction(item, cancellationToken).ConfigureAwait(false);
        }
        finally
        {
            throttler.Release();
        }
    });
    await Task.WhenAll(tasks);
}
Updated on April 25, 2022

Comments
-
clausndk about 2 years
I would like to handle a collection in parallel, but I'm having trouble implementing it and I'm therefore hoping for some help.
The trouble arises if I want to call a method marked async in C#, within the lambda of the parallel loop. For example:
var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, async item =>
{
    // some pre stuff
    var response = await GetData(item);
    bag.Add(response);
    // some post stuff
});
var count = bag.Count;
The problem is that the count ends up being 0, because all the threads created are effectively just background threads and the Parallel.ForEach call doesn't wait for completion. If I remove the async keyword, the method looks like this:

var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, item =>
{
    // some pre stuff
    var responseTask = await GetData(item);
    responseTask.Wait();
    var response = responseTask.Result;
    bag.Add(response);
    // some post stuff
});
var count = bag.Count;
It works, but it completely disables the await cleverness and I have to do some manual exception handling. (Removed for brevity.)
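The failure mode described can be seen in a minimal console program (GetData here is a hypothetical stand-in for a real async call):

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

static async Task<int> GetData(int item)
{
    await Task.Delay(100); // simulated I/O
    return item * 2;
}

var bag = new ConcurrentBag<int>();

// Parallel.ForEach takes an Action<T>, so the async lambda becomes
// async void: ForEach returns as soon as every lambda hits its first await.
Parallel.ForEach(Enumerable.Range(1, 10), async item =>
{
    var response = await GetData(item);
    bag.Add(response);
});

Console.WriteLine(bag.Count); // typically 0: nothing has completed yet
```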
How can I implement a Parallel.ForEach loop that uses the await keyword within the lambda? Is it possible?

The prototype of the Parallel.ForEach method takes an Action<T> as parameter, but I want it to wait for my asynchronous lambda.
-
Josh M. over 6 years: I assume you meant to remove await from await GetData(item) in your second code block, as it would produce a compilation error as-is.
-
Vitaliy Ulantikov over 6 years: Possible duplicate of Nesting await in Parallel.ForEach
-
Theodor Zoulias over 3 years: As a side note, the ConcurrentBag<T> is a very specialized collection. A ConcurrentQueue<T> would serve you better in this case.
-
-
usr about 11 years: Probably a throttling mechanism is needed. This will immediately create as many tasks as there are items, which might end up in 10k network requests and such.
-
svick about 11 years: @usr The last example in Stephen Toub's article addresses that.
-
Piotr Kula about 7 years: This is your package? I have seen you post this in a few places now? :D Oh wait.. your name is on the package :D +1
-
Serge Semenov about 7 years: @ppumkin, yes, it's mine. I've seen this problem over and over again, so I decided to solve it in the simplest way possible and free others from struggling as well :)
-
Piotr Kula about 7 years: Thanks.. it definitely makes sense and helped me out big time!
-
Luke Puplett almost 7 years: @svick I was puzzling over that last sample. It looks to me that it just batches a load of tasks to create more tasks, but they all get started en masse.
-
svick almost 7 years: @LukePuplett It creates dop tasks, and each of them then processes some subset of the input collection in series.
-
Akira Yamamoto almost 7 years: Throttling: stackoverflow.com/questions/10806951/…
-
Shiran Dror over 6 years: you have a typo: maxDegreeOfParallelism > maxDegreeOfParalellism
-
Serge Semenov over 6 years: @ShiranDror, I'm pretty sure that it's spelled correctly
-
BornToCode over 6 years: The correct spelling is indeed maxDegreeOfParallelism, however there's something in @ShiranDror's comment - in your package you called the variable maxDegreeOfParalellism by mistake (and therefore your quoted code won't compile until you change it).
-
AfshinZavvar almost 6 years: @stephen I was testing your code by passing a list of numbers (1-10) and writing each number to the console. All printed numbers were in sequence; however, when I changed await GetData to Task.Run(async () => await GetData(x)) the printed numbers were shuffled. Any idea why it didn't run in parallel on the first try?
-
Stephen Cleary almost 6 years: @Afshin_Zavvar: If you call Task.Run without awaiting the result, then that's just throwing fire-and-forget work onto the thread pool. That is almost always a mistake.
-
nicolas2008 over 5 years: 'using' will not help. The foreach loop will be waiting for the semaphore indefinitely. Just try this simple code that reproduces the issue: await Enumerable.Range(1, 4).ForEachAsyncConcurrent(async (i) => { Console.WriteLine(i); throw new Exception("test exception"); }, maxDegreeOfParallelism: 2);
-
askids over 5 years: @nicolay.anykienko you are right about #2. That memory problem can be solved by adding tasksWithThrottler.RemoveAll(x => x.IsCompleted);
-
StuartQ over 4 years: I've re-edited the code above so it actually compiles, even if the spelling is wrong from an English point of view.
-
Serge Semenov over 4 years: @StuartQ, you probably use an older version of the library. The typo in the spelling has been fixed, so I changed the example back.
-
StuartQ over 4 years: @SergeSemenov In that case I think you might want to update the link in this answer, since it points at V1.10. Since you're active on this question, I will leave that to you.
-
Massimo Savazzi over 4 years: I've tried it in my code, and if maxDegreeOfParallelism is not null the code deadlocks. Here you can see all the code to reproduce: stackoverflow.com/questions/58793118/…
-
Zafar almost 4 years: Works perfectly. I used this when sending emails from an AWS Lambda; speed increased 10x.
-
Sal over 3 years: SemaphoreSlim should be wrapped in a using statement because it implements IDisposable.
-
Bjorn De Rijcke over 3 years: A simple throttling mechanism for this approach is to split your list into small lists of N entries, and perform the select + Task.WhenAll for each smaller batch. This way you don't spawn thousands of tasks for large data sets.
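The batching idea from this comment can be sketched as follows (on .NET 6+, where Enumerable.Chunk is available; GetData is a hypothetical async call):

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

static async Task<int> GetData(int item)
{
    await Task.Delay(10); // simulated I/O
    return item * 2;
}

var bag = new ConcurrentBag<int>();
const int batchSize = 4;

// Only one batch of tasks is in flight at a time, so at most
// batchSize operations run concurrently.
foreach (var batch in Enumerable.Range(1, 20).Chunk(batchSize))
{
    await Task.WhenAll(batch.Select(async item => bag.Add(await GetData(item))));
}

Console.WriteLine(bag.Count); // 20
```

The trade-off is that one slow item stalls its whole batch, so throughput is lower than with a semaphore or Parallel.ForEachAsync.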
-
Hocas over 3 years: tcs.SetResult(null) needs to be replaced with tcs.TrySetResult(null)
-
nicolas2008 over 3 years: @Hocas, why do you think TrySetResult is needed?
-
Hocas over 3 years: I had a problem with SetResult being called multiple times the last time I used this code. (When to use SetResult() vs TrySetResult())
-
nicolas2008 over 3 years: @Hocas, that's interesting. tcs.SetResult(null) is not expected to be executed twice.
-
Theodor Zoulias over 3 years: Using the CurrentCount property of the SemaphoreSlim for controlling the execution flow is not a good idea. In most cases it creates race conditions. Using Volatile.Read is also shaky (another possible race condition). I wouldn't trust this solution in a production environment.
-
nicolas2008 over 3 years: @Theodor Zoulias, thanks for your feedback. It would be more constructive if you provided evidence, or at least links to official documentation, supporting your doubts.
-
Theodor Zoulias over 3 years: Sure. I would like to be able to point to the documentation, or some other source of reliable information. But I can't find any. The only thing I know is that I am not able to prove the correctness of your solution by simply studying it. It may be correct, or may not be, and I am not confident that I would reach a definite conclusion by experimentation. And I don't see the point of even trying, because I already know plenty of mechanisms that achieve the same thing correctly, reliably and efficiently, so why bother? I am sorry for not being able to provide a more satisfying answer. :-)
-
nicolas2008 over 3 years: @Theodor Zoulias, I would be glad to see your solution for achieving the same thing in an easier way. I promise to remove my answer once you provide a better one :-)
-
Theodor Zoulias over 3 years: Nicolay, my preferred solution to this problem for production code would be to use an ActionBlock<T> from the TPL Dataflow library. Easy, efficient, rock solid, lots of configuration options, natively available in .NET Core - what else could I ask for? If this was not an option for some reason, there are some provably correct implementations here (including two of my own).
-
nicolas2008 over 3 years: @Theodor Zoulias, ActionBlock<T> is missing a lazy source enumeration feature.
-
nicolas2008 over 3 years: @Theodor Zoulias, I also checked your solutions. The one with worker tasks looks theoretically correct, but IMO it's overcomplicated and not very efficient, due to the complex task hierarchy and usage of lock.
-
Theodor Zoulias over 3 years: You can set the BoundedCapacity of the ActionBlock equal to the MaxDegreeOfParallelism, which creates a situation known as "backpressure" in producer-consumer systems, and is quite similar to the familiar LINQ concept of lazy evaluation. The looping code that feeds the block with await block.SendAsync(item) is forced to wait until an empty slot becomes available in the internal buffer of the block, so the enumeration of the source becomes lazy by necessity.
-
Caleb Holt about 3 years: My concern with this approach when I looked at implementing it for my use was that each of the 1.7 million rows I was processing would result in a job in the tasksWithThrottler list, and that didn't seem ideal or really scalable. Posting the solution my teammate and I came up with using ActionBlock as a separate solution.
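The ActionBlock approach discussed in these comments can be sketched like this (requires the System.Threading.Tasks.Dataflow NuGet package; GetData is a hypothetical async call):

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static async Task<int> GetData(int item)
{
    await Task.Delay(10); // simulated I/O
    return item * 2;
}

var bag = new ConcurrentBag<int>();

var block = new ActionBlock<int>(
    async item => bag.Add(await GetData(item)),
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 4,
        BoundedCapacity = 4 // backpressure: SendAsync waits for a free slot
    });

foreach (var item in Enumerable.Range(1, 20))
{
    await block.SendAsync(item); // awaits when the internal buffer is full
}

block.Complete();
await block.Completion;

Console.WriteLine(bag.Count); // 20
```

Because BoundedCapacity equals MaxDegreeOfParallelism, the producer loop is throttled by the block itself, and the source is enumerated lazily as slots free up.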
-
Theodor Zoulias about 3 years: You should probably replace the semaphore.Wait() with await semaphore.WaitAsync(), to avoid blocking the caller. Also be aware that the functionality of the SemaphoreSlim in your solution can be replaced by the BoundedCapacity configuration of the ActionBlock, in combination with the awaitable SendAsync method. Comparatively it is more efficient (memory-wise).
-
Caleb Holt about 3 years: @TheodorZoulias Thanks so much for the feedback! It's something I'm actively working on for a project, so I'll look at those changes and update my solution.
-
Alexei Levenkov about 3 years: stackoverflow.com/a/65251949/477420 - the answer by @TheodorZoulias shows a very similar approach... presumably SendAsync does not wait for the operation to finish (which is not clear to me from the docs).
-
Theodor Zoulias about 3 years: Caleb Holt, another gotcha that you may want to be aware of is that enumerating the user-supplied enumerable could potentially result in an exception, and in this case your implementation will just propagate this exception immediately, without awaiting the completion of the ActionBlock. This is not optimal behavior, because it may leave tasks running in the background unobserved (in fire-and-forget fashion). Implementing a ForEachAsync method correctly can be quite tricky. I became aware of the gotcha myself very recently.
-
Theodor Zoulias about 3 years: @AlexeiLevenkov the documentation of the SendAsync method is quite confusing. I doubt that a person smart enough has ever existed on this planet that could understand what this method does by just reading the docs. One should delve deep into the source code and understand that both the Post and SendAsync methods are based on the hidden (explicitly implemented) OfferMessage API, which has 5 possible return values. SendAsync handles the Postponed return value asynchronously.
-
Tim about 3 years: Also, the line "await throttler.WaitAsync();" should NOT be within the try block, because if it throws an exception you would be calling Release when you haven't acquired the lock.
-
Caleb Holt about 3 years: @TheodorZoulias So, a try around the loop, put the complete/completion calls in a finally, then allow the exception to indicate it wasn't run to completion? I considered catching ALL exceptions and returning them in aggregate, but depending on the cause that might result in an enormous number of exceptions. I suppose I could go down a rabbit hole with some options class and let the caller decide whether exceptions should be aggregated or terminate on first occurrence. That's the most flexible for a generic approach. (I've also wired through a cancellationToken at this point.)
-
Theodor Zoulias about 3 years: Caleb, ideally you should watch out for exceptions coming from the GetAsyncEnumerator, MoveNextAsync and DisposeAsync methods, and if any one of these fails, propagate that exception to the ActionBlock by invoking its Fault method. That's a lot of work to do though, so you could take a shortcut and just wrap the loop in a try/catch, risking that a bug in the method's implementation would also be surfaced as a normal operational error. That's what I've done in this implementation.
-
Theodor Zoulias about 3 years: "You can use async lambda as well" <== could you explain this more? Or even better, could you give an example of using an async lambda with a Parallel.ForEach loop that does not result in buggy behavior?
-
Theodor Zoulias about 3 years: Gravity, I am sorry that I have to downvote your answer, but passing an async delegate to the Parallel.ForEach method is more than "not a best practice". It is deeply and irrecoverably flawed. The Parallel.ForEach does not understand async delegates, so the lambda is async void. It's not fire-and-forget, it's fire-and-crash. In such a case the Parallel.ForEach will not wait for the completion of the launched operations, it will not enforce a maximum degree of parallelism, and it will not propagate exceptions. Any exception will be unhandled and will crash the process.
-
Gravity API about 3 years: If you follow the example here, yes, but not if you implement it in a way to cover all the
-
Gravity API about 3 years: I don't think a downvote is justified in this case. This is only an answer to a specific question, NOT the recommended answer (the recommended answer is the one I showed in the first example). Using async inside Parallel.ForEach can work and will not crash if handled, but again IT IS NOT the recommended approach, and it is not related to the original answer, which works flawlessly.
-
Gravity API about 3 years: Please run the following code: var options = new ParallelOptions { MaxDegreeOfParallelism = 5 }; var items = new[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }; Task.Run(() => Parallel.ForEach(items, options, async item => await Task.Run(() => Console.WriteLine($"{item}")))); Thread.Sleep(15000); It is an async Parallel.ForEach that works. It does not crash - but naturally it enforces the max-parallel in a different way, which can also be handled, but you get the point :)
-
Theodor Zoulias about 3 years: Gravity, this is a bad example. Parallelizing the Console.WriteLine method makes no sense, because this method is synchronized. Only one thread can write to the Console at a time. Also notice the ugliness of the Thread.Sleep(15000);. You added this line because otherwise the program would end before the completion of the async void operations launched uncontrollably by the misused Parallel.ForEach loop. This is not the correct way to write software.
-
Gravity API about 3 years: This is just an example - THIS IS NOT THE ANSWER to the original question; please read my original answer about it, which says that if you use async parallel you MUST have a process to hold it - and that I DO NOT recommend it - THIS IS AN EXAMPLE of a bad approach :). You can parallelize whatever you want inside - the console write line is just an example. Please read the first answer - BEFORE the update - THIS IS my answer, and it solves the parallel issue asked about in the first place.
-
Gravity API about 3 years: I have commented out the bad-practice example to make the answer clearer.
-
Theodor Zoulias about 3 years: Well, you can't expect good votes by presenting bad examples and indirectly promoting bad practices, whether you recommend them or not. How about removing all the bad stuff from your answer and keeping the good stuff?
-
Gravity API about 3 years: Agreed :) That is why I removed it.
-
Theodor Zoulias about 3 years: Gravity, the phrase "You can use async lambda as well", connected with the Parallel.ForEach method, is an indisputable downvote for me. No amount of warnings before or after, or deletion indications like strikethrough, can make the presence of this phrase tolerable. I am talking exclusively about my own voting criteria. Anyone else can vote however they see fit.
-
Gravity API about 3 years: Accepted, and it makes sense. I didn't think about it that way; I agree with your criteria and have changed my post accordingly.
-
Theodor Zoulias about 3 years: I would upvote it, but I don't like the arbitrary int maxDegreeOfParallelism = 10. Better to let the user specify the level of concurrency explicitly. You could also consider adding a .ConfigureAwait(false) after the asyncAction(item) (but not after the throttler.WaitAsync()).
-
Alex from Jitbit about 3 years: @TheodorZoulias both good points, edited. Also working on a variant that supports a cancellationToken; will post it later.
-
Theodor Zoulias about 3 years: It is worth noting that most solutions based on Task.WhenAll are only suitable for a relatively small number of tasks, or for asyncActions that are guaranteed not to throw. Otherwise it can be very frustrating to wait half an hour for the completion of 10,000 tasks, and just receive an exception as a result (one that could have been thrown by the very first task).
-
satnhak about 3 years: This seems to be the current state-of-the-art review: medium.com/@alex.puiu/parallel-foreach-async-in-c-36756f8ebe62
-
Michael Freidgeim over 2 years: Ohad Schneider's answer in Nesting await in Parallel.ForEach adds error handling to Stephen Toub's ForEachAsync extension.
-
Theodor Zoulias over 2 years: There is already an answer mentioning and demonstrating the Parallel.ForEachAsync API, by Majid Shahabfar.
-
Jurion over 2 years: Hello. Is there a way to simulate "break"? Use case: there is a 30-minute limit on execution. After 25 minutes I would like to stop execution and requeue the remaining items for a new task.
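One way to approximate the "break after a time limit" scenario asked about here is a CancellationTokenSource with a timeout. A sketch (the 1-second limit and 200 ms delay stand in for the real 25-minute window and unit of work; requires .NET 6+ for Parallel.ForEachAsync):

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

var items = Enumerable.Range(1, 100).ToList();
var processed = new ConcurrentBag<int>();

// Cancels automatically once the time limit elapses.
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(1));

try
{
    await Parallel.ForEachAsync(items, cts.Token, async (item, token) =>
    {
        await Task.Delay(200, token); // hypothetical unit of work
        processed.Add(item);
    });
}
catch (OperationCanceledException)
{
    // Time limit reached; the loop stops scheduling new items.
}

// Anything not processed can be requeued for a later run.
var toRequeue = items.Except(processed).ToList();
Console.WriteLine($"processed={processed.Count}, requeued={toRequeue.Count}");
```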
-
zmechanic over 2 years: This is NOT a robust solution, for two reasons. First, if an exception is thrown it will not terminate the loop. Second, throttler is not disposed.
-
Alex from Jitbit over 2 years: @zmechanic I think it's up to the developer whether or not to abort the loop on exception.
-
zmechanic over 2 years: @Alex from Jitbit agreed, but it's not stated in your answer, and the LINQ behavior (as you do it) is different from foreach in this aspect. In LINQ, the exception will not terminate enumeration.
-
Seabizkit over 2 years: could you show how to add a cancellationToken... @AlexfromJitbit I did `foreach (var item in enumerable) { ct.ThrowIfCancellationRequested();` but feel like it could maybe also be passed to the Task.Run, and when maxDegreeOfParallelism is 0 it should also be factored in... could you assist in this regard?
-
Seabizkit over 2 years: Please add the cancellation token code as well, please.
-
Raymond about 2 years: @tim assuming your comment is resolved, can you remove it please (and/or @ me if I forget to remove this after you have!)?
-
Ganso Doido about 2 years: @Tim Thank you, Tim. I made the necessary changes.
-
Theodor Zoulias about 2 years: @zmechanic regarding the throttler not being disposed, I agree that it should be (by adding a using before the var throttler), but it's not a deal breaker: Do I need to Dispose a SemaphoreSlim.
-
Theodor Zoulias about 2 years: The if (cancellationToken.IsCancellationRequested) return; is not needed. Theoretically it could result in the Task completing successfully instead of being canceled, without doing all the work.
-
Seabizkit almost 2 years: @nicolas2008 what do you mean it will wait indefinitely? If that were true it would never exit the for loop, which it only enters if it has more to iterate over, so it definitely gets out. Which means 'using' will help, as it exits the method, no?
-
Michael Freidgeim almost 2 years: For handling exceptions see stackoverflow.com/questions/40149119/… and the MS docs docs.microsoft.com/en-us/dotnet/standard/parallel-programming/…