C# Task thread pool - Running 100 tasks across only 10 threads


Solution 1

I think you're missing the point by focusing on threads, especially for asynchronous operations that don't require threads to execute.

.NET has a great ThreadPool you can use. You don't know how many threads are in it and you don't need to care. It just works (until it doesn't and you need to configure it yourself, but that's very advanced).

Running tasks on the ThreadPool is very simple. Either create a task for each operation and throttle them using a SemaphoreSlim or use the ready-made TPL Dataflow blocks. For example:

var block = new ActionBlock<SomeData>(
    item => _repository.WriteDataAsync(item), // What to do on each item
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10 }); // How many items at the same time

foreach (var item in items)
{
    block.Post(item); // Post all items to the block
}

block.Complete(); // Signal completion
await block.Completion; // Asynchronously wait for completion.
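
For comparison, the SemaphoreSlim route mentioned above could look roughly like the sketch below. It is only an illustration: the Throttle class and ForEachAsync method are hypothetical names, not part of the original answer or of any library. Every item gets a task immediately, but each task waits at the semaphore before doing its work, so at most maxConcurrency operations run at once.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, for illustration only.
public static class Throttle
{
    // Runs `operation` for every item with at most `maxConcurrency` operations in flight.
    public static async Task ForEachAsync<T>(
        IEnumerable<T> items, int maxConcurrency, Func<T, Task> operation)
    {
        using var gate = new SemaphoreSlim(maxConcurrency);

        async Task RunOneAsync(T item)
        {
            // Asynchronously wait for a free slot; no thread is blocked while waiting.
            await gate.WaitAsync().ConfigureAwait(false);
            try
            {
                await operation(item).ConfigureAwait(false);
            }
            finally
            {
                gate.Release(); // Free the slot even if the operation failed.
            }
        }

        // ToList forces the query, so one wrapper task per item is created here.
        List<Task> tasks = items.Select(item => RunOneAsync(item)).ToList();

        // WhenAll completes only after every wrapper has finished.
        await Task.WhenAll(tasks).ConfigureAwait(false);
    }
}
```

Usage would be along the lines of await Throttle.ForEachAsync(items, 10, item => _repository.WriteDataAsync(item)). Note that this creates all the wrapper tasks up front, whereas the ActionBlock keeps the pending items in its queue.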

If, however, you do plan on creating "dedicated" threads, you can use Task.Factory.StartNew with the LongRunning option, which creates a dedicated thread outside the ThreadPool. Keep in mind, though, that an async operation does not stay on the same thread throughout: it needs no thread while it is waiting, and after an await it typically resumes on a ThreadPool thread. So starting on a dedicated thread may be pointless (more on that on my blog: LongRunning Is Useless For Task.Run With async-await).
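
A small sketch can make the thread hop visible. The LongRunningDemo class and its RunAsync method are hypothetical names used only for this illustration, and the behavior described assumes a regular desktop or server runtime with no SynchronizationContext. The async lambda starts on the dedicated thread, but after the first await it continues on whatever thread completes the delay, which is normally a ThreadPool thread.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class, for illustration only.
public static class LongRunningDemo
{
    // Reports whether the async lambda ran on a ThreadPool thread
    // before and after its first await.
    public static async Task<(bool startedOnPool, bool resumedOnPool)> RunAsync()
    {
        // With an async lambda, StartNew returns Task<Task<...>>:
        // the outer task finishes as soon as the lambda hits its first await.
        Task<Task<(bool, bool)>> outer = Task.Factory.StartNew(async () =>
        {
            bool before = Thread.CurrentThread.IsThreadPoolThread;

            // The dedicated thread is released here; nothing runs during the delay.
            await Task.Delay(100).ConfigureAwait(false);

            bool after = Thread.CurrentThread.IsThreadPoolThread;
            return (before, after);
        }, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);

        // Unwrap yields a task that represents the inner async work.
        return await outer.Unwrap().ConfigureAwait(false);
    }
}
```

Under those assumptions, startedOnPool comes back false and resumedOnPool comes back true, so the dedicated thread only ever executed the code before the first await.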

Solution 2

@i3arnon's answer is correct. Use TPL Dataflow.

The rest of this answer is for educational purposes and/or special use cases only.

I recently bumped into a similar problem in a project where I could not introduce any external dependencies, so I had to roll my own load-balancing implementation, and it turned out to be surprisingly simple (until you start wiring in cancellation and ordered results, but that's outside the scope of this question).

I am disregarding the "10 dedicated threads" requirement since, as others have already explained, it does not make sense when dealing with async operations. Instead I will maintain up to N concurrent Task instances processing the workload.

static async Task InvokeAsync(IEnumerable<Func<Task>> taskFactories, int maxDegreeOfParallelism)
{
    Queue<Func<Task>> queue = new Queue<Func<Task>>(taskFactories);

    // Nothing to do; Task.WhenAny below would throw on an empty list.
    if (queue.Count == 0)
    {
        return;
    }

    List<Task> tasksInFlight = new List<Task>(maxDegreeOfParallelism);

    do
    {
        while (tasksInFlight.Count < maxDegreeOfParallelism && queue.Count != 0)
        {
            Func<Task> taskFactory = queue.Dequeue();

            tasksInFlight.Add(taskFactory());
        }

        Task completedTask = await Task.WhenAny(tasksInFlight).ConfigureAwait(false);

        // Propagate exceptions. In-flight tasks will be abandoned if this throws.
        await completedTask.ConfigureAwait(false);

        tasksInFlight.Remove(completedTask);
    }
    while (queue.Count != 0 || tasksInFlight.Count != 0);
}

Usage:

Func<Task>[] taskFactories = {
    () => _repository.WriteData(someData1),
    () => _repository.WriteData(someData2),
    () => _repository.WriteData(someData3),
    () => _repository.WriteData(someData4)
};

await InvokeAsync(taskFactories, maxDegreeOfParallelism: 2);

... or

IEnumerable<SomeData> someDataCollection = ... // Get data.

// Assumes InvokeAsync is declared in a static class named ParallelTasks.
await ParallelTasks.InvokeAsync(
    someDataCollection.Select(someData => new Func<Task>(() => _repository.WriteData(someData))),
    maxDegreeOfParallelism: 10
);

This solution does not suffer from the poor load balancing often seen in other trivial implementations that pre-partition the input (such as this one): when tasks have varying durations, a pre-assigned partition can still be busy while the others sit idle, whereas here the next task starts as soon as any slot frees up.

Version with perf optimizations and argument validation: Gist.

Author by

Juzzbott

Basically, I’m a CMS guy. Most of my development for the last 5 or so years has been centered around the Sitecore CMS platform. This has been across a broad spectrum of sites, including classifieds, government sites, e-commerce etc.

Updated on July 09, 2022

Comments

  • Juzzbott
    Juzzbott almost 2 years

    I'm just wondering if someone can point me in the right direction about the async/await framework and thread pools?

    Basically, what I'm trying to do is have x number of operations executed in a separate thread/async, but across a maximum of y threads.

    For example, let's say that I have 100 database operations: await _repository.WriteData(someData);

    What I'd like to do is have some method of running 10 of those operations at a time (ideally in a separate thread each, so 10 threads), and as each one is finished, the next is kicked off on the thread that then becomes available. We then wait for all operations to complete and all threads to finish...

    Is this something that is readily achievable without too much effort or adding huge amounts of complexity?

  • Kirill Shlenskiy
    Kirill Shlenskiy about 8 years
    TPL Dataflow is a great fit for the stated problem. Not sure why anyone would downvote this.
  • hyankov
    hyankov over 6 years
    TPL Dataflow is not "built-in". I have learned that from your blog, haha
  • virender
    virender about 6 years
    @i3arnon How can I add a new task to the queue only when one of the 10 running tasks completes? For example, I can already limit execution to 10 tasks at a time with SemaphoreSlim or MaxDegreeOfParallelism, but I don't want to create 100 tasks up front and then throttle them to 10 at a time. I only want to create a new task when one of the 10 running tasks completes, and this process should continue indefinitely.
  • Boris B.
    Boris B. over 2 years
    Nice solution, two suggestions though: 1. Use a LinkedList<Task> to track in-flight tasks, as its insertion/removal is O(1) without any memory moves involved under the hood. 2. If a task throws, catch it outside of the loop and then WaitAll on the final in-flight list, collecting any exceptions into one AggregateException. That way you account for all tasks and their end states instead of letting them potentially outlive your InvokeAsync.
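
One possible reading of those two suggestions is sketched below. ParallelTasksSketch and InvokeAllAsync are hypothetical names, and this is a variation on the answer's InvokeAsync rather than the author's own code. Instead of catching outside the loop, it stops starting new tasks after the first failure and keeps draining the in-flight ones, which has the same effect of accounting for every started task. One caveat: LinkedList<T>.Remove(T) still searches for the item linearly (only the unlinking avoids shifting elements), and Task.WhenAny is itself linear in the number of tasks, so the gain is likely small for modest degrees of parallelism.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical variation, for illustration only.
public static class ParallelTasksSketch
{
    public static async Task InvokeAllAsync(
        IEnumerable<Func<Task>> taskFactories, int maxDegreeOfParallelism)
    {
        var queue = new Queue<Func<Task>>(taskFactories);
        var inFlight = new LinkedList<Task>();
        var failures = new List<Exception>();

        while (queue.Count != 0 || inFlight.Count != 0)
        {
            // Top up the in-flight set, but start nothing new once a failure was seen.
            while (failures.Count == 0 && inFlight.Count < maxDegreeOfParallelism && queue.Count != 0)
            {
                try { inFlight.AddLast(queue.Dequeue()()); }
                catch (Exception ex) { failures.Add(ex); } // The factory itself threw.
            }

            if (inFlight.Count == 0)
            {
                break; // Only reachable after a failure, with unstarted work left in the queue.
            }

            // WhenAny never throws for a faulted task; it just hands the task back.
            Task completed = await Task.WhenAny(inFlight).ConfigureAwait(false);
            inFlight.Remove(completed);

            if (completed.IsFaulted)
            {
                failures.AddRange(completed.Exception!.InnerExceptions);
            }
            else if (completed.IsCanceled)
            {
                failures.Add(new TaskCanceledException(completed));
            }
        }

        // Every started task has finished by now, so nothing outlives this method.
        if (failures.Count != 0)
        {
            throw new AggregateException(failures);
        }
    }
}
```

Callers would then catch AggregateException and inspect InnerExceptions, rather than seeing only the first failure.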