C# async within an action

Solution 1

The problem is in this line:

return new Action(async () => ...

You start an async operation with the async lambda, but you don't return a task to await. Because the lambda is converted to an Action, it compiles to an async void method: it keeps running on worker threads, but the caller never finds out when it finishes. Your program terminates before the async operation completes, which is why you don't see any output.

It needs to be:

return new Func<Task>(async () => ...
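To make the difference concrete, here is a minimal sketch. WorkAsync and the class name are hypothetical stand-ins, not code from the question. The same async lambda is assigned once to an Action and once to a Func<Task>; only the second can be awaited.

```csharp
using System;
using System.Threading.Tasks;

public static class AsyncDelegateDemo
{
    // Hypothetical stand-in for real asynchronous work (e.g. a DB call).
    public static async Task<string> WorkAsync()
    {
        await Task.Delay(50);
        return "done";
    }

    public static string RunFireAndForget()
    {
        string result = "not finished";

        // Action: the async lambda becomes async void, so there is no Task to await.
        Action fireAndForget = async () => { result = await WorkAsync(); };
        fireAndForget();

        // Control returns at the first incomplete await, before WorkAsync finishes.
        return result;
    }

    public static async Task<string> RunAwaitableAsync()
    {
        string result = "not finished";

        // Func<Task>: the lambda's Task is handed back, so the caller can await it.
        Func<Task> awaitable = async () => { result = await WorkAsync(); };
        await awaitable();

        return result;
    }

    public static async Task Main()
    {
        Console.WriteLine(RunFireAndForget());        // prints: not finished
        Console.WriteLine(await RunAwaitableAsync()); // prints: done
    }
}
```

The Action version still runs to completion in the background if the process lives long enough, but nothing can wait for it or observe its exceptions.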

UPDATE

First, you need to split responsibilities of methods, so you don't mix retry policy (which should not be hardcoded to a check of a boolean result) with running tasks in parallel.

Then, as previously mentioned, you run your while (true) loop 100 times instead of doing things in parallel.

As @MachineLearning pointed out, use Task.Delay instead of Thread.Sleep.
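As a rough illustration of why Task.Delay is the better fit inside async code, the sketch below (all names are made up for the example) starts 100 waits at once. Because each wait yields at the await instead of blocking a thread the way Thread.Sleep does, the total time should stay close to a single delay rather than growing with the number of waits.

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

public static class DelayDemo
{
    // Waits without holding a thread: the method yields at the await.
    public static async Task WaitAsync(int milliseconds)
    {
        await Task.Delay(milliseconds);
    }

    // Starts `count` waits at once and returns the total elapsed time.
    public static async Task<TimeSpan> RunConcurrentWaitsAsync(int count, int milliseconds)
    {
        var stopwatch = Stopwatch.StartNew();
        await Task.WhenAll(Enumerable.Range(0, count).Select(_ => WaitAsync(milliseconds)));
        return stopwatch.Elapsed;
    }

    public static async Task Main()
    {
        TimeSpan elapsed = await RunConcurrentWaitsAsync(count: 100, milliseconds: 200);
        Console.WriteLine($"100 waits of 200 ms took about {elapsed.TotalMilliseconds:F0} ms");
    }
}
```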

Overall, your solution looks like this:

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Collections.Async;

static async Task DoWithRetries(Func<Task> action, int retryCount, string method)
{
    while (true)
    {
        try
        {
            await action();
            break;
        }
        catch (Exception e)
        {
            LoggerAgent.LogException(e, method);
        }

        if (retryCount <= 0)
            break;

        retryCount--;
        await Task.Delay(millisecondsDelay: 200);
    }
}

static async Task Example()
{
    List<int> ints = new List<int>();
    for (int i = 0; i < 1000; i++)
        ints.Add(i);

    Func<int, Task> actionOnItem =
        async item =>
        {
            await DoWithRetries(async () =>
            {
                List<string> test = await fetchSmthFromDb();
                Console.WriteLine("#" + item + "  " + test[0]);
                if (test[0] != "test")
                    throw new InvalidOperationException("unexpected result"); // will be re-tried
            },
            retryCount: 5,
            method: "test");
        };

    await ints.ParallelForEachAsync(actionOnItem, maxDegreeOfParalellism: 100);
}

You need to use the AsyncEnumerator NuGet Package in order to use the ParallelForEachAsync extension method from the System.Collections.Async namespace.
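If you would rather not take a dependency on the package, a similar throttling effect can be sketched with SemaphoreSlim. This is a different technique, not how ParallelForEachAsync is implemented; ForEachThrottledAsync and MeasurePeakConcurrencyAsync are hypothetical helpers written for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledForEach
{
    // Hypothetical helper (not part of AsyncEnumerator): runs `body` for every item,
    // allowing at most `maxConcurrency` invocations in flight at a time.
    public static async Task ForEachThrottledAsync<T>(
        IEnumerable<T> source, Func<T, Task> body, int maxConcurrency)
    {
        using (var gate = new SemaphoreSlim(maxConcurrency))
        {
            var tasks = new List<Task>();
            foreach (T item in source)
            {
                await gate.WaitAsync();      // wait for a free slot
                tasks.Add(RunOneAsync(item));
            }
            await Task.WhenAll(tasks);

            async Task RunOneAsync(T item)
            {
                try { await body(item); }
                finally { gate.Release(); }  // free the slot even on failure
            }
        }
    }

    // Demo: records the highest number of simultaneously running bodies.
    public static async Task<int> MeasurePeakConcurrencyAsync(int itemCount, int maxConcurrency)
    {
        int current = 0, peak = 0;
        object sync = new object();
        await ForEachThrottledAsync(Enumerable.Range(0, itemCount), async _ =>
        {
            lock (sync) { current++; peak = Math.Max(peak, current); }
            await Task.Delay(20);            // simulated asynchronous work
            lock (sync) { current--; }
        }, maxConcurrency);
        return peak;
    }

    public static async Task Main()
    {
        int peak = await MeasurePeakConcurrencyAsync(itemCount: 20, maxConcurrency: 4);
        Console.WriteLine($"Peak concurrency: {peak}");
    }
}
```

The semaphore is only a gate on how many bodies run at once; retries would still be layered on top by passing something like DoWithRetries as the body.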

Solution 2

Besides the final complete reengineering, I think it's very important to underline what was really wrong with the original code.

0) First of all, as @Serge Semenov immediately pointed out, Action has to be replaced with

Func<Task>

But there are still two other essential changes.

1) With an async delegate as the argument, you need to use the more recent Task.Run instead of the older Task.Factory.StartNew pattern (otherwise you have to call Unwrap() explicitly).

2) Moreover, the ex() method can't be async, since Task.WhenAll must be waited on with Wait() rather than await.
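Point 1 can be illustrated with a small sketch (the class and variable names are made up for the example). Task.Factory.StartNew given a Func<Task> returns a Task<Task> whose outer task completes as soon as the lambda hits its first await, while Unwrap() and Task.Run both track the whole lambda.

```csharp
using System;
using System.Threading.Tasks;

public static class StartNewDemo
{
    public static async Task<bool[]> RunAsync()
    {
        bool finished = false;
        Func<Task> work = async () =>
        {
            await Task.Delay(100);
            finished = true;
        };

        // StartNew wraps the delegate's Task: the outer Task<Task> completes
        // as soon as the lambda reaches its first incomplete await.
        Task<Task> outer = Task.Factory.StartNew(work);
        await outer;
        bool afterOuter = finished;

        // Unwrap() exposes the inner task, which tracks the whole lambda.
        await outer.Unwrap();
        bool afterUnwrap = finished;

        // Task.Run unwraps automatically when given a Func<Task>.
        finished = false;
        await Task.Run(work);
        bool afterRun = finished;

        return new[] { afterOuter, afterUnwrap, afterRun };
    }

    public static async Task Main()
    {
        bool[] results = await RunAsync();
        // Typically prints: False, True, True
        Console.WriteLine(string.Join(", ", results));
    }
}
```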

At that point, even though there are logical errors that need reengineering, from a pure technical standpoint it does work and the output is produced.

A test is available online: http://rextester.com/HMMI93124

Author by

Ori Refael

Updated on August 29, 2020

Comments

  • Ori Refael
    Ori Refael over 3 years

    I would like to write a method that accepts several parameters, including an action and a retry count, and invokes the action.

    So I have this code:

    public static IEnumerable<Task> RunWithRetries<T>(List<T> source, int threads, Func<T, Task<bool>> action, int retries, string method)
        {
            object lockObj = new object();
            int index = 0;
    
            return new Action(async () =>
            {
                while (true)
                {
                    T item;
                    lock (lockObj)
                    {
                        if (index < source.Count)
                        {
                            item = source[index];
                            index++;
                        }
                        else
                            break;
                    }
    
                    int retry = retries;
                    while (retry > 0)
                    {
                        try
                        {
                            bool res = await action(item);
                            if (res)
                                retry = -1;
                            else
                                //sleep if not success..
                                Thread.Sleep(200);
    
                        }
                        catch (Exception e)
                        {
                            LoggerAgent.LogException(e, method);
                        }
                        finally
                        {
                            retry--;
                        }
                    }
                }
            }).RunParallel(threads);
        }
    

    RunParallel is an extension method for Action; it looks like this:

    public static IEnumerable<Task> RunParallel(this Action action, int amount)
        {
            List<Task> tasks = new List<Task>();
            for (int i = 0; i < amount; i++)
            {
                Task task = Task.Factory.StartNew(action);
                tasks.Add(task);
            }
            return tasks;
        }
    

    Now, the issue: The thread is just disappearing or collapsing without waiting for the action to finish.

    I wrote this example code:

    private static async Task ex()
        {
            List<int> ints = new List<int>();
            for (int i = 0; i < 1000; i++)
            {
                ints.Add(i);
            }
    
            var tasks = RetryComponent.RunWithRetries(ints, 100, async (num) =>
            {
                try
                {
                    List<string> test = await fetchSmthFromDb();
                    Console.WriteLine("#" + num + "  " + test[0]);
                    return test[0] == "test";
                }
                catch (Exception e)
                {
                    Console.WriteLine(e.StackTrace);
                    return false;
                }
    
            }, 5, "test");
    
            await Task.WhenAll(tasks);
        }
    

    fetchSmthFromDb is a simple method returning Task<List<string>> that fetches something from the db and works perfectly fine when invoked outside of this example.

    Whenever the List<string> test = await fetchSmthFromDb(); line is executed, the thread seems to close and the Console.WriteLine("#" + num + " " + test[0]); call is never reached. When debugging, the breakpoint is never hit either.

    The Final Working Code

    private static async Task DoWithRetries(Func<Task> action, int retryCount, string method)
        {
            while (true)
            {
                try
                {
                    await action();
                    break;
                }
                catch (Exception e)
                {
                    LoggerAgent.LogException(e, method);
                }
    
                if (retryCount <= 0)
                    break;
    
                retryCount--;
                await Task.Delay(200);
            }
        }
    
        public static async Task RunWithRetries<T>(List<T> source, int threads, Func<T, Task<bool>> action, int retries, string method)
        {
            Func<T, Task> newAction = async (item) =>
            {
                await DoWithRetries(async ()=>
                {
                    await action(item);
                }, retries, method);
            };
            await source.ParallelForEachAsync(newAction, threads);
        }
    
    • Admin
      Admin over 7 years
      Are you sure your logger is thread safe? I'm getting "Thread was being aborted" when I replace it with Console.WriteLine... also what is the lock for? What are you trying to do?
    • Serge Semenov
      Serge Semenov over 7 years
      I'm really confused by the example above. Why are you trying to run the same action 100 times in parallel (the RunParallel method)? Is it some sort of load testing on a DB?
    • Ori Refael
      Ori Refael over 7 years
      Logger is not thread safe, but doesn't crash for me. @SergeSemenov I'm using MongoDB and I can't update 100 files in one procedure like we can in SQL, so I built a method that accepts a list of actions on a single enumerable and runs them as a single procedure
    • Serge Semenov
      Serge Semenov over 7 years
      And you did it the wrong way, because you run your while (true) loop 100 times
    • Ori Refael
      Ori Refael over 7 years
      I would be happy for some insight
    • Stephen Cleary
      Stephen Cleary over 7 years
      @OriRefael: You might want to check out Polly, which has full support for async retries.
    • Ori Refael
      Ori Refael over 7 years
      @StephenCleary thanks. It's working pretty well now, so at least for now that's all I need :)
  • Ori Refael
    Ori Refael over 7 years
    Thanks for the reply, but what do I do with the extension method? Create another extension for Func<Task>?
  • Serge Semenov
    Serge Semenov over 7 years
    I suggest you update your question with the code you changed
  • Ori Refael
    Ori Refael over 7 years
    But why is DoWithRetries inside the thread? I could already solve it by writing the retry mechanism inside every method. I want something that wraps the original task without breaking it. I need the thread count as an input; the purpose was to keep all this wrapping code in one place instead of copy-pasting it around the main task.
  • Admin
    Admin over 7 years
    Thanks for mentioning my suggestion in your answer! :-)
  • Ori Refael
    Ori Refael over 7 years
    Nevertheless, I modified the code to be more generic and it's working. Thanks a lot
  • Serge Semenov
    Serge Semenov over 7 years
    The answer to the why question is the SOLID software design principles; otherwise your code turns into spaghetti code (see Wikipedia). You are welcome.