Best way to limit the number of active Tasks running via the Parallel Task Library

Solution 1

I just gave an answer which is very applicable to this question.

Basically, the TPL Task class is made to schedule CPU-bound work. It is not made for blocking work.

You are working with a resource that is not CPU: waiting for service replies. This means the TPL will mismanage your resources because it assumes CPU-boundedness to a certain degree.

Manage the resources yourself: Start a fixed number of threads or LongRunning tasks (which is basically the same). Decide on the number of threads empirically.

You can't put unreliable systems into production. For that reason, I recommend #1, but throttled. Don't create as many threads as there are work items. Create as many threads as are needed to saturate the remote service. Write yourself a helper function which spawns N threads and uses them to process M work items. You get totally predictable and reliable results that way.
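
A minimal sketch of such a helper, assuming PopJob() is thread-safe and using the Job/Queue names from the question (everything else is illustrative):

static void ProcessAllJobs(int threadCount)
{
    var threads = new Thread[threadCount];
    for (int i = 0; i < threadCount; i++)
    {
        threads[i] = new Thread(() =>
        {
            Job job;
            while ((job = Queue.PopJob()) != null) // assumes PopJob() is thread-safe
                job.Execute();
        });
        threads[i].Start();
    }
    foreach (var thread in threads)
        thread.Join(); // block until the queue has been drained
}

Call it with the empirically chosen thread count, e.g. ProcessAllJobs(8);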

Solution 2

Potential flow splits and continuations caused by await, later on in your code or in a 3rd party library, won't play nicely with long running tasks (or threads), so don't bother using long running tasks. In the async/await world, they're useless. More details here.

You can call ThreadPool.SetMaxThreads but before you make this call, make sure you set the minimum number of threads with ThreadPool.SetMinThreads, using values below or equal to the max ones. And by the way, the MSDN documentation is wrong. You CAN go below the number of cores on your machine with those method calls, at least in .NET 4.5 and 4.6 where I used this technique to reduce the processing power of a memory limited 32 bit service.
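
For example (a sketch; the numbers are illustrative and should be tuned for your app):

// Read the current limits so the IO completion-port values can be preserved.
int minWorker, minIoc, maxWorker, maxIoc;
ThreadPool.GetMinThreads(out minWorker, out minIoc);
ThreadPool.GetMaxThreads(out maxWorker, out maxIoc);

// Set the minimums first, using values below or equal to the new maximums.
ThreadPool.SetMinThreads(4, minIoc);
ThreadPool.SetMaxThreads(16, maxIoc);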

If however you don't wish to restrict the whole app but just the processing part of it, a custom task scheduler will do the job. A long time ago, MS released samples with several custom task schedulers, including a LimitedConcurrencyLevelTaskScheduler. Spawn the main processing task manually with Task.Factory.StartNew, providing the custom task scheduler, and every other task spawned by it will use it, including async/await and even Task.Yield, which is used for achieving asynchrony early on in an async method.
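
A sketch of that approach, using LimitedConcurrencyLevelTaskScheduler from the samples (also available on NuGet as ParallelExtensionsExtras) and a hypothetical MainProcessingLoop method:

var scheduler = new LimitedConcurrencyLevelTaskScheduler(4); // at most 4 tasks at a time

Task.Factory.StartNew(
    () => MainProcessingLoop(), // everything spawned inside inherits the scheduler
    CancellationToken.None,
    TaskCreationOptions.None,
    scheduler);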

But for your particular case, neither solution will stop your queue of jobs from being drained faster than the jobs are completed. That might not be desirable, depending on the implementation and purpose of that queue of yours. They are more like "fire a bunch of tasks and let the scheduler find the time to execute them" solutions. So perhaps something a bit more appropriate here could be a stricter method of control over the execution of the jobs via semaphores. The code would look like this:

SemaphoreSlim semaphore = new SemaphoreSlim(maxConcurrentJobs); // a field shared with ProcessJobAsync

while (true) // loop shape taken from the question's code
{
    var job = Queue.PopJob();
    if (job == null)
        break;
    semaphore.Wait();     // blocks until a slot is free
    ProcessJobAsync(job); // deliberately not awaited; the semaphore caps concurrency
}

async Task ProcessJobAsync(Job job)
{
    await Task.Yield(); // return control to the dequeueing loop immediately
    try
    {
        // ... Process the job here ...
    }
    finally
    {
        semaphore.Release(); // free the slot even if the job throws
    }
}
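
(If the dequeueing loop itself runs inside an async method, the blocking semaphore.Wait() can be swapped for await semaphore.WaitAsync() so no thread is held up while waiting for a free slot.)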

There's more than one way to skin a cat. Use what you believe is appropriate.

Solution 3

Microsoft has a very cool library called TPL Dataflow which does exactly what you want (and much more). Details here.

You should use the ActionBlock class and set the MaxDegreeOfParallelism of the ExecutionDataflowBlockOptions object. ActionBlock plays nicely with async/await, so even while your external calls are awaited, no new jobs beyond the limit will begin processing.

ExecutionDataflowBlockOptions actionBlockOptions = new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 10
};

this.sendToAzureActionBlock = new ActionBlock<List<Item>>(
    async items => await ProcessItems(items),
    actionBlockOptions);
...
this.sendToAzureActionBlock.Post(itemsToProcess);
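
To also bound the number of queued (not just running) jobs, a BoundedCapacity can be set and SendAsync awaited instead of calling Post. A sketch, with illustrative option values and the Job/Queue names from the question:

var options = new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 10,
    BoundedCapacity = 100 // SendAsync waits once this many items are buffered
};

var block = new ActionBlock<Job>(job => job.Execute(), options);

Job job;
while ((job = Queue.PopJob()) != null)
    await block.SendAsync(job); // applies back-pressure when the block is full

block.Complete();       // signal that no more items are coming
await block.Completion; // wait for the in-flight jobs to finish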

Solution 4

The problem here doesn't seem to be too many running Tasks, it's too many scheduled Tasks. Your code will try to schedule as many Tasks as it can, no matter how fast they are executed. And if you have too many jobs, this means you will get OOM.

Because of this, none of your proposed solutions will actually solve your problem. If it seems that simply specifying LongRunning solves your problem, then that's most likely because creating a new Thread (which is what LongRunning does) takes some time, which effectively throttles getting new jobs. So, this solution only works by accident, and will most likely lead to other problems later on.

Regarding the solution, I mostly agree with usr: the simplest solution that works reasonably well is to create a fixed number of LongRunning tasks and have one loop that calls Queue.PopJob() (protected by a lock if that method is not thread-safe) and Execute()s the job.
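
A sketch of that approach (workerCount is chosen empirically; Job and Queue come from the question):

object queueLock = new object();
var workers = new Task[workerCount];

for (int i = 0; i < workerCount; i++)
{
    workers[i] = Task.Factory.StartNew(() =>
    {
        while (true)
        {
            Job job;
            lock (queueLock) // protects PopJob() in case it is not thread-safe
            {
                job = Queue.PopJob();
            }
            if (job == null)
                return; // queue drained, this worker exits
            job.Execute();
        }
    }, TaskCreationOptions.LongRunning);
}

Task.WaitAll(workers);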

UPDATE: After some more thinking, I realized the following attempt will most likely behave terribly. Use it only if you're really sure it will work well for you.


But the TPL does try to figure out the best degree of parallelism, even for IO-bound Tasks, so you might try to use that to your advantage. LongRunning Tasks won't work here, because from the point of view of the TPL it seems like no work is done, and it will start new Tasks over and over. What you can do instead is to start a new Task at the end of each Task. This way, the TPL will know what's going on and its algorithm may work well. Also, to let the TPL decide the degree of parallelism, at the start of each Task that is first in its chain, start another chain of Tasks.

This algorithm may work well. But it's also possible that the TPL will make a bad decision regarding the degree of parallelism, I haven't actually tried anything like this.

In code, it would look like this:

void ProcessJobs(bool isFirst)
{
    var job = Queue.PopJob(); // assumes PopJob() is thread-safe
    if (job == null)
        return;

    if (isFirst)
        Task.Factory.StartNew(() => ProcessJobs(true)); // the first Task of a chain starts another chain

    job.Execute();

    Task.Factory.StartNew(() => ProcessJobs(false)); // continue this chain with the next job
}

And start it with

Task.Factory.StartNew(() => ProcessJobs(true));

Solution 5

I use a message queue/mailbox mechanism to achieve this. It's akin to the actor model. I have a class that has a MailBox. I call this class my "worker." It can receive messages. Those messages are queued and they, essentially, define tasks that I want the worker to run. The worker uses Task.Wait() to wait for its current Task to finish before dequeueing the next message and starting the next task.

By limiting the number of workers I have, I am able to limit the number of concurrent threads/tasks that are being run.

This is outlined, with source code, in my blog post on a distributed compute engine. If you look at the code for IActor and the WorkerNode, I hope it makes sense.

https://long2know.com/2016/08/creating-a-distributed-computing-engine-with-the-actor-model-and-net-core/
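
A minimal sketch of the idea (illustrative only, not the actual IActor/WorkerNode code from the post):

class Worker
{
    private readonly BlockingCollection<Action> mailbox = new BlockingCollection<Action>();

    public Worker()
    {
        var thread = new Thread(() =>
        {
            // Take one message at a time; the next message is not dequeued
            // until the current task has run to completion.
            foreach (var message in mailbox.GetConsumingEnumerable())
                Task.Run(message).Wait();
        });
        thread.IsBackground = true;
        thread.Start();
    }

    public void Post(Action message) => mailbox.Add(message);
}

Creating a fixed number of Worker instances then caps the overall concurrency.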

Comments

  • Ryan
    Ryan over 3 years

    Consider a queue holding a lot of jobs that need processing. A limitation of the queue is that it can only return one job at a time, and there is no way of knowing how many jobs there are. The jobs take 10s to complete and involve a lot of waiting for responses from web services, so they are not CPU bound.

    If I use something like this

    while (true)
    {
       var job = Queue.PopJob();
       if (job == null)
          break;
       Task.Factory.StartNew(job.Execute); 
    }
    

    Then it will furiously pop jobs from the queue much faster than it can complete them, run out of memory and fall on its ass. >.<

    I can't use (I don't think) ParallelOptions.MaxDegreeOfParallelism because I can't use Parallel.Invoke or Parallel.ForEach

    3 alternatives I've found

    1. Replace Task.Factory.StartNew with

      Task task = new Task(job.Execute, TaskCreationOptions.LongRunning);
      task.Start();
      

      Which seems to somewhat solve the problem, but I am not clear exactly what this is doing or whether it is the best method.

    2. Create a custom task scheduler that limits the degree of concurrency

    3. Use something like BlockingCollection to add jobs to collection when started and remove when finished to limit number that can be running.

    With #1 I've got to trust that the right decision is automatically made, #2/#3 I've got to work out the max number of tasks that can be running myself.

    Have I understood this correctly - which is the better way, or is there another way?

    EDIT - This is what I've come up with from the answers below, producer-consumer pattern.

    As well as overall throughput, the aim was not to dequeue jobs faster than they could be processed, and not to have multiple threads polling the queue (not shown here, but popping is a non-blocking op and would lead to huge transaction costs if polled at high frequency from multiple places).

    // BlockingCollection<>(1) will block if we try to add more than 1 job to the
    // collection (no point in being greedy!), or if it is empty on take.
    BlockingCollection<Job> jobs = new BlockingCollection<Job>(1);

    // Set up a number of consumer threads.
    // Determine MAX_CONSUMER_THREADS empirically: on a 4-core CPU where 50% of a
    // job's time is spent blocked waiting on IO, it will likely be 8.
    for (int numConsumers = 0; numConsumers < MAX_CONSUMER_THREADS; numConsumers++)
    {
       Thread consumer = new Thread(() =>
       {
          // GetConsumingEnumerable() ends cleanly once CompleteAdding() is called,
          // avoiding the race where Take() throws after the collection completes.
          foreach (var job in jobs.GetConsumingEnumerable())
          {
             job.Execute();
          }
       });
       consumer.Start();
    }

    // Producer: take items off the queue and put them in the blocking collection
    // ready for processing.
    while (true)
    {
        var job = Queue.PopJob();
        if (job != null)
           jobs.Add(job);
        else
        {
           jobs.CompleteAdding();
           // May need to wait for running jobs to finish
           break;
        }
    }
    
  • Ryan
    Ryan almost 12 years
    So to confirm you're saying use fixed threads or option #1 from orig Q?
  • svick
    svick almost 12 years
    @Ryan Option 1 is a really bad idea. It means creating thousands of threads, you should never do anything like that.
  • usr
    usr almost 12 years
    @Ryan, I recommend #1, but throttled. Don't create as many threads as there are work items. Create as many threads as are needed to saturate the remote service. Write yourself a helper function which spawns N threads and uses them to process M work items. You get totally predictable and reliable results that way.
  • usr
    usr almost 12 years
    Many of your points are correct. Just one thing from my experience: The TPL will vastly mismanage the amount of threads. In both directions. This is too unreliable to put into production.
  • Ryan
    Ryan almost 12 years
    Great point about LongRunning only accidentally solving the problem - it was why I was wary about it, as I didn't really understand what was going on under the hood. Makes perfect sense now, and it does indeed behave exactly how you described - it just created 1500 new threads on a dual core machine in my test!
  • Ryan
    Ryan almost 12 years
    However - your ProcessJobs example makes me very nervous as well, who can know how this would behave in different circumstances.
  • Ryan
    Ryan almost 12 years
    @usr - So throttling the amount of threads 'active' at any one time using a method like #3 (BlockingCollection) or did you have something else in mind?
  • usr
    usr almost 12 years
    @Ryan, not sure how that would work exactly. You don't want only one thread working on the work items. You also don't want one thread starting tasks as fast as it can because that triggers said TPL problems. Every solution where you don't ensure that a fixed number of threads is running is not correct, in my opinion.
  • svick
    svick almost 12 years
    @usr @Ryan Well, I like the idea behind ProcessJobs(): let the TPL figure out the best degree of parallelism based on current empirical data. But yeah, before deploying something like that in production, I would certainly test it a lot. And it's very well possible it won't behave well.
  • Ryan
    Ryan almost 12 years
    That's not what's happening in my tests - it's starting hundreds of threads (dual core cpu) on a Task with a lot of waiting for I/O, and even if I make the Tasks 100% CPU bound (prime number calc busywork with no sleep or blocking) it's still starting 14 threads at once, which isn't very efficient.
  • Maciej
    Maciej almost 12 years
    Because your own judgement is that 14 is too many, I don't think there is a way to delegate the decision on how many threads is enough to any automation. You should manage it yourself. Spawn Tasks in a loop, but use a semaphore to limit the number of tasks to whatever you see appropriate.
  • Zapnologica
    Zapnologica almost 9 years
    Has anyone had any success with this solution?
  • NStuke
    NStuke about 8 years
    If you are creating a thread per executing task, but the tasks are spending a lot of time waiting on network responses, aren't you giving away the work-stealing benefits of the TPL overall, since the thread will essentially be idle waiting on I/O completion? Wouldn't you lose a lot of the benefits of using async/await and be just as well off resorting to blocking network calls in .NET?
  • usr
    usr about 8 years
    @NStuke his work is not CPU bound. Any optimization of CPU consumption is moot; it helps him nothing. If your CPUs are not pegged, a lot of the time there will not be work stealing anyway, since the pool queues are almost always empty. Async IO usually consumes more CPU than sync IO (hard to believe but true). In the OP's case this is all about optimizing IO patterns. Whether the IOs are initiated sync or async does not matter at all for their performance.
  • Isaac Paul
    Isaac Paul almost 8 years
    There is a bug in this code that will create an infinite amount of tasks in the first 2 lines.
  • svick
    svick almost 8 years
    @IsaacPaul You're right, thanks for noticing that, fixed.
  • JJS
    JJS almost 7 years
    LimitedConcurrencyLevelTaskScheduler is gold. There is a nuget package for that now too nuget.org/packages/ParallelExtensionsExtras
  • Ben Stabile
    Ben Stabile over 6 years
    This is the correct choice in my opinion. Not only can you easily control the parallelism, but you can chain actions performed AFTER tasks are completed (passing results). Advanced patterns such as "Producer-Consumer" are very easy to implement. Per the original problem, the calls out to web services can pass back results that will be processed downstream as they become available. You can easily THROTTLE that work as well.
  • Ben Stabile
    Ben Stabile over 6 years
    BTW: You can use MaxDegreeOfParallelism and await calls to SendAsync(x) instead of Post(x) on the Block that is executing the lambdas (actions or functions operating on the data or objects passed in). On the downstream end you can ReceiveAsync() which returns Task<T>, and then add a ContinueWith(Action<Task<T>>).
  • GrumpyRodriguez
    GrumpyRodriguez almost 4 years
    Does this answer imply that async/await should not be used for IO bound tasks then? Async controllers in asp.net (core), and lots of other documentation suggest async/await as a means of efficiently handling blocking IO. Your answer hints that this'll lead to a mismanaged thread pool but this is the only suggestion I've seen against use of TPL (async/await) with IO bound calls. Can you please clarify exactly what problems mismanagement will create? ps: see MS's own documentation re IO and TPL: docs.microsoft.com/en-us/dotnet/csharp/async
  • usr
    usr almost 4 years
    Hm, no... Async IO can have throughput benefits. Async IO does not use threads, so the TPL thread scheduling issue does not really come into play. My answer assumes blocking/sync IO. Avoiding thread overuse is a key reason to use async. In practical applications this plays no significant role most of the time. I disagree with the guidance to use async IO by default. It has productivity and bug count downsides. It can be very beneficial in certain circumstances.