Linq and Async Lambdas

13,886

Solution 1

You can't out of the box with LINQ. But you can write a little extension method which can make this work:

public static class AsyncExtensions
{
    public static async Task<bool> AnyAsync<T>(
        this IEnumerable<T> source, Func<T, Task<bool>> func)
    {
        foreach (var element in source)
        {
            if (await func(element))
                return true;
        }
        return false;
    }
}

And consume it like this:

static async Task MainAsync(string[] args)
{
    int[] test = new[] { 1, 2, 3, 4, 5 };

    if (await test.AnyAsync(async i => await TestIt(i))
        Console.WriteLine("Contains numbers > 3");
    else
        Console.WriteLine("Contains numbers <= 3");
}

It does feel a little cumbersome to me, but it achieves your goal.

Solution 2

If you're working with a small subset of LINQ methods, I recommend following @YuvalItzchakov's answer as it relies solely on components available as part of the Base Class Library.

If rich query functionality over async sequences is necessary, you can use Rx.NET instead. Rx provides a wide array of LINQ methods over async sequences, some of which work with Task-returning delegates, i.e. SelectMany:

IEnumerable<int> numbers = Enumerable.Range(0, 10);

IEnumerable<int> evenNumbers = numbers
    .ToObservable()
    .SelectMany(async i => new { Value = i, IsMatch = await IsEven(i) })
    .Where(a => a.IsMatch)
    .Select(a => a.Value)
    .ToEnumerable();

async Task<bool> IsEven(int i)
{
    await Task.Delay(100);

    return i % 2 == 0;
}

Solution 3

How do you work with Async Lambdas and linq?

Mind if I turn that around? How do you want them to work?

Any time you start processing asynchronous streams, there's a lot of questions around semantics. It's not just slapping a Where clause on like you do with LINQ.

In this case, you're looking for some kind of "async where" filter applied to a synchronous source sequence. The entire idea of asynchronous code is that asynchronous operations may take varying amounts of time (and you want to free up your calling thread while that operation is in progress).

So, the first question to be answered about "async where" is when the filter is called. Since the source sequence is synchronous (an array), all of the input values are available immediately. Should the "async where" start the asynchronous filter for all of the elements concurrently, or should they only be processed one at a time?

If this was an actual "async where" instead of an "async any", the next question would be the ordering of the resulting sequence (i.e., when the results are evaluated). If we start all the asynchronous filters concurrently, then they can complete in a different order than they started. Should the resulting asynchronous sequence produce its first value as soon as any asynchronous filter returns true, or should the resulting sequence keep the original values in the same order (which implies buffering)?

Different scenarios require different answers to these questions. Rx is capable of expressing any of these answers, but it rather difficult to learn. Async/await is easier to read but less expressive.

Since this is an Any (not as general as a Where), you just have the first question to answer: can the filters run concurrently or one at a time?

If one at a time, then an approach like Yuval's would work:

bool found = false;
foreach (var i in test)
{
  if (await TestIt(i))
  {
    found = true;
    break;
  }
}
if (found)
  Console.WriteLine("Contains numbers > 3");
else
  Console.WriteLine("Contains numbers <= 3");

If the filters can run concurrently, then something like this:

var tasks = test.Select(i => TestIt(i)).ToList();
bool found = false;
while (tasks.Count != 0)
{
  var completed = await Task.WhenAny(tasks);
  tasks.Remove(completed);
  if (await completed)
  {
    found = true;
    break;
  }
}
if (found)
  Console.WriteLine("Contains numbers > 3");
else
  Console.WriteLine("Contains numbers <= 3");
Share:
13,886
Mick
Author by

Mick

Microsoft Software developer with 20+ years experience and the plus just keeps on getting bigger.

Updated on June 28, 2022

Comments

  • Mick
    Mick almost 2 years

    The following code...

    using System;
    using System.Linq;
    using System.Threading.Tasks;
    
    namespace ConsoleAsync
    {
        class Program
        {
            static void Main(string[] args)
            {
                MainAsync(args).Wait();
                Console.ReadLine();
            }
    
            static async Task MainAsync(string[] args)
            {
                int[] test = new[] { 1, 2, 3, 4, 5 };
    
                if (test.Any(async i => await TestIt(i)))
                    Console.WriteLine("Contains numbers > 3");
                else
                    Console.WriteLine("Contains numbers <= 3");
            }
    
            public static async Task<bool> TestIt(int i)
            {
                return await Task.FromResult(i > 3);
            }
        }
    }
    

    Gives you the following error:-

    CS4010: Cannot convert async lambda expression to delegate type 'Func<int, bool>'. An async lambda expression may return void, Task or Task<T>, none of which are convertible to 'Func<int, bool>'.

    On the line

    if (test.Any(async i => await Test.TestIt(i)))
    

    How do you work with Async Lambdas and linq?

  • Mick
    Mick about 8 years
    I've added my own answer, however it's more of a comment on your answer, as it's really not an answer at all.
  • Mick
    Mick about 8 years
    With the Any method I think it would be desirable for each element to be processed sequentially, to avoid unnecessary processing, with the Where you'd want the processing in parallel. Ordering with Linq has always been another call, so the ordering of the results of a Where can be arbitrary. You make a good argument that the asynchronous implementation for each Linq method is going to be different based on the function of the method.
  • Mrinal Kamboj
    Mrinal Kamboj over 5 years
    This code will be sequentially executed, which can be easily tested by introducing a Task.Delay anywhere in the AnyAsync. Though that's not the main question, but on introducing Async its equally important to have the parallel execution of the Tasks. IMO Observables by default do a better job out here.