Async/await tasks and WaitHandle

Well, you could build this yourself, but I think it's tons easier with TPL Dataflow.

Something like:

static async Task DoWork()
{
  // By default, ActionBlock uses MaxDegreeOfParallelism == 1,
  //  so AThreadUnsafeMethod is not called in parallel.
  var block = new ActionBlock<Item>(AThreadUnsafeMethod);

  // Start off N tasks, each asynchronously acquiring 10 items.
  // Each item is sent to the block as it is received.
  var tasks = Enumerable.Range(0, N).Select(_ => Task.Run(
      async () =>
      {
        for (int i = 0; i != 10; ++i)
          block.Post(await GetItemAsync());
      })).ToArray();

  // Complete the block when all tasks have completed.
  Task.WhenAll(tasks).ContinueWith(_ => { block.Complete(); });

  // Wait for the block to complete.
  await block.Completion;
}
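
To try the snippet above end to end, here is a minimal sketch of a complete program. Item, GetItemAsync, AThreadUnsafeMethod, and the value of N are hypothetical stand-ins (the question does not show them), and it assumes the System.Threading.Tasks.Dataflow NuGet package is referenced. It awaits Task.WhenAll directly instead of using ContinueWith, which is a stylistic swap, not a requirement.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // NuGet: System.Threading.Tasks.Dataflow

// Hypothetical item type; the real one is not shown in the question.
public sealed class Item
{
    public int Id { get; set; }
}

public static class Program
{
    private const int N = 4;      // assumed number of producer tasks
    private static int nextId;    // used only by the fake download below
    private static int processed; // touched only inside the block

    // Stand-in for the real HTTP fetch.
    private static async Task<Item> GetItemAsync()
    {
        await Task.Delay(10);
        return new Item { Id = Interlocked.Increment(ref nextId) };
    }

    // Stand-in for the thread-unsafe consumer. The unsynchronized increment
    // is safe only because the block runs one item at a time.
    private static void AThreadUnsafeMethod(Item item)
    {
        processed++;
    }

    public static async Task<int> DoWorkAsync()
    {
        // MaxDegreeOfParallelism defaults to 1, so calls never overlap.
        var block = new ActionBlock<Item>(item => AThreadUnsafeMethod(item));

        // N producers, each fetching 10 items and posting them as they arrive.
        Task[] tasks = Enumerable.Range(0, N).Select(_ => Task.Run(async () =>
        {
            for (int i = 0; i != 10; ++i)
                block.Post(await GetItemAsync());
        })).ToArray();

        // No more items will arrive once every producer has finished.
        await Task.WhenAll(tasks);
        block.Complete();

        // Completion finishes after the block has processed everything posted.
        await block.Completion;
        return processed;
    }

    public static async Task Main()
    {
        Console.WriteLine(await DoWorkAsync());
    }
}
```

One difference from the ContinueWith version: if a producer throws, await Task.WhenAll rethrows before block.Complete() is reached, so wrap it in try/finally if the block must always be completed.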
Author by

Cheng Chen

Updated on June 04, 2022

Comments

  • Cheng Chen
    Cheng Chen almost 2 years

    Say I have 10N items that I need to fetch over HTTP. The code starts N tasks to get the data, and each task fetches 10 items in sequence. I put the items in a ConcurrentQueue<Item>. After that, the items are processed one by one by a thread-unsafe method.

    async Task<Item> GetItemAsync()
    {
        //fetch one item from the internet
    }
    
    async Task DoWork()
    {
        var tasks = new List<Task>();
        var items = new ConcurrentQueue<Item>();
        var handles = new List<ManualResetEvent>();
    
        for (int i = 0; i < N; i++)
        {
            var handle = new ManualResetEvent(false);
            handles.Add(handle);
    
            tasks.Add(Task.Factory.StartNew(async delegate
            {
                for (int j = 0; j < 10; j++)
                {
                    var item = await GetItemAsync();
                    items.Enqueue(item);
                }
                handle.Set();
            }));
        }
    
        //begin to process the items when any handle is set
        WaitHandle.WaitAny(handles.ToArray());
    
        while(true)
        {
             if (all handles are set && items collection is empty) //***
               break; 
             //in other words: all tasks have really completed
    
             while(items.TryDequeue(out var item))
             {
                  AThreadUnsafeMethod(item);    //process items one by one
             }
        }
    }
    

    I don't know what condition to put in the if statement marked ***. I can't use the Task.IsCompleted property here: because I use await inside the task, the task completes almost immediately. A bool[] that records whether each task has run to the end looks really ugly, and I think ManualResetEvent can do the same job. Can anyone give me a suggestion?
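
    On the *** condition itself: Task.Factory.StartNew with an async delegate returns a Task<Task>, and the outer task finishes as soon as the delegate reaches its first await, which is why IsCompleted looks true so early. Task.Run (or calling Unwrap() on the StartNew result) returns a task that completes only when the async body has finished. Below is a minimal sketch of the queue-based loop under that change, with no handles at all. Item, GetItemAsync, AThreadUnsafeMethod, and the 10 ms polling delay are hypothetical stand-ins, not the asker's real code.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical item type; the real one is not shown in the question.
public sealed class Item
{
    public int Id { get; set; }
}

public static class Program
{
    private static int nextId;

    // Stand-in for the real HTTP fetch.
    private static async Task<Item> GetItemAsync()
    {
        await Task.Delay(5);
        return new Item { Id = Interlocked.Increment(ref nextId) };
    }

    // Stand-in for the thread-unsafe consumer.
    private static void AThreadUnsafeMethod(Item item) { }

    public static async Task<int> DoWorkAsync(int n)
    {
        var items = new ConcurrentQueue<Item>();

        // Task.Run unwraps the async delegate, so each task completes only
        // after its 10 items have been enqueued.
        Task[] producers = Enumerable.Range(0, n).Select(_ => Task.Run(async () =>
        {
            for (int j = 0; j < 10; j++)
                items.Enqueue(await GetItemAsync());
        })).ToArray();
        Task allProduced = Task.WhenAll(producers);

        int processed = 0;
        while (true)
        {
            // Read the completion flag BEFORE draining: if it was already
            // true, every item is in the queue and this drain is the last.
            bool done = allProduced.IsCompleted;

            while (items.TryDequeue(out Item item))
            {
                AThreadUnsafeMethod(item); // only this loop calls it
                processed++;
            }

            if (done) break;
            await Task.Delay(10); // assumed polling interval, avoids spinning
        }

        await allProduced; // rethrow a producer exception, if any
        return processed;
    }

    public static async Task Main()
    {
        Console.WriteLine(await DoWorkAsync(3));
    }
}
```

    Checking the flag before draining is what makes the exit condition race-free; testing "all completed and queue empty" in the other order can miss items enqueued between the two checks. The consumer may resume on different thread-pool threads after each delay, but calls to AThreadUnsafeMethod never overlap.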