Best way to limit the number of active Tasks running via the Parallel Task Library
Solution 1
I just gave an answer which is very applicable to this question.
Basically, the TPL Task class is made to schedule CPU-bound work. It is not made for blocking work.
You are working with a resource that is not CPU: waiting for service replies. This means the TPL will mismanage your resource because it assumes CPU-boundedness to a certain degree.
Manage the resources yourself: Start a fixed number of threads or LongRunning tasks (which is basically the same). Decide on the number of threads empirically.
You can't put unreliable systems into production. For that reason, I recommend #1 but throttled. Don't create as many threads as there are work items. Create as many threads as are needed to saturate the remote service. Write yourself a helper function which spawns N threads and uses them to process M work items. You get totally predictable and reliable results that way.
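A minimal sketch of such a helper, assuming all work items are known up front and that a failing item should not be retried. The names FixedThreadRunner and ProcessAll are made up for illustration and are not part of any library:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper: names are illustrative, not from the TPL.
public static class FixedThreadRunner
{
    // Processes every item using exactly `threadCount` dedicated threads
    // and blocks until all items have been processed.
    public static void ProcessAll<T>(IEnumerable<T> items, int threadCount, Action<T> process)
    {
        if (threadCount < 1) throw new ArgumentOutOfRangeException(nameof(threadCount));

        // ConcurrentQueue lets all workers pull items without an explicit lock.
        var queue = new ConcurrentQueue<T>(items);
        var threads = new List<Thread>(threadCount);

        for (int i = 0; i < threadCount; i++)
        {
            var thread = new Thread(() =>
            {
                // Each worker keeps pulling items until the queue is drained.
                while (queue.TryDequeue(out T item))
                {
                    process(item);
                }
            });
            thread.Start();
            threads.Add(thread);
        }

        // Wait for every worker to finish.
        foreach (var thread in threads)
        {
            thread.Join();
        }
    }
}
```

Usage would be something like FixedThreadRunner.ProcessAll(workItems, 8, item => CallService(item)), where CallService stands in for your blocking service call and 8 is whatever number you arrived at empirically. Note that an exception escaping process on one of these threads is unhandled, so you would probably want a try/catch inside the delegate.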
Solution 2
Potential flow splits and continuations caused by await, later on in your code or in a 3rd party library, won't play nicely with long running tasks (or threads), so don't bother using long running tasks. In the async/await world, they're useless. More details here.
You can call ThreadPool.SetMaxThreads, but before you make this call, make sure you set the minimum number of threads with ThreadPool.SetMinThreads, using values below or equal to the max ones. And by the way, the MSDN documentation is wrong. You CAN go below the number of cores on your machine with those method calls, at least in .NET 4.5 and 4.6 where I used this technique to reduce the processing power of a memory limited 32 bit service.
If, however, you don't wish to restrict the whole app but just the processing part of it, a custom task scheduler will do the job. A long time ago, MS released samples with several custom task schedulers, including a LimitedConcurrencyLevelTaskScheduler. Spawn the main processing task manually with Task.Factory.StartNew, providing the custom task scheduler, and every other task spawned by it will use it, including async/await and even Task.Yield, used for achieving asynchrony early on in an async method.
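If you would rather not pull in the samples, the framework's own ConcurrentExclusiveSchedulerPair (available since .NET 4.5) can stand in for LimitedConcurrencyLevelTaskScheduler. The following is only a sketch of that substitution, not the sample scheduler itself; SchedulerThrottle and RunAllAsync are made-up names, and the jobs are assumed to be plain synchronous delegates:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical wrapper: names are illustrative only.
public static class SchedulerThrottle
{
    // Starts every job on a scheduler that executes at most
    // `maxConcurrency` tasks at the same time.
    public static Task RunAllAsync(IEnumerable<Action> jobs, int maxConcurrency)
    {
        // The pair's ConcurrentScheduler caps how many tasks run at once.
        var pair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, maxConcurrency);
        var factory = new TaskFactory(pair.ConcurrentScheduler);

        var tasks = new List<Task>();
        foreach (var job in jobs)
        {
            // All tasks are created immediately; only their execution is throttled.
            tasks.Add(factory.StartNew(job));
        }
        return Task.WhenAll(tasks);
    }
}
```

Keep in mind that the scheduler limits execution only: every job is still turned into a Task right away.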
But for your particular case, neither solution will stop your queue of jobs from being drained before the jobs are completed. That might not be desirable, depending on the implementation and purpose of that queue of yours. They are more like "fire a bunch of tasks and let the scheduler find the time to execute them" type of solutions. So perhaps something a bit more appropriate here could be a stricter method of control over the execution of the jobs via semaphores. The code would look like this:
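    semaphore = new SemaphoreSlim(max_concurrent_jobs);
    while (...)
    {
        job = Queue.PopJob();
        semaphore.Wait();        // blocks until one of the slots is free
        ProcessJobAsync(job);    // fire and forget; the slot is released when the job ends
    }

    async Task ProcessJobAsync(Job job)
    {
        await Task.Yield();      // return control to the loop right away
        try
        {
            ... Process the job here...
        }
        finally
        {
            semaphore.Release(); // free the slot even if the job throws
        }
    }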
There's more than one way to skin a cat. Use what you believe is appropriate.
Solution 3
Microsoft has a very cool library called TPL Dataflow which does exactly what you want (and much more). Details here.
You should use the ActionBlock class and set the MaxDegreeOfParallelism of the ExecutionDataflowBlockOptions object. ActionBlock plays nicely with async/await, so even when your external calls are awaited, no new jobs will begin processing.
    ExecutionDataflowBlockOptions actionBlockOptions = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 10
    };
    this.sendToAzureActionBlock = new ActionBlock<List<Item>>(
        async items => await ProcessItems(items),
        actionBlockOptions);
    ...
    this.sendToAzureActionBlock.Post(itemsToProcess);
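One thing to watch: by default an ActionBlock has an unbounded input buffer, so Post alone will happily accept jobs faster than they are processed. A sketch of one way to push back on the producer, assuming you can set BoundedCapacity and await SendAsync instead of calling Post. BoundedPipeline and RunAsync are made-up names, and on older frameworks the System.Threading.Tasks.Dataflow NuGet package is needed:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical wrapper: names are illustrative only.
public static class BoundedPipeline
{
    // Feeds items to an ActionBlock that runs at most `maxParallelism`
    // handlers at once and never holds more than that many items.
    public static async Task RunAsync<T>(IEnumerable<T> source, Func<T, Task> processAsync, int maxParallelism)
    {
        var block = new ActionBlock<T>(processAsync, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = maxParallelism,
            // Bounding the block makes SendAsync wait while it is full.
            BoundedCapacity = maxParallelism
        });

        foreach (var item in source)
        {
            // Completes only once the block has accepted the item,
            // which throttles how fast the source is read.
            await block.SendAsync(item);
        }

        // Signal that no more items are coming and wait for in-flight work.
        block.Complete();
        await block.Completion;
    }
}
```

If source is a lazy sequence that pops from your queue, the next job is only fetched after the previous one has been accepted by the block.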
Solution 4
The problem here doesn't seem to be too many running Tasks, it's too many scheduled Tasks. Your code will try to schedule as many Tasks as it can, no matter how fast they are executed. And if you have too many jobs, this means you will get OOM.
Because of this, none of your proposed solutions will actually solve your problem. If it seems that simply specifying LongRunning solves your problem, then that's most likely because creating a new Thread (which is what LongRunning does) takes some time, which effectively throttles getting new jobs. So, this solution only works by accident, and will most likely lead to other problems later on.
Regarding the solution, I mostly agree with usr: the simplest solution that works reasonably well is to create a fixed number of LongRunning tasks and have one loop that calls Queue.PopJob() (protected by a lock if that method is not thread-safe) and Execute()s the job.
UPDATE: After some more thinking, I realized the following attempt will most likely behave terribly. Use it only if you're really sure it will work well for you.
But the TPL tries to figure out the best degree of parallelism, even for IO-bound Tasks. So, you might try to use that to your advantage. Long Tasks won't work here, because from the point of view of TPL, it seems like no work is done and it will start new Tasks over and over. What you can do instead is to start a new Task at the end of each Task. This way, TPL will know what's going on and its algorithm may work well. Also, to let the TPL decide the degree of parallelism, at the start of a Task that is first in its line, start another line of Tasks.
This algorithm may work well. But it's also possible that the TPL will make a bad decision regarding the degree of parallelism; I haven't actually tried anything like this.
In code, it would look like this:
    void ProcessJobs(bool isFirst)
    {
        var job = Queue.PopJob(); // assumes PopJob() is thread-safe
        if (job == null)
            return;

        // The first Task in a line starts another line of Tasks.
        if (isFirst)
            Task.Factory.StartNew(() => ProcessJobs(true));

        job.Execute();

        // Continue this line with the next job.
        Task.Factory.StartNew(() => ProcessJobs(false));
    }
And start it with:
    Task.Factory.StartNew(() => ProcessJobs(true));
Solution 5
I use a message queue/mailbox mechanism to achieve this. It's akin to the actor model. I have a class that has a MailBox. I call this class my "worker." It can receive messages. Those messages are queued and they, essentially, define tasks that I want the worker to run. The worker uses Task.Wait() to wait for its current Task to finish before dequeueing the next message and starting the next task.
By limiting the number of workers I have, I am able to limit the number of concurrent threads/tasks that are being run.
This is outlined, with source code, in my blog post on a distributed compute engine. If you look at the code for IActor and the WorkerNode, I hope it makes sense.
Comments
-
Ryan over 3 years
Consider a queue holding a lot of jobs that need processing. The limitation of the queue is that I can only get 1 job at a time and have no way of knowing how many jobs there are. The jobs take 10s to complete and involve a lot of waiting for responses from web services, so they are not CPU bound.
If I use something like this
    while (true)
    {
        var job = Queue.PopJob();
        if (job == null)
            break;
        Task.Factory.StartNew(job.Execute);
    }
Then it will furiously pop jobs from the queue much faster than it can complete them, run out of memory and fall on its ass. >.<
I can't use (I don't think) ParallelOptions.MaxDegreeOfParallelism because I can't use Parallel.Invoke or Parallel.ForEach.
3 alternatives I've found:
1. Replace Task.Factory.StartNew with
    Task task = new Task(job.Execute, TaskCreationOptions.LongRunning);
    task.Start();
   which seems to somewhat solve the problem, but I am not clear exactly what this is doing and whether this is the best method.
2. Create a custom task scheduler that limits the degree of concurrency.
3. Use something like BlockingCollection to add jobs to a collection when started and remove them when finished, to limit the number that can be running.
With #1 I've got to trust that the right decision is automatically made; with #2/#3 I've got to work out the max number of tasks that can be running myself.
Have I understood this correctly - which is the better way, or is there another way?
EDIT - This is what I've come up with from the answers below, producer-consumer pattern.
As well as overall throughput, the aim was not to dequeue jobs faster than they could be processed and not to have multiple threads polling the queue (not shown here, but that's a non-blocking op and will lead to huge transaction costs if polled at high frequency from multiple places).
    // BlockingCollection<>(1) blocks on Add when it already holds 1 job (no
    // point in being greedy!) and blocks on take when it is empty.
    var jobs = new BlockingCollection<Job>(1);

    // Set up a number of consumer threads.
    // Determine MAX_CONSUMER_THREADS empirically; with a 4-core CPU and 50% of
    // the time in a job blocked waiting on IO, it will likely be 8.
    for (int numConsumers = 0; numConsumers < MAX_CONSUMER_THREADS; numConsumers++)
    {
        Thread consumer = new Thread(() =>
        {
            // GetConsumingEnumerable() ends cleanly after CompleteAdding(),
            // whereas Take() would throw once the collection is completed.
            foreach (var job in jobs.GetConsumingEnumerable())
            {
                job.Execute();
            }
        });
        consumer.Start();
    }

    // Producer takes items off the queue and puts them in the blocking
    // collection, ready for processing.
    while (true)
    {
        var job = Queue.PopJob();
        if (job != null)
        {
            jobs.Add(job);
        }
        else
        {
            jobs.CompleteAdding();
            // May need to wait for running jobs to finish
            break;
        }
    }
-
-
Ryan almost 12 years
So to confirm you're saying use fixed threads or option #1 from orig Q?
-
svick almost 12 years
@Ryan Option 1 is a really bad idea. It means creating thousands of threads; you should never do anything like that.
-
usr almost 12 years
@Ryan, I recommend #1 but throttled. Don't create as many threads as there are work items. Create as many threads as are needed to saturate the remote service. Write yourself a helper function which spawns N threads and uses them to process M work items. You get totally predictable and reliable results that way.
-
usr almost 12 years
Many of your points are correct. Just one thing from my experience: The TPL will vastly mismanage the number of threads, in both directions. This is too unreliable to put into production.
-
Ryan almost 12 years
Great point about LongRunning only accidentally solving the problem - it was why I was wary about it, as I didn't really understand what was going on under the hood. Makes perfect sense now and it does indeed behave exactly how you described - just created 1500 new threads on a dual core machine in my test!
-
Ryan almost 12 years
However - your ProcessJobs example makes me very nervous as well; who knows how this would behave in different circumstances.
-
Ryan almost 12 years
@usr - So throttling the amount of threads 'active' at any one time using a method like #3 (BlockingCollection) or did you have something else in mind?
-
usr almost 12 years
@Ryan, not sure how that would work exactly. You don't want only one thread working on the work items. You also don't want one thread starting tasks as fast as it can because that triggers said TPL problems. Every solution where you don't ensure that a fixed number of threads is running is not correct, in my opinion.
-
svick almost 12 years
@usr @Ryan Well, I like the idea behind ProcessJobs(): let the TPL figure out the best degree of parallelism based on current empirical data. But yeah, before deploying something like that in production, I would certainly test it a lot. And it's very well possible it won't behave well.
-
Ryan almost 12 years
That's not what's happening in my tests - it's starting hundreds of threads (dual core CPU) on a Task with a lot of waiting for I/O, and even if I make the Tasks 100% CPU bound (prime number calc busywork with no sleep or blocking) it's still starting 14 threads at once, which isn't very efficient.
-
Maciej almost 12 years
Because your own judgement is that 14 is too much, I don't think there is a way to delegate the decision on how many threads is enough to any automation. You should manage it yourself. Spawn Tasks in a loop but use a semaphore to limit the number of tasks to whatever you see as appropriate.
-
Zapnologica almost 9 years
Has anyone had any success with this solution?
-
NStuke about 8 years
If you are creating a thread per executing task, but the tasks are spending a lot of time waiting on network responses, aren't you giving away the work-stealing benefits of the TPL library overall, since essentially the thread will be spending a lot of time waiting on I/O completion and be idle? Would you lose a lot of the benefits of using async / await, and be just as well off going back to the blocking network calls in .NET?
-
usr about 8 years
@NStuke his work is not CPU bound. Any optimization of CPU consumption is moot; it does nothing for him. If your CPUs are not pegged a lot of the time there will not be work stealing anyway since the pool queues are almost always empty. Async IO usually consumes more CPU than sync IO (hard to believe but true). In the OP's case this is all about optimizing IO patterns. Whether the IOs are initiated sync or async does not matter at all for their performance.
-
Isaac Paul almost 8 years
There is a bug in this code that will create an infinite amount of tasks in the first 2 lines.
-
svick almost 8 years
@IsaacPaul You're right, thanks for noticing that, fixed.
-
JJS almost 7 years
LimitedConcurrencyLevelTaskScheduler is gold. There is a NuGet package for that now too: nuget.org/packages/ParallelExtensionsExtras
-
Ben Stabile over 6 years
This is the correct choice in my opinion. Not only can you easily control the parallelism, but you can chain actions performed AFTER tasks are completed (passing results). Advanced patterns such as "Producer-Consumer" are very easy to implement. Per the original problem, the calls out to web services can pass back results that will be processed downstream as they become available. You can easily THROTTLE that work as well.
-
Ben Stabile over 6 years
BTW: You can use MaxDegreeOfParallelism and await calls to SendAsync(x) instead of Post(x) on the Block that is executing the lambdas (actions or functions operating on the data or objects passed in). On the downstream end you can ReceiveAsync() which returns Task<T>, and then add a ContinueWith(Action<Task<T>>).
-
GrumpyRodriguez almost 4 years
Does this answer imply that async/await should not be used for IO bound tasks then? Async controllers in asp.net (core), and lots of other documentation suggest async/await as a means of efficiently handling blocking IO. Your answer hints that this'll lead to a mismanaged thread pool but this is the only suggestion I've seen against use of TPL (async/await) with IO bound calls. Can you please clarify exactly what problems mismanagement will create? ps: see MS's own documentation re IO and TPL: docs.microsoft.com/en-us/dotnet/csharp/async
-
usr almost 4 years
Hm, no... Async IO can have throughput benefits. Async IO does not use threads, so the TPL thread scheduling issue does not really come into play. My answer assumes blocking/sync IO. Avoiding thread overuse is a key reason to use async. In practical applications this plays no significant role most of the time. I disagree with the guidance to use async IO by default. It has productivity and bug count downsides. It can be very beneficial in certain circumstances.