Doesn't await when using ForEachAsync with await inside Action

That's because the implementation of ForEachAsync doesn't await the delegated action:

moveNextTask = enumerator.MoveNextAsync(cancellationToken);
action(current);

See https://github.com/mono/entityframework/blob/master/src/EntityFramework/Infrastructure/IDbAsyncEnumerableExtensions.cs#L19

The reason is that you can't await an Action: it returns void, so an async lambda passed as an Action compiles to an async void method and the caller gets no Task to wait on. The delegate needs to be a Func which returns a Task. See "How do you implement an async action delegate method?"
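To illustrate the difference, here is a minimal, self-contained sketch with no Entity Framework involved. The class and method names are hypothetical, and a TaskCompletionSource stands in for real asynchronous work so that the outcome does not depend on timing.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical demo class; not part of Entity Framework.
public static class AsyncDelegateDemo
{
    // Invokes an async lambda through Action<int> and reports what the
    // caller sees immediately after the call returns.
    public static string RunWithAction()
    {
        var gate = new TaskCompletionSource<bool>();
        var state = "A";

        // Converted to Action<int>, the async lambda becomes "async void".
        Action<int> action = async item =>
        {
            await gate.Task;
            state = "B";
        };

        action(0);             // returns at the first incomplete await; there is no Task to await
        var observed = state;  // the lambda has not finished, so this is still "A"
        gate.SetResult(true);  // let the lambda finish so nothing is left pending
        return observed;
    }

    // Invokes the same kind of lambda through Func<int, Task> and awaits it.
    public static async Task<string> RunWithFuncAsync()
    {
        var state = "A";

        Func<int, Task> action = async item =>
        {
            await Task.Delay(50);
            state = "B";
        };

        await action(0);       // the returned Task lets the caller wait for completion
        return state;
    }
}
```

RunWithAction() returns "A" because the caller moves on before the lambda completes, while awaiting RunWithFuncAsync() yields "B".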

Therefore, until Microsoft provides a signature which includes a Func delegate and calls it with await, you'll have to roll your own extension method. I'm using the following at the moment.

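using System;
using System.Data.Entity.Infrastructure;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// The class name is arbitrary; extension methods just need a static class.
public static class QueryableAsyncExtensions
{
    // Takes a Func returning Task so the action can be awaited.
    // The token is optional so the method can also be called with the action alone.
    public static async Task ForEachAsync<T>(
        this IQueryable<T> enumerable, Func<T, Task> action, CancellationToken cancellationToken = default(CancellationToken))
    {
        // Throws InvalidCastException if the query is not backed by an EF async provider.
        var asyncEnumerable = (IDbAsyncEnumerable<T>)enumerable;
        using (var enumerator = asyncEnumerable.GetAsyncEnumerator())
        {
            if (await enumerator.MoveNextAsync(cancellationToken).ConfigureAwait(continueOnCapturedContext: false))
            {
                Task<bool> moveNextTask;
                do
                {
                    var current = enumerator.Current;
                    // Start fetching the next item before running the action on this one.
                    moveNextTask = enumerator.MoveNextAsync(cancellationToken);
                    await action(current); // now with await
                }
                while (await moveNextTask.ConfigureAwait(continueOnCapturedContext: false));
            }
        }
    }
}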

With this, the original test code in your OP will work as expected.

Author: Kind Contributor

Updated on June 13, 2022

Comments

  • Kind Contributor, almost 2 years ago

    The following should return "C", but it returns "B":

    using System.Data.Entity;
    //...
    var state = "A";
    var qry = (from f in db.myTable select f);
    await qry.ForEachAsync(async (myRecord) => {
       await DoStuffAsync(myRecord);
       state = "B";
    });
    state = "C";
    return state;
    

    It doesn't wait for DoStuffAsync to complete: state = "C" runs first, and state = "B" executes later, because the lambda is still awaiting inside.

  • Kind Contributor, about 9 years ago
    I'm not sure how well your own ForEachAsync will go alongside the Action version. I simply removed the using System.Data.Entity; directive and put the method in my own namespace.
  • Theodor Zoulias, about 3 years ago
    Your implementation is similar to the ForEachAwaitAsync operator of the System.Linq.Async library. Todd's solution is more interesting IMHO, because it allows for concurrency in a way that it's unlikely to create problems, while improving the performance at the same time.
  • Theodor Zoulias, about 3 years ago
    Also, the ConfigureAwait(false) means that the action will not be invoked on the current SynchronizationContext. So, for example, if the project is a WinForms application and the action contains UI-related code, the ForEachAsync method will fail.
  • Rafi Henig, about 3 years ago
    @Theodor Zoulias thanks for your point. Indeed, consider using ConfigureAwait(true) in UI applications (when you need a synchronization context); otherwise you should always use ConfigureAwait(false). Looking at @Todd's answer, I could see that he used ConfigureAwait(continueOnCapturedContext: false) too.
  • Rafi Henig, about 3 years ago
    @Theodor Zoulias "allows for concurrency in a way that it's unlikely to create problems" it might be helpful (for me and others) to explain it.
  • Theodor Zoulias, about 3 years ago
    Todd's solution allows each action on an item to occur concurrently with fetching the sequence's next item. These two concurrent operations are unlikely to depend on each other (by sharing state that needs to be synchronized), because they are quite distinct operations. Compare this to executing two actions for two different elements concurrently. This has a higher chance of creating problems, because the concurrent operations are homogeneous and may depend on some non-thread-safe shared state (a DbConnection, for example).
  • Theodor Zoulias, about 3 years ago
    Regarding the ConfigureAwait(false) dilemma, you can see here how the Polly library deals with it. They offer overloads with a bool continueOnCapturedContext parameter, where false is the default. But at least it is configurable.
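
The overlap described in the comments (the action on one item running while the next item is being fetched) can be seen in a small sketch. Everything here is hypothetical: the two delegates are stand-ins for enumerator.MoveNextAsync and for the per-item action, and the delays are arbitrary. The loop itself has the same do/while shape as the extension method above.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical demo class; it only logs the order of events.
public static class OverlapDemo
{
    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();
        var items = new[] { "x", "y" };
        var position = -1;
        var fetchCount = 0;
        var current = "";

        // Stand-in for enumerator.MoveNextAsync: logs when a fetch starts,
        // then completes asynchronously.
        Func<Task<bool>> moveNextAsync = async () =>
        {
            fetchCount++;
            log.Add("fetch #" + fetchCount + " started");
            await Task.Delay(20);
            position++;
            if (position >= items.Length) return false;
            current = items[position];
            return true;
        };

        // Stand-in for the per-item action.
        Func<string, Task> action = async item =>
        {
            log.Add("action " + item + " started");
            await Task.Delay(50);
            log.Add("action " + item + " finished");
        };

        if (await moveNextAsync())
        {
            Task<bool> moveNextTask;
            do
            {
                var item = current;
                moveNextTask = moveNextAsync(); // start fetching the next item
                await action(item);             // run the action while that fetch is in flight
            }
            while (await moveNextTask);
        }

        return log;
    }
}
```

In the returned log, "fetch #2 started" appears before "action x finished": the second fetch is already in flight while the first action runs, but two actions never run at the same time.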