C# Parallel.foreach - Making variables thread safe

16,871

Solution 1

Writing to the file is expensive, you're holding a exclusive lock while writing to the file, that's bad. It's going to introduce contention.

You could add it in a buffer, then write to the file all at once. That should remove contention and provide way to scale.

if (datatable_results.Rows.Count > 0)
{
    ConcurrentQueue<string> buffer = new ConcurrentQueue<string>();
    Parallel.ForEach(datatable_results.AsEnumerable(), (data_row, state, index) =>
    {
        // Process data_row as normal.

        // When ready to write to log, do so.

        buffer.Enqueue(string.Format( "Processing row: {0}", index));
    });

    streamwriter.AutoFlush = false;
    string line;
    while (buffer.TryDequeue(out line))
    {
        streamwriter.WriteLine(line);
    }
    streamwriter.Flush();//Flush once when needed
}
  1. Note that you don't need to maintain a loop counter, Parallel.ForEach provides you one. Difference is that it is not the counter but index. If I've changed the expected behavior you can still add the counter back and use Interlocked.Increment to increment it.
  2. I see that you're using streamwriter.AutoFlush = true, that will hurt performance, you can set it to false and flush it once you're done writing all the data.

If possible, wrap the StreamWriter in using statement, so that you don't even need to flush the stream(you get it for free).

Alternatively, you could look at the logging frameworks which does their job pretty well. Example: NLog, Log4net etc.

Solution 2

First of all, it takes about 2 seconds to process a row in your table and perhaps a few milliseconds to increment the counter and write to the log file. With the actual processing being 1000x more than the part you need to serialize, the method doesn't matter too much.

Furthermore, the way you have implemented it is perfectly solid. There are ways to optimize it, but none that are worth implementing in your situation.

One useful way to avoid locking on the increment is to use Interlocked.Increment. It is a bit slower than x++ but much faster than lock {x++;}. In your case, though, it doesn't matter.

As for the file output, remember that the output is going to be serialized anyway, so at best you can minimize the amount of time spent in the lock. You can do this by buffering all of your output before entering the lock, then just perform the write operation inside the lock. You probably want to do async writes to avoid unnecessary blocking on I/O.

Solution 3

You may try to improve this, if you avoid logging, or log into only thread specific log file (not sure if that makes sense to you)

TPL start as many threads as many cores you have Does Parallel.ForEach limits the number of active threads?.

So what you can do is:

1) Get numbers of core on target machine

2) Create a list of counters, with as many elements inside as many cores you have

3) Update counter for every core

4) Sum all them up after parallel execution terminates.

So, in practice :

//KEY(THREAD ID, VALUE: THREAD LOCAL COUNTER)
Dictionary<int,int> counters = new Dictionary<int, int>(NUMBER_OF_CORES); 
....
 Parallel.ForEach(datatable_results.AsEnumerable(), data_row =>
        { 
            // Process data_row as normal.

            // When ready to write to log, do so.
            //lock (locker) //NO NEED FOR LOCK, EVERY THREAD UPDATES ITS _OWN_ COUNTER
            //{
                //row_counter++;

                 counters[Thread.CurrentThread.ManagedThreadId].Value +=1;

                //NO WRITING< OR WRITING THREAD SPECIFIC FILE ONLY
                //streamwriter.WriteLine("Processing row: {0}", row_counter);


            //}
        });             
....

//AFTER EXECUTION OF PARALLEL LOOP SUM ALL COUNTERS AND GET TOTAL OF ALL THREADS. 

The benefit of this that no locking envolved at all, which will drammatically improve performance. When you use .net concurent collections, they are always use some kind of locking inside.

This is naturally a basic idea, may not work as it expected if you copy paste. We are talking about multi threading , which is always a hard topic. But, hopefully, it provides to you some ideas to relay on.

Share:
16,871
dalemac
Author by

dalemac

Software Engineer at the Universit of Lincoln.

Updated on June 04, 2022

Comments

  • dalemac
    dalemac almost 2 years

    I have been rewriting some process intensive looping to use TPL to increase speed. This is the first time I have tried threading, so want to check what I am doing is the correct way to do it.

    The results are good - processing the data from 1000 Rows in a DataTable has reduced processing time from 34 minutes to 9 minutes when moving from a standard foreach loop into a Parallel.ForEach loop. For this test, I removed non thread safe operations, such as writing data to a log file and incrementing a counter.

    I still need to write back into a log file and increment a counter, so i tried implementing a lock which encases the streamwriter/increment code block.

    FileStream filestream = new FileStream("path_to_file.txt", FileMode.Create);
    StreamWriter streamwriter = new StreamWriter(filestream);
    streamwriter.AutoFlush = true;
    
    try
    {
        object locker = new object();
    
        // Lets assume we have a DataTable containing 1000 rows of data.
        DataTable datatable_results;
    
        if (datatable_results.Rows.Count > 0)
        {
            int row_counter = 0;
    
            Parallel.ForEach(datatable_results.AsEnumerable(), data_row =>
            { 
                // Process data_row as normal.
    
                // When ready to write to log, do so.
                lock (locker)
                {
                    row_counter++;
                    streamwriter.WriteLine("Processing row: {0}", row_counter);
    
                    // Write any data we want to log.
                }
            });                    
        }
    }
    catch (Exception e)
    {
        // Catch the exception.
    }
    streamwriter.Close();
    

    The above seems to work as expected, with minimal performance costs (still 9 minutes execution time). Granted, the actions contained in the lock are hardly significant themselves - I assume that as the time taken to process code within the lock increases, the longer the thread is locked for, the more it affects processing time.

    My question: is the above an efficient way of doing this or is there a different way of achieving the above that is either faster or safer?

    Also, lets say our original DataTable actually contains 30000 rows. Is there anything to be gained by splitting this DataTable into chunks of 1000 rows each and then processing them in the Parallel.ForEach, instead of processing all 300000 rows in one go?

    • Yuval Itzchakov
      Yuval Itzchakov over 9 years
      This question appears to be off-topic because it belongs on Code Review
    • dalemac
      dalemac over 9 years
      The above code will not compile or run - it's not a direct extract. I'm more concerned with the theory of multithreading and making variables thread safe. Thanks though.
  • RobH
    RobH over 9 years
    But this makes the logging really weird. You don't log as you go, you log at the end. So if something goes wrong you don't get any info on where you got to in the logs.
  • Sriram Sakthivel
    Sriram Sakthivel over 9 years
    @RobH I don't see what you mean, Why not wrap the logging part in finally block if you worry about it?
  • RobH
    RobH over 9 years
    At the moment, you could look at the logs while the operation was in progress and see it doing something. Now you have to wait for it to finish then read the log all at once. You also have to hold all of the logging info in memory which might not be scalable.
  • Sriram Sakthivel
    Sriram Sakthivel over 9 years
    @RobH Is that a requirement to look at logs while process is in flight? Holding 30000 elements of small string in memory is not too much.
  • RobH
    RobH over 9 years
    If something is going on for 10 minutes and I have absolutely no idea what the hell it's doing and no way of finding out what it's doing, I'm going to be pretty terrified.
  • Sriram Sakthivel
    Sriram Sakthivel over 9 years
    @RobH I'm pretty sure file access is the main bottle neck. Check OP used auto flush also. For each iteration of the loop, Disk has to spin, content has to be written in disk, meanwhile all other threads has to wait, and so forth. Imaging this cycle goes on for 30000 times. That should answer your question.
  • RobH
    RobH over 9 years
    I'm not suggesting keeping in the lock and streamwriter. I'm only saying that your code makes the logging odd in my opinion.
  • Tigran
    Tigran over 9 years
    @RobH: if there are no huge numbers envolved in your calculation, logging while paralelizing, bascially, drammatically decrise performance. Basic rule is: run as fast as you can, and check result after. Naturally if calculation takes time hours/days, it's not a solution and you need to schedule some kind of logging. But pretty sure, if you avoid logging during calculus, and avoid lock in general (look on hints in my answer) you will eventually boost performance even more, so wait for result even less. Or leave logging only for debugging.
  • RobH
    RobH over 9 years
    @Tigran - Yes, I understand the factors at play here. I would create buffered logging with a largish buffer or use a library that supports it. There's no reason to sacrifice knowing something about what your application is doing.
  • Sriram Sakthivel
    Sriram Sakthivel over 9 years
    @RobH Your comment is rather amusing. So, you wanted to slow down the actual code just for the sake of logging? Logging is a cross cutting concern that shouldn't affect your main processing.
  • RobH
    RobH over 9 years
    I am absolutely not suggesting locking around a file write after every row is processed.
  • dalemac
    dalemac over 9 years
    Alas, as my first attempt at threading, a thread specific log file doesn't make sense to me. Would you mind elaborating?
  • Tigran
    Tigran over 9 years
    @RobH: hps is always about balance. if 9 minutes is ok for you, keep logging and feels safe you and your client, having an information "at runtime" (wonder if you can do something when you get an error "at runtime", if not what is a point?). Our point was if you are going to paralelize, cause you concern about performance, push a little bit hurder and go a lot further.
  • dalemac
    dalemac over 9 years
    Thanks, this answer and subsequent comments have given me a lot to things to consider.
  • RobH
    RobH over 9 years
    @Tigran - agreed. I was looking at the problem from the point of view where processing the data is very much more expensive than the logging (not true in OPs code). Your code is interesting but note that Parallel.ForEach can run multiple partitions on the same thread so it might not be safe to use the current thread id as the key.
  • Tigran
    Tigran over 9 years
    I mean, if you need to log something in real time, if you avoid to write into the same file from different threads, you can avoid threading sync issue. What I was suggesting is to avoid real time logging at all, and just have final dump file about results done and calculations made. You can, at runtime, assemble strings with log data, but write those string into the file, only after termination of parallel loop. This is naturally, if the calculation does not involve long wait period.
  • dalemac
    dalemac over 9 years
    After thinking about this for a while, The logs would only really be looked at if there was a problem with processed data highlighted (or an exception was thrown). They wouldn't usually be looked at while the process was still running (unless debugging, which can be split into a separate method anyway). I can see how logging is going to cause contention and the ConcurrentQueue data type looks interesting so I will give that a go, as well as removing the counter iteration. Log files are typically 60mb - nothing in terms of memory allocation to hold onto. Thanks all!