C# async within an action
Solution 1
The problem is in this line:
return new Action(async () => ...
You start an async operation with the async lambda, but don't return a task to await on. I.e. it runs on worker threads, but you'll never find out when it's done. And your program terminates before the async operation is complete -that's why you don't see any output.
It needs to be:
return new Func<Task>(async () => ...
UPDATE
First, you need to split responsibilities of methods, so you don't mix retry policy (which should not be hardcoded to a check of a boolean result) with running tasks in parallel.
Then, as previously mentioned, you run your while (true)
loop 100 times instead of doing things in parallel.
As @MachineLearning pointed out, use Task.Delay
instead of Thread.Sleep
.
Overall, your solution looks like this:
using System.Collections.Async;
static async Task DoWithRetries(Func<Task> action, int retryCount, string method)
{
while (true)
{
try
{
await action();
break;
}
catch (Exception e)
{
LoggerAgent.LogException(e, method);
}
if (retryCount <= 0)
break;
retryCount--;
await Task.Delay(millisecondsDelay: 200);
};
}
static async Task Example()
{
List<int> ints = new List<int>();
for (int i = 0; i < 1000; i++)
ints.Add(i);
Func<int, Task> actionOnItem =
async item =>
{
await DoWithRetries(async () =>
{
List<string> test = await fetchSmthFromDb();
Console.WriteLine("#" + item + " " + test[0]);
if (test[0] != "test")
throw new InvalidOperationException("unexpected result"); // will be re-tried
},
retryCount: 5,
method: "test");
};
await ints.ParallelForEachAsync(actionOnItem, maxDegreeOfParalellism: 100);
}
You need to use the AsyncEnumerator NuGet Package in order to use the ParallelForEachAsync
extension method from the System.Collections.Async
namespace.
Solution 2
Besides the final complete reengineering, I think it's very important to underline what was really wrong with the original code.
0) First of all, as @Serge Semenov immediately pointed out, Action has to be replaced with
Func<Task>
But there are still other two essential changes.
1) With an async delegate as argument it is necessary to use the more recent Task.Run instead of the older pattern new TaskFactory.StartNew (or otherwise you have to add Unwrap() explicitly)
2) Moreover the ex() method can't be async since Task.WhenAll must be waited with Wait() and without await.
At that point, even though there are logical errors that need reengineering, from a pure technical standpoint it does work and the output is produced.
A test is available online: http://rextester.com/HMMI93124
Related videos on Youtube
Ori Refael
hey, im Ori, 24 years old. devloping in C#, WCF,ASP.NET in visual studio, jQuery,javascript,css,html, jQuery Plugins. looking for a faster and better way to learn all the time so if u got some materials and you have some time to send to me somehow it would be awsome ! love you all, great community !
Updated on August 29, 2020Comments
-
Ori Refael over 3 years
I would like to write a method which accept several parameters, including an action and a retry amount and invoke it.
So I have this code:
public static IEnumerable<Task> RunWithRetries<T>(List<T> source, int threads, Func<T, Task<bool>> action, int retries, string method) { object lockObj = new object(); int index = 0; return new Action(async () => { while (true) { T item; lock (lockObj) { if (index < source.Count) { item = source[index]; index++; } else break; } int retry = retries; while (retry > 0) { try { bool res = await action(item); if (res) retry = -1; else //sleep if not success.. Thread.Sleep(200); } catch (Exception e) { LoggerAgent.LogException(e, method); } finally { retry--; } } } }).RunParallel(threads); }
RunParallel is an extention method for Action, its look like this:
public static IEnumerable<Task> RunParallel(this Action action, int amount) { List<Task> tasks = new List<Task>(); for (int i = 0; i < amount; i++) { Task task = Task.Factory.StartNew(action); tasks.Add(task); } return tasks; }
Now, the issue: The thread is just disappearing or collapsing without waiting for the action to finish.
I wrote this example code:
private static async Task ex() { List<int> ints = new List<int>(); for (int i = 0; i < 1000; i++) { ints.Add(i); } var tasks = RetryComponent.RunWithRetries(ints, 100, async (num) => { try { List<string> test = await fetchSmthFromDb(); Console.WriteLine("#" + num + " " + test[0]); return test[0] == "test"; } catch (Exception e) { Console.WriteLine(e.StackTrace); return false; } }, 5, "test"); await Task.WhenAll(tasks); }
The
fetchSmthFromDb
is a simple Task> which fetches something from the db and works perfectly fine when invoked outside of this example.Whenever the
List<string> test = await fetchSmthFromDb();
row is invoked, the thread seems to be closing and theConsole.WriteLine("#" + num + " " + test[0]);
not even being triggered, also when debugging the breakpoint never hit.The Final Working Code
private static async Task DoWithRetries(Func<Task> action, int retryCount, string method) { while (true) { try { await action(); break; } catch (Exception e) { LoggerAgent.LogException(e, method); } if (retryCount <= 0) break; retryCount--; await Task.Delay(200); }; } public static async Task RunWithRetries<T>(List<T> source, int threads, Func<T, Task<bool>> action, int retries, string method) { Func<T, Task> newAction = async (item) => { await DoWithRetries(async ()=> { await action(item); }, retries, method); }; await source.ParallelForEachAsync(newAction, threads); }
-
Admin over 7 yearsAre you sure your logger is thread safe? I'm getting "Thread was being aborted" when I replace it with Console.WriteLine... also what is the lock for? What are you trying to do?
-
Serge Semenov over 7 yearsI'm really confused with the example above. Why are trying to run the same action 100 times in parallel? (the
RunParallel
method) Is it some sort of load testing on a DB? -
Ori Refael over 7 yearsLogger is not thread safe, but doesn't crash for me. @SergeSemenov Im using mongodb and i cant update 100 files in one procedure like we can do in SQL, so i built a method to accept list of actions on a single enumerable and operate as a single procedure
-
Serge Semenov over 7 yearsAnd you did it in a wrong way because you run your
while (true)
loop 100 times -
Ori Refael over 7 yearsi would be happy for an insight
-
Stephen Cleary over 7 years@OriRefael: You might want to check out Polly, which has full support for async retries.
-
Ori Refael over 7 years@StephenCleary thanks. its working pretty well now so at least for now, thats all i need :)
-
-
Ori Refael over 7 yearsThanks for the replay, but what do I do with the extension method, creates another extension for Fun<Task> ?
-
Serge Semenov over 7 yearsI suggest you to update your question with the code your changed
-
Ori Refael over 7 yearsbut why the DoWithRetries inside the thread? i could already solve it if i wrote the retries mechanism inside every method. I want something to wrap the original task and not break it. I need the Threads amount to use as input, the purpese was to save all this code wrapping the main task and not copy paste it
-
Admin over 7 yearsThanks for mentioning my suggestion in your answer! :-)
-
Ori Refael over 7 yearsneither the less, i modified the code so it be more generic and its working. Thanks alot
-
Serge Semenov over 7 yearsThe answer to the why question is the SOLID software design principles, otherwise your code turns into Spaghetti Code ( see Wikipedia). You are welcome.