C# Parallel.foreach - Making variables thread safe
Solution 1
Writing to the file is expensive, you're holding a exclusive lock while writing to the file, that's bad. It's going to introduce contention.
You could add it in a buffer, then write to the file all at once. That should remove contention and provide way to scale.
if (datatable_results.Rows.Count > 0)
{
ConcurrentQueue<string> buffer = new ConcurrentQueue<string>();
Parallel.ForEach(datatable_results.AsEnumerable(), (data_row, state, index) =>
{
// Process data_row as normal.
// When ready to write to log, do so.
buffer.Enqueue(string.Format( "Processing row: {0}", index));
});
streamwriter.AutoFlush = false;
string line;
while (buffer.TryDequeue(out line))
{
streamwriter.WriteLine(line);
}
streamwriter.Flush();//Flush once when needed
}
- Note that you don't need to maintain a loop counter,
Parallel.ForEach
provides you one. Difference is that it is not the counter but index. If I've changed the expected behavior you can still add the counter back and useInterlocked.Increment
to increment it. - I see that you're using
streamwriter.AutoFlush = true
, that will hurt performance, you can set it tofalse
and flush it once you're done writing all the data.
If possible, wrap the StreamWriter
in using statement, so that you don't even need to flush the stream(you get it for free).
Alternatively, you could look at the logging frameworks which does their job pretty well. Example: NLog, Log4net etc.
Solution 2
First of all, it takes about 2 seconds to process a row in your table and perhaps a few milliseconds to increment the counter and write to the log file. With the actual processing being 1000x more than the part you need to serialize, the method doesn't matter too much.
Furthermore, the way you have implemented it is perfectly solid. There are ways to optimize it, but none that are worth implementing in your situation.
One useful way to avoid locking on the increment is to use Interlocked.Increment. It is a bit slower than x++
but much faster than lock {x++;}
. In your case, though, it doesn't matter.
As for the file output, remember that the output is going to be serialized anyway, so at best you can minimize the amount of time spent in the lock. You can do this by buffering all of your output before entering the lock, then just perform the write operation inside the lock. You probably want to do async writes to avoid unnecessary blocking on I/O.
Solution 3
You may try to improve this, if you avoid logging, or log into only thread specific log file (not sure if that makes sense to you)
TPL
start as many threads as many cores you have Does Parallel.ForEach limits the number of active threads?.
So what you can do is:
1) Get numbers of core on target machine
2) Create a list of counters, with as many elements inside as many cores you have
3) Update counter for every core
4) Sum all them up after parallel execution terminates.
So, in practice :
//KEY(THREAD ID, VALUE: THREAD LOCAL COUNTER)
Dictionary<int,int> counters = new Dictionary<int, int>(NUMBER_OF_CORES);
....
Parallel.ForEach(datatable_results.AsEnumerable(), data_row =>
{
// Process data_row as normal.
// When ready to write to log, do so.
//lock (locker) //NO NEED FOR LOCK, EVERY THREAD UPDATES ITS _OWN_ COUNTER
//{
//row_counter++;
counters[Thread.CurrentThread.ManagedThreadId].Value +=1;
//NO WRITING< OR WRITING THREAD SPECIFIC FILE ONLY
//streamwriter.WriteLine("Processing row: {0}", row_counter);
//}
});
....
//AFTER EXECUTION OF PARALLEL LOOP SUM ALL COUNTERS AND GET TOTAL OF ALL THREADS.
The benefit of this that no locking envolved at all, which will drammatically improve performance. When you use .net concurent collections, they are always use some kind of locking inside.
This is naturally a basic idea, may not work as it expected if you copy paste. We are talking about multi threading , which is always a hard topic. But, hopefully, it provides to you some ideas to relay on.
Comments
-
dalemac almost 2 years
I have been rewriting some process intensive looping to use TPL to increase speed. This is the first time I have tried threading, so want to check what I am doing is the correct way to do it.
The results are good - processing the data from 1000 Rows in a
DataTable
has reduced processing time from 34 minutes to 9 minutes when moving from a standardforeach
loop into aParallel.ForEach
loop. For this test, I removed non thread safe operations, such as writing data to a log file and incrementing a counter.I still need to write back into a log file and increment a counter, so i tried implementing a lock which encases the streamwriter/increment code block.
FileStream filestream = new FileStream("path_to_file.txt", FileMode.Create); StreamWriter streamwriter = new StreamWriter(filestream); streamwriter.AutoFlush = true; try { object locker = new object(); // Lets assume we have a DataTable containing 1000 rows of data. DataTable datatable_results; if (datatable_results.Rows.Count > 0) { int row_counter = 0; Parallel.ForEach(datatable_results.AsEnumerable(), data_row => { // Process data_row as normal. // When ready to write to log, do so. lock (locker) { row_counter++; streamwriter.WriteLine("Processing row: {0}", row_counter); // Write any data we want to log. } }); } } catch (Exception e) { // Catch the exception. } streamwriter.Close();
The above seems to work as expected, with minimal performance costs (still 9 minutes execution time). Granted, the actions contained in the lock are hardly significant themselves - I assume that as the time taken to process code within the lock increases, the longer the thread is locked for, the more it affects processing time.
My question: is the above an efficient way of doing this or is there a different way of achieving the above that is either faster or safer?
Also, lets say our original
DataTable
actually contains 30000 rows. Is there anything to be gained by splitting thisDataTable
into chunks of 1000 rows each and then processing them in theParallel.ForEach
, instead of processing all 300000 rows in one go?-
Yuval Itzchakov over 9 yearsThis question appears to be off-topic because it belongs on Code Review
-
dalemac over 9 yearsThe above code will not compile or run - it's not a direct extract. I'm more concerned with the theory of multithreading and making variables thread safe. Thanks though.
-
-
RobH over 9 yearsBut this makes the logging really weird. You don't log as you go, you log at the end. So if something goes wrong you don't get any info on where you got to in the logs.
-
Sriram Sakthivel over 9 years@RobH I don't see what you mean, Why not wrap the logging part in finally block if you worry about it?
-
RobH over 9 yearsAt the moment, you could look at the logs while the operation was in progress and see it doing something. Now you have to wait for it to finish then read the log all at once. You also have to hold all of the logging info in memory which might not be scalable.
-
Sriram Sakthivel over 9 years@RobH Is that a requirement to look at logs while process is in flight? Holding
30000
elements of small string in memory is not too much. -
RobH over 9 yearsIf something is going on for 10 minutes and I have absolutely no idea what the hell it's doing and no way of finding out what it's doing, I'm going to be pretty terrified.
-
Sriram Sakthivel over 9 years@RobH I'm pretty sure file access is the main bottle neck. Check OP used auto flush also. For each iteration of the loop, Disk has to spin, content has to be written in disk, meanwhile all other threads has to wait, and so forth. Imaging this cycle goes on for 30000 times. That should answer your question.
-
RobH over 9 yearsI'm not suggesting keeping in the lock and streamwriter. I'm only saying that your code makes the logging odd in my opinion.
-
Tigran over 9 years@RobH: if there are no huge numbers envolved in your calculation, logging while paralelizing, bascially, drammatically decrise performance. Basic rule is: run as fast as you can, and check result after. Naturally if calculation takes time hours/days, it's not a solution and you need to schedule some kind of logging. But pretty sure, if you avoid logging during calculus, and avoid lock in general (look on hints in my answer) you will eventually boost performance even more, so wait for result even less. Or leave logging only for debugging.
-
RobH over 9 years@Tigran - Yes, I understand the factors at play here. I would create buffered logging with a largish buffer or use a library that supports it. There's no reason to sacrifice knowing something about what your application is doing.
-
Sriram Sakthivel over 9 years@RobH Your comment is rather amusing. So, you wanted to slow down the actual code just for the sake of logging? Logging is a cross cutting concern that shouldn't affect your main processing.
-
RobH over 9 yearsI am absolutely not suggesting locking around a file write after every row is processed.
-
dalemac over 9 yearsAlas, as my first attempt at threading, a thread specific log file doesn't make sense to me. Would you mind elaborating?
-
Tigran over 9 years@RobH: hps is always about balance. if 9 minutes is ok for you, keep logging and feels safe you and your client, having an information "at runtime" (wonder if you can do something when you get an error "at runtime", if not what is a point?). Our point was if you are going to paralelize, cause you concern about performance, push a little bit hurder and go a lot further.
-
dalemac over 9 yearsThanks, this answer and subsequent comments have given me a lot to things to consider.
-
RobH over 9 years@Tigran - agreed. I was looking at the problem from the point of view where processing the data is very much more expensive than the logging (not true in OPs code). Your code is interesting but note that
Parallel.ForEach
can run multiple partitions on the same thread so it might not be safe to use the current thread id as the key. -
Tigran over 9 yearsI mean, if you need to log something in real time, if you avoid to write into the same file from different threads, you can avoid threading sync issue. What I was suggesting is to avoid real time logging at all, and just have final dump file about results done and calculations made. You can, at runtime, assemble strings with log data, but write those string into the file, only after termination of parallel loop. This is naturally, if the calculation does not involve long wait period.
-
dalemac over 9 yearsAfter thinking about this for a while, The logs would only really be looked at if there was a problem with processed data highlighted (or an exception was thrown). They wouldn't usually be looked at while the process was still running (unless debugging, which can be split into a separate method anyway). I can see how logging is going to cause contention and the
ConcurrentQueue
data type looks interesting so I will give that a go, as well as removing the counter iteration. Log files are typically 60mb - nothing in terms of memory allocation to hold onto. Thanks all!