How to use await in a parallel foreach?

18,814

Don't use Parralel.ForEach at all. Make your method to return Task instead of void, collect all the task and wait them like:

Task.WaitAll(data.Select(d => MyMethod(d, someParam)).ToArray());
Share:
18,814

Related videos on Youtube

ThunD3eR
Author by

ThunD3eR

By day: .net developer By night: .net developer For fun trying all possible technologies

Updated on June 28, 2022

Comments

  • ThunD3eR
    ThunD3eR almost 2 years

    So I sepnt the better part of the night trying to figure this out.

    I was fortunate to get introduced to the parallel.foreach yesterday and it works like I want it to do except from one detail.

    I have the following:

            Parallel.ForEach(data, (d) =>
            {
                try
                {
                    MyMethod(d, measurements);
                }
                catch (Exception e)
                {
                  // logg
                }
    
            });
    

    Within the method "MyMethod" I have alot of logic that gets done and most of it is fine but I make api calls where I fetch data and I use async task for this to be able to use "await" in order for the code to wait untill that specific part gets executed and then move on:

        private async void MyMethod(PimData pimData, IEnumerable<ProductMeasurements> measurements)
        {
            try
            {
               // alot of logic but most relevant part 
    
                await Task.WhenAll(ExecuteMeasurmentAndChartLogic(pimData.ProductNumber, entity));
                await Task.WhenAll(resourceImportManager.HandleEntityImageFiles(pimData.ProductType + pimData.ProductSize,SwepImageType.Png, ResourceFileTypes.ThreeD, entity, LinkTypeId.ProductResource));
    
                await Task.WhenAll(resourceImportManager.HandleEntityImageFiles(pimData.ProductSketch, SwepImageType.Png, ResourceFileTypes.Sketch, entity, LinkTypeId.ProductResource));
    
            }
            catch (Exception e)
            {
                // logg
            }
        }
    

    Problems:

    1 For starters the loop finishes before all code is finished

    2 Second problem is that I get "Task was canceled" in alot of api calls

    3 And third as mentioned above, the code does not wait for each method to full execute.

    I cant get it to execute everything in ExecuteMeasurmentAndChartLogic() method before moving forward to the next step.

    This gives me the following issues (more issues):

    In this method I create an item and add it to the db, and this item needs more info that I get from an api call that is done inside of ExecuteMeasurmentAndChartLogic() but the problem is that several items get craeated and have to wait for the rest of the data which is not what I desire.

    SIDE-NOTE: I am aware of that crating an item and adding to the db before all data is there is not best practice but I am integrating toward PIM and the process for that is delicate

    I want several threads running but at the same time I want the fuill logic to execute for each item before moving on to the next method.

    Clarify:

    Several items running

    Each item handels ALL the logic it needs to handel before moving on to the next part of the code, noramly do this with await.

    In the code above resourceImportManager() method gets executed before ExecuteMeasurmentAndChartLogic() is finished. which is what I dont want.

    Instead of a Parallel.ForEach I used :

        Task task1 = Task.Factory.StartNew(() => MyMethod(data, measurements));
        Task.WaitAll(task1);
    

    but that wasnt much of a help

    Fairly new to this and havent been able to understand where I am doing it wrong.

    EDIT: Updated the problems with this

    EDIT: this is how ExecuteMeasurmentAndChartLogic() looks like:

        public async Task ExecuteMeasurmentAndChartLogic(string productNumber, Entity entity)
        {
            try
            {
                GrafGeneratorManager grafManager = new GrafGeneratorManager();
                var graphMeasurmentList = await MeasurmentHandler.GetMeasurments(productNumber);
    
                if (graphMeasurmentList.Count == 0) return;
    
                var chart = await grafManager.GenerateChart(500, 950, SystemColors.Window, ChartColorPalette.EarthTones,
                    "legend", graphMeasurmentList);
    
                await AddChartsAndAddToXpc(chart, entity, productNumber);
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
            }
    
        }
    

    EDIT: Background to this: I make a call to a api to get alot of data. For each item in this data I need to make an api call and get data that I apply to the item.

    After reading comments which alos got me thinking in a diffirent way. I can perhaps loop through all my items and do minor logic for them and add a url in a task list and make a seperate task that executes this one by one.

    WIll keep this updated

    • Peter Bons
      Peter Bons almost 7 years
      What does resourceImportManager.HandleEntityImageFiles return. For a starter it looks like you are using WaitAll and WhenAll for single tasks. Also, async void is a big no no, you could probably change Parallel.ForEach to Task.WhenAll with some refactoring.
    • Peter Bons
      Peter Bons almost 7 years
      Also, parallel.for is better for CPU bound work, and Task.WhenAll for I/O bound work. using async/await in parellel for is a bit of a design flaw imho
    • Tim Rogers
      Tim Rogers almost 7 years
      To echo comments above, either it's CPU-bound, in which case you should use Parallel.ForEach, or it's IO-bound, in which case you should use async.
    • Peter Bons
      Peter Bons almost 7 years
      But I suggest you read some more tutorials before creating such logic as you are mixing up too many things and this is not te best place to explain all these things.
  • Eduard Lepner
    Eduard Lepner almost 7 years
    The code that I provided is Asynchronous and Parallel. It might be executed in one thread if Thread pool configuration is broken or there're insufficient threads to capture. Or maybe MyMethod has bottleneck and actually access the same blocking resource. However if this method is valid, then my answer should be valid as well.
  • ThunD3eR
    ThunD3eR almost 7 years
    There is not blocking resourse. I am debuging this and after a few items it stops and waits for items to execute. This is to slow
  • Servy
    Servy almost 7 years
    @Ra3IDeN Then your actual method isn't designed to run in parallel, and you need to fix that.
  • ThunD3eR
    ThunD3eR almost 7 years
    @Servy I think I did, Check my answer below.
  • Servy
    Servy almost 7 years
    @Ra3IDeN That answer reduces the amount of work that can be done in parallel, for the main work being done, and incorrectly parallelizes work that can't be done in parallel, resulting in unsafe code.
  • ThunD3eR
    ThunD3eR over 6 years
    Finaly decided to accept this answer since it answers the question asked. However I will not use the solution since I need multithreading and this locks each thread.
  • turkinator
    turkinator almost 5 years
    How about Task.WaitAll(data.Select(d => Task.Run(() => MyMethod(d, someParam)).ToArray())); to retain parallelism? Haven't tested myself, but just an idea.
  • MBentley
    MBentley over 2 years
    One other thing to keep in mind is that WaitAll does not give you a way to stop before everything runs. In a normal Parallell.ForEach, you can loopState.Stop() and exit early if you run into a fatal case. That can be really nice when handling many operations.