How does local initialization with Parallel ForEach work?

Solution 1

With reference to the following overload of the static Parallel.ForEach method (the framework names the third parameter body; this answer calls it taskBody):

public static ParallelLoopResult ForEach<TSource, TLocal>(
    IEnumerable<TSource> source,
    Func<TLocal> localInit,
    Func<TSource, ParallelLoopState, TLocal, TLocal> taskBody,
    Action<TLocal> localFinally
)

In your specific example

The line:

() => 0, // method to initialize the local variable

is simply a lambda (anonymous function) which returns the constant integer zero. This lambda is passed as the localInit parameter to Parallel.ForEach. When no type arguments are given, the lambda returns an int, so it has type Func<int> and the compiler can infer TLocal as int (similarly, TSource can be inferred from the type of the collection passed as the source parameter). In the call as written in the question, the type arguments <int, long> are given explicitly, so there TLocal is long and the literal 0 is implicitly converted.
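To make the inference concrete, here is a minimal sketch. LocalTypeOf is a hypothetical helper (it is not part of the TPL) whose only purpose is to report which TLocal the compiler picks for a given localInit lambda, in the same way it would for Parallel.ForEach.

```csharp
using System;

public static class LocalTypeInference
{
    // Hypothetical helper, not a framework method: it has the same
    // Func<TLocal> parameter shape as localInit, so TLocal is inferred
    // by the same rules.
    public static Type LocalTypeOf<TLocal>(Func<TLocal> localInit) => typeof(TLocal);

    public static void Main()
    {
        Console.WriteLine(LocalTypeOf(() => 0));        // System.Int32
        Console.WriteLine(LocalTypeOf(() => 0L));       // System.Int64
        Console.WriteLine(LocalTypeOf<long>(() => 0));  // System.Int64 (explicit type argument)
    }
}
```

The last line corresponds to the question's Parallel.ForEach<int, long>(...) call: the type argument is explicit, so () => 0 is simply converted to Func<long>.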

The return value (0) is then passed as the 3rd parameter (named subtotal) to the taskBody Func. This 0 is used as the initial seed for the body loop:

(j, loop, subtotal) =>
{
    subtotal += nums[j]; //modify local variable (Bad idea, see comment)
    return subtotal;     // value to be passed to next iteration
}

This second lambda (passed to taskBody) is called N times, where N is the number of items allocated to this task by the TPL partitioner.

Each subsequent call to the taskBody lambda receives the value of subtotal returned by the previous call, effectively calculating a running partial total for this Task. After all the items assigned to this task have been added, the third and last function parameter, localFinally, is called and is passed the final value of subtotal returned from taskBody.

Because several such tasks will be operating in parallel, there also needs to be a final step which adds up all the partial totals into the final 'grand' total. However, because multiple concurrent tasks (on different threads) could be contending for the grandTotal variable, it is important that changes to it are done in a thread-safe manner.

(I've changed the names of the MSDN variables to make this clearer.)

long grandTotal = 0;
Parallel.ForEach(nums,            // source collection
  () => 0,                        // method to initialize the local variable
  (j, loop, subtotal) =>          // method invoked by the loop on each iteration
     subtotal + nums[j],          // value to be passed to next iteration subtotal
  // The final value of subtotal is passed to the localFinally function parameter
  (subtotal) => Interlocked.Add(ref grandTotal, subtotal)
);

In the MS example, modifying the parameter subtotal inside the task body is poor practice, and unnecessary. That is, the code subtotal += nums[j]; return subtotal; would be better as just return subtotal + nums[j]; which can be abbreviated to the expression-bodied lambda (j, loop, subtotal) => subtotal + nums[j]
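For reference, a complete, compilable sketch of the same pattern follows. The class and method names (ParallelSum, Sum) are made up for illustration. Two choices differ from the snippet above and are my own assumptions about what is safest: TLocal is long (so per-task subtotals cannot overflow an int), and the body adds the element itself rather than indexing nums with it. ForEach passes the element, not an index, so nums[j] only gives the right answer when the array holds 0..N-1, as I believe it does in the MSDN sample.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelSum
{
    public static long Sum(int[] nums)
    {
        long grandTotal = 0;

        Parallel.ForEach<int, long>(
            nums,                                            // source collection
            () => 0L,                                        // localInit: seed for each task's subtotal
            (item, loopState, subtotal) => subtotal + item,  // body: returns the new subtotal
            subtotal => Interlocked.Add(ref grandTotal, subtotal)); // localFinally: thread-safe merge

        return grandTotal;
    }

    public static void Main()
    {
        int[] nums = Enumerable.Range(0, 1000000).ToArray();
        Console.WriteLine(Sum(nums)); // 499999500000
    }
}
```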

In General

The localInit / body / localFinally overloads of Parallel.For / Parallel.ForEach allow once-per-task initialization and cleanup code to be run before and after (respectively) the taskBody iterations performed by the Task.

(Note that the For range / enumerable passed to Parallel.For / Parallel.ForEach will be partitioned into batches of IEnumerable<>, each of which will be allocated to a Task.)

In each Task, localInit will be called once, the body code will be repeatedly invoked, once per item in batch (0..N times), and localFinally will be called once upon completion.
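One way to see this lifecycle is to count how often each delegate runs. The sketch below uses hypothetical names (InvocationCounts, Run); the number of tasks is decided by the TPL at run time, so only the relationships between the counters are predictable, not the task count itself.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class InvocationCounts
{
    public static (int Inits, int Bodies, int Finallys) Run(int itemCount)
    {
        int inits = 0, bodies = 0, finallys = 0;

        Parallel.ForEach(
            Enumerable.Range(0, itemCount),
            // localInit: once per participating task
            () => { Interlocked.Increment(ref inits); return 0; },
            // body: once per item
            (item, loopState, local) => { Interlocked.Increment(ref bodies); return local; },
            // localFinally: once per participating task
            local => { Interlocked.Increment(ref finallys); });

        return (inits, bodies, finallys);
    }

    public static void Main()
    {
        var counts = Run(10000);
        // Bodies equals the item count; Inits equals Finallys and varies per run.
        Console.WriteLine($"inits={counts.Inits} bodies={counts.Bodies} finallys={counts.Finallys}");
    }
}
```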

In addition, you can pass any state required for the duration of the task (i.e. to the taskBody and localFinally delegates) via a generic TLocal return value from the localInit Func - I've called this variable taskLocals below.

Common uses of "localInit":

  • Creating and initializing expensive resources needed by the loop body, like a database connection or a web service connection.
  • Keeping Task-Local variables to hold (uncontended) running totals or collections
  • If you need to return multiple objects from localInit to the taskBody and localFinally, you can make use of a strongly typed class, a Tuple<,,> or, if you use only lambdas for the localInit / taskBody / localFinally, you can also pass data via an anonymous type (bear in mind that anonymous-type properties are read-only). Note that if you use the return value of localInit to share a reference type among multiple tasks, you will need to consider thread safety on that object - immutability is preferable.
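As a sketch of carrying more than one value in the task-local state, the example below uses a named value tuple (this assumes C# 7 or later with System.ValueTuple available). The names ParallelAverage and Average are hypothetical. The body returns a new tuple on each iteration instead of mutating shared state.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelAverage
{
    public static double Average(int[] values)
    {
        long totalSum = 0;
        long totalCount = 0;

        Parallel.ForEach(
            values,
            // localInit: two task-local values carried together
            () => (Sum: 0L, Count: 0L),
            // body: return the updated state for the next iteration of this task
            (value, loopState, local) => (Sum: local.Sum + value, Count: local.Count + 1),
            // localFinally: merge this task's state into the shared totals
            local =>
            {
                Interlocked.Add(ref totalSum, local.Sum);
                Interlocked.Add(ref totalCount, local.Count);
            });

        // The loop has completed here, so both totals are final.
        return totalCount == 0 ? 0.0 : (double)totalSum / totalCount;
    }

    public static void Main()
    {
        Console.WriteLine(Average(new[] { 1, 2, 3, 4 })); // average of 1..4
    }
}
```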

Common uses of the "localFinally" Action:

  • To release resources such as IDisposables used in the taskLocals (e.g. database connections, file handles, web service clients, etc)
  • To aggregate / combine / reduce the work done by each task back into shared variable(s). These shared variables will be contended, so thread safety is a concern:
    • e.g. Interlocked.Increment on primitive types like integers
    • lock or similar will be required for write operations
    • Make use of the concurrent collections to save time and effort.
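The aggregation step can be sketched with per-task dictionaries that are merged under a lock in localFinally. The names below (ParallelHistogram, CountByLength, gate) are hypothetical, and a ConcurrentDictionary would be a reasonable alternative for the shared totals. The point is that the lock is taken once per task rather than once per item.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ParallelHistogram
{
    // Counts how many words there are of each length.
    public static Dictionary<int, int> CountByLength(IEnumerable<string> words)
    {
        var totals = new Dictionary<int, int>(); // shared, so guarded by `gate`
        var gate = new object();

        Parallel.ForEach(
            words,
            // localInit: an uncontended dictionary per task
            () => new Dictionary<int, int>(),
            // body: no locking needed, `local` belongs to this task only
            (word, loopState, local) =>
            {
                local.TryGetValue(word.Length, out int count);
                local[word.Length] = count + 1;
                return local;
            },
            // localFinally: merge into the shared dictionary under a lock
            local =>
            {
                lock (gate)
                {
                    foreach (var pair in local)
                    {
                        totals.TryGetValue(pair.Key, out int count);
                        totals[pair.Key] = count + pair.Value;
                    }
                }
            });

        return totals;
    }

    public static void Main()
    {
        var result = CountByLength(new[] { "a", "bb", "cc", "ddd" });
        foreach (var pair in result)
            Console.WriteLine($"length {pair.Key}: {pair.Value}");
    }
}
```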

The taskBody is the tight part of the loop operation - you'll want to optimize this for performance.

This is all best summarized with a commented example:

public void MyParallelizedMethod()
{
    // Shared variable. Not thread safe
    var itemCount = 0; 

    Parallel.ForEach(myEnumerable, 
    // localInit - called once per Task.
    () => 
    {
       // Task-local variables have no contention,
       // since a Task is never run by multiple threads concurrently
       var sqlConnection = new SqlConnection("connstring...");
       sqlConnection.Open();

       // This is the `task local` state we wish to carry for the duration of the task
       return new 
       { 
          Conn = sqlConnection,
          RunningTotal = 0
       };
    },
    // Task Body. Invoked once per item in the batch assigned to this task
    (item, loopState, taskLocals) =>
    {
      var count = 0;
      // ... Do some fancy Sql work here on our task's independent connection
      using(var command = taskLocals.Conn.CreateCommand())
      using(var reader = command.ExecuteReader(...))
      {
        if (reader.Read())
        {
           // No contention for `taskLocal`
           count = Convert.ToInt32(reader["countOfItems"]);
        }
      }
      // Anonymous-type properties are read-only, so return a new instance.
      // Same property names and types in the same order, so it is the same type as `taskLocals`
      return new
      {
         taskLocals.Conn,
         RunningTotal = taskLocals.RunningTotal + count
      };
    },
    // LocalFinally called once per Task after body completes
    // Also takes the taskLocal
    (taskLocals) =>
    {
       // Any cleanup work on our Task Locals (as you would do in a `finally` scope)
       if (taskLocals.Conn != null)
         taskLocals.Conn.Dispose();

       // Do any reduce / aggregate / synchronisation work.
       // NB : There is contention here!
       Interlocked.Add(ref itemCount, taskLocals.RunningTotal);
    });
}

And more examples:

Example of per-Task uncontended dictionaries

Example of per-Task database connections

Solution 2

As an extension to @Honza Brestan's answer: the way Parallel.ForEach splits the work into tasks can also be important. It groups several loop iterations into a single task, so in practice localInit() is called once for every n iterations of the loop, and multiple groups can be started simultaneously.

The point of localInit and localFinally is to ensure that a parallel foreach loop can combine the results from each iteration into a single result without you needing to specify lock statements in the body. To do this, you provide an initialisation for the value you want to create (localInit), each body iteration then processes the local value, and finally you provide a method to combine the values from each group (localFinally) in a thread-safe way.

If you don't need localInit for synchronising tasks, you can use lambda methods to reference values from the surrounding context as normal without any problems. See Threading in C# (Parallel.For and Parallel.ForEach) for a more in-depth tutorial on using localInit/localFinally, and scroll down to Optimization with local values. Joseph Albahari is really my go-to source for all things threading.
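A minimal sketch of that simpler case, with hypothetical names (ParallelSquares, Squares): the lambda captures input and output from the surrounding method, and no local state is needed because each iteration writes only to its own array slot, so nothing has to be combined afterwards.

```csharp
using System;
using System.Threading.Tasks;

public static class ParallelSquares
{
    public static int[] Squares(int[] input)
    {
        var output = new int[input.Length];

        // `input` and `output` are captured from the enclosing scope.
        // Each index is written by exactly one iteration, so there is no contention.
        Parallel.For(0, input.Length, i =>
        {
            output[i] = input[i] * input[i];
        });

        return output;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Squares(new[] { 1, 2, 3, 4 }))); // 1, 4, 9, 16
    }
}
```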

Solution 3

You can get a hint on MSDN in the correct Parallel.ForEach overload.

The localInit delegate is invoked once for each thread that participates in the loop's execution and returns the initial local state for each of those tasks. These initial states are passed to the first body invocations on each task. Then, every subsequent body invocation returns a possibly modified state value that is passed to the next body invocation.

In your example () => 0 is a delegate just returning 0, so this value is used for the first iteration on each task.
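A non-zero seed makes this visible. In the sketch below (SeedDemo and SumWithSeed are hypothetical names), every task starts its subtotal at seed, so the grand total ends up as the plain sum plus seed multiplied by however many tasks the TPL happened to use.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class SeedDemo
{
    public static (long Total, int TaskCount) SumWithSeed(int[] nums, long seed)
    {
        long total = 0;
        int taskCount = 0;

        Parallel.ForEach<int, long>(
            nums,
            // localInit: count the task and hand its first iteration the seed
            () => { Interlocked.Increment(ref taskCount); return seed; },
            (item, loopState, subtotal) => subtotal + item,
            subtotal => Interlocked.Add(ref total, subtotal));

        return (total, taskCount);
    }

    public static void Main()
    {
        int[] nums = Enumerable.Range(0, 1000).ToArray();
        var result = SumWithSeed(nums, 100);
        long plainSum = nums.Sum(n => (long)n);

        // The seed is applied once per task, not once per item.
        Console.WriteLine(result.Total == plainSum + 100L * result.TaskCount); // True
    }
}
```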

Author: Luis Ferrao

Updated on June 05, 2022

Comments

  • Luis Ferrao, almost 2 years ago

    I am unsure about the use of the local init function in Parallel.ForEach, as described in the msdn article: http://msdn.microsoft.com/en-us/library/dd997393.aspx

    Parallel.ForEach<int, long>(nums, // source collection
       () => 0, // method to initialize the local variable
       (j, loop, subtotal) => // method invoked by the loop on each iteration
       {
          subtotal += nums[j]; //modify local variable 
          return subtotal; // value to be passed to next iteration
       },...
    

    How does () => 0 initialize anything? What's the name of the variable and how can I use it in the loop logic?

  • Luis Ferrao, about 11 years ago
    I had read that text before posting here, yet, my question remains the same. How does using a lambda that just returns 0 provide me with a value that "is used for the first iteration on each task"? How can I use this value in the code? It has no identifier.
  • Honza Brestan, about 11 years ago
    It has - it is the subtotal in your example. But only in the first iteration in each task. The value of subtotal in all other iterations in that task is what is returned from the previous iteration.
  • Luis Ferrao, about 11 years ago
    Ok, I follow you so far, but what part of this code indicates that "() => 0" = subtotal? What's the convention between the init function and (j, loop, subtotal) declaration? Edit: I just figured it out, the devil is in the details, or in this case, in the names of the generic parameters of the body function System.Func<TSource, ParallelLoopState, TLocal, TLocal>. Thanks, I will mark yours as answer.
  • Honza Brestan, about 11 years ago
    Glad you found that out, seems my answers are sometimes still unclear.
  • Luis Ferrao, about 11 years ago
    Up voting this because this really clarified things for me. Thanks.
  • StuartLC, over 6 years ago
    Remove the DB example and replace with a CPU-bound example. Using Parallel.For with databases probably isn't a good idea - async / await and parallelism through Task.WhenAll is preferable.