Ensuring task execution order in ThreadPool

29,350

Solution 1

Something like the following allows both serial and parallel tasks to be queued. Serial tasks are executed one after the other; parallel tasks can be executed in any order and concurrently with each other. A serial task starts only once every task queued before it has finished, and no task queued after it starts until it has finished. This lets you serialize tasks where necessary while still running other tasks in parallel, and it works as tasks are received, i.e. you do not need to know the entire sequence up front: execution order is maintained dynamically.

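using System;
using System.Collections.Generic;
using System.Threading;

internal class TaskQueue
{
    private readonly object _syncObj = new object();
    private readonly Queue<QTask> _tasks = new Queue<QTask>();
    private int _runningTaskCount;
    private bool _serialTaskRunning;

    public void Queue(bool isParallel, Action task)
    {
        lock (_syncObj)
        {
            _tasks.Enqueue(new QTask { IsParallel = isParallel, Task = task });
        }

        ProcessTaskQueue();
    }

    // Number of tasks waiting to be started (running tasks are not counted).
    public int Count
    {
        get { lock (_syncObj) { return _tasks.Count; } }
    }

    private void ProcessTaskQueue()
    {
        lock (_syncObj)
        {
            // Nothing else may start while a serial task is running.
            if (_serialTaskRunning) return;

            // Start every parallel task at the head of the queue, even if
            // other parallel tasks are already running.
            while (_tasks.Count > 0 && _tasks.Peek().IsParallel)
            {
                QTask parallelTask = _tasks.Dequeue();

                QueueUserWorkItem(parallelTask);
            }

            // A serial task starts only once everything before it has finished.
            if (_tasks.Count > 0 && _runningTaskCount == 0)
            {
                QTask serialTask = _tasks.Dequeue();

                _serialTaskRunning = true;
                QueueUserWorkItem(serialTask);
            }
        }
    }

    // Must be called while holding _syncObj.
    private void QueueUserWorkItem(QTask qTask)
    {
        Action completionTask = () =>
        {
            try
            {
                qTask.Task();
            }
            finally
            {
                // Signal completion even if the task throws, so the queue does not stall.
                OnTaskCompleted();
            }
        };

        _runningTaskCount++;

        ThreadPool.QueueUserWorkItem(_ => completionTask());
    }

    private void OnTaskCompleted()
    {
        lock (_syncObj)
        {
            if (--_runningTaskCount == 0)
            {
                _serialTaskRunning = false;
                ProcessTaskQueue();
            }
        }
    }

    private class QTask
    {
        public Action Task { get; set; }
        public bool IsParallel { get; set; }
    }
}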

Update

To handle groups that mix serial and parallel tasks, a GroupedTaskQueue can manage one TaskQueue per group. Again, you do not need to know about the groups up front: a group's queue is created when its first task arrives and removed once the group has no tasks waiting to start.

internal class GroupedTaskQueue
{
    private readonly object _syncObj = new object();
    private readonly Dictionary<string, TaskQueue> _queues = new Dictionary<string, TaskQueue>();
    private readonly string _defaultGroup = Guid.NewGuid().ToString();

    public void Queue(bool isParallel, Action task)
    {
        Queue(_defaultGroup, isParallel, task);
    }

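    public void Queue(string group, bool isParallel, Action task)
    {
        lock (_syncObj)
        {
            TaskQueue queue;

            if (!_queues.TryGetValue(group, out queue))
            {
                queue = new TaskQueue();

                _queues.Add(group, queue);
            }

            Action completionTask = () =>
            {
                try
                {
                    task();
                }
                finally
                {
                    OnTaskCompleted(group, queue);
                }
            };

            // Enqueue while still holding the lock, so that OnTaskCompleted cannot
            // remove this group's queue between the lookup and the enqueue.
            queue.Queue(isParallel, completionTask);
        }
    }

    private void OnTaskCompleted(string group, TaskQueue queue)
    {
        lock (_syncObj)
        {
            TaskQueue current;

            // Remove the group only if this queue is still the one registered for it
            // and it has no tasks waiting to start. Count does not include running
            // tasks, so if a group contains parallel tasks, a newly queued task may
            // start before an earlier parallel task of that group has finished.
            if (queue.Count == 0 && _queues.TryGetValue(group, out current) && current == queue)
            {
                _queues.Remove(group);
            }
        }
    }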
}

Solution 2

Thread pools are good for cases where the relative order of the tasks doesn't matter, provided they all get done. In particular, it must be OK for them all to be done in parallel.

If your tasks must be done in a specific order, then they are not suitable for parallelism, so a thread pool is not appropriate.

If you want to move these serial tasks off the main thread, then a single background thread with a task queue would be appropriate for those tasks. You can continue to use a thread pool for the remaining tasks which are suitable for parallelism.

Yes, it means you have to decide where to submit the task depending on whether it is an in-order task or a "may be parallelized" task, but this is not a big deal.
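A minimal Java sketch of this split, using java.util.concurrent: tasks flagged as in-order go to a single-thread executor (one background thread with its own queue), everything else goes to a fixed-size pool. `SplitExecutor` and its methods are hypothetical names for illustration. As described above, this gives no ordering between an in-order task and the parallel tasks around it; it only keeps the in-order tasks in submission order relative to each other.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: in-order tasks go to one background thread,
// everything else goes to a shared fixed-size pool.
class SplitExecutor {
    // A single thread with its own FIFO queue, so these tasks run
    // one at a time, in submission order.
    private final ExecutorService serial = Executors.newSingleThreadExecutor();
    // No ordering guarantees among tasks submitted here.
    private final ExecutorService parallel;

    SplitExecutor(int parallelThreads) {
        parallel = Executors.newFixedThreadPool(parallelThreads);
    }

    // The caller decides, per task, which executor it belongs on.
    Future<?> submit(boolean inOrder, Runnable task) {
        return inOrder ? serial.submit(task) : parallel.submit(task);
    }

    // Stops accepting new tasks and waits for the queued ones to finish.
    boolean shutdownAndAwait(long timeout, TimeUnit unit) throws InterruptedException {
        serial.shutdown();
        parallel.shutdown();
        return serial.awaitTermination(timeout, unit)
                && parallel.awaitTermination(timeout, unit);
    }
}
```

For example, each chunk of the file that must be processed in order would be submitted with `submit(true, ...)`, and independent work with `submit(false, ...)`.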

If you have groups whose tasks must be serialized, but which can run in parallel with other tasks, then you have multiple choices:

  1. Create a single task for each group, which does the relevant group tasks in order, and post this task to the thread pool.
  2. Have each task in a group explicitly wait for the previous task in the group, and post them to the thread pool. This requires that your thread pool can handle the case where a thread is waiting for a not-yet-scheduled task without deadlocking.
  3. Have a dedicated thread for each group, and post group tasks on the appropriate message queue.
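As an illustration, here is a Java sketch of a non-blocking variation on option 2: instead of having a task block a pool thread while it waits for its predecessor, each new task in a group is chained onto the previous one with `CompletableFuture`, so it is only scheduled once that predecessor has finished. This sidesteps the deadlock concern mentioned for option 2. `GroupChainer` is a hypothetical name, and groups are assumed to be identified by a string.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical helper: tasks in the same group run one after another on a
// shared pool; tasks in different groups may run in parallel.
class GroupChainer {
    private final Executor pool;
    // Last task queued for each group; guarded by "this".
    private final Map<String, CompletableFuture<Void>> tails = new HashMap<>();

    GroupChainer(Executor pool) {
        this.pool = pool;
    }

    synchronized CompletableFuture<Void> submit(String group, Runnable task) {
        CompletableFuture<Void> tail = tails.get(group);
        CompletableFuture<Void> next = (tail == null)
                ? CompletableFuture.runAsync(task, pool)
                // Start only after the previous task of the group has finished;
                // handle(...) keeps the chain going even if that task failed.
                : tail.handle((ok, err) -> null).thenRunAsync(task, pool);
        tails.put(group, next);
        // Forget the group once its last queued task is done, so the map
        // does not grow without bound.
        next.whenComplete((ok, err) -> {
            synchronized (this) {
                tails.remove(group, next);
            }
        });
        return next;
    }
}
```

Groups do not need to be known up front: the first task of a new group simply starts a new chain.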

Solution 3

Basically, there are a number of pending tasks. Some of the tasks can only be performed when one or more other pending tasks have finished executing.

The pending tasks can be modeled in a dependency graph:

  • "task 1 -> task 2" means "task 2 can be executed only after task 1 has finished". The arrows point in the direction of execution order.
  • The indegree of a task (the number of tasks pointing to it) determines whether the task is ready for execution: if the indegree is 0, it can be executed.
  • Sometimes a task must wait for multiple tasks to finish; its indegree is then greater than 1.
  • Once a task no longer has to wait for other tasks (its indegree is zero), it can be submitted to the thread pool, i.e. to the queue of tasks waiting to be picked up by a worker thread. The submitted task cannot cause a deadlock, because it isn't waiting for anything. As an optimization, you can use a priority queue, e.g. one that runs first the tasks on which the most other tasks in the dependency graph depend. This cannot provoke deadlock either, because every task in the thread pool can be executed. It can, however, provoke starvation.
  • When a task finishes execution, it can be removed from the dependency graph, possibly reducing the indegree of other tasks, which can then in turn be submitted to the pool of worker threads.
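A minimal Java sketch of the indegree bookkeeping described above. Assumptions not taken from the text: tasks are identified by strings, a new task may only depend on tasks that have already been added (so no cycle can form), a plain lock stands in for the dedicated bookkeeping thread, and the priority-queue optimization is omitted. `DependencyScheduler` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;

// Hypothetical scheduler: a task is handed to the pool only once its
// indegree (number of unfinished prerequisites) has dropped to zero.
class DependencyScheduler {
    private final ExecutorService pool;
    private final Map<String, Runnable> bodies = new HashMap<>();        // waiting or running
    private final Map<String, Integer> indegree = new HashMap<>();
    private final Map<String, List<String>> successors = new HashMap<>();
    private final Set<String> finished = new HashSet<>();                // never pruned in this sketch

    DependencyScheduler(ExecutorService pool) {
        this.pool = pool;
    }

    // Adds a task that must run after every task in deps. Each dependency must
    // already have been added (or finished), so edges always point to new tasks.
    synchronized void add(String id, Runnable body, String... deps) {
        if (bodies.containsKey(id) || finished.contains(id))
            throw new IllegalArgumentException("duplicate task: " + id);
        for (String dep : deps) {                       // validate before changing any state
            if (!finished.contains(dep) && !bodies.containsKey(dep))
                throw new IllegalArgumentException("unknown dependency: " + dep);
        }
        int pending = 0;
        for (String dep : deps) {
            if (finished.contains(dep)) continue;       // edge already satisfied
            successors.computeIfAbsent(dep, k -> new ArrayList<>()).add(id);
            pending++;
        }
        bodies.put(id, body);
        indegree.put(id, pending);
        if (pending == 0) dispatch(id);
    }

    // Caller must hold the lock.
    private void dispatch(String id) {
        Runnable body = bodies.get(id);
        pool.execute(() -> {
            try {
                body.run();
            } finally {
                onFinished(id);
            }
        });
    }

    // Removes the task from the graph and releases successors whose indegree hits 0.
    private synchronized void onFinished(String id) {
        bodies.remove(id);
        indegree.remove(id);
        finished.add(id);
        List<String> next = successors.remove(id);
        if (next == null) return;
        for (String s : next) {
            int left = indegree.merge(s, -1, Integer::sum);
            if (left == 0) dispatch(s);
        }
    }
}
```

In the question's terms, each task of a group would simply depend on the previous task of the same group.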

So there is (at least) one thread used to add and remove pending tasks, plus a thread pool of worker threads.

When a task is added to the dependency graph, you must check:

  • how the task is connected in the dependency graph: which tasks must it wait for, and which tasks must wait for it? Draw connections from and to the new task accordingly.
  • once the connections are drawn: did the new connections create any cycles in the dependency graph? If so, there is a deadlock: the tasks on the cycle would wait for each other forever.
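When a new task can also introduce edges towards existing tasks, the cycle check can be done per edge: adding `from -> to` creates a cycle exactly when `from` is already reachable from `to`. A small Java sketch of that check using a depth-first search; `DependencyGraph` and its methods are hypothetical names for illustration.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper. Edges point in execution order:
// addEdge(a, b) means "b may only run after a has finished".
class DependencyGraph {
    private final Map<String, Set<String>> successors = new HashMap<>();

    // Adding from -> to closes a cycle exactly when "from" is already
    // reachable from "to" (including the self-loop case from == to).
    boolean wouldCreateCycle(String from, String to) {
        Deque<String> stack = new ArrayDeque<>();
        Set<String> visited = new HashSet<>();
        stack.push(to);
        while (!stack.isEmpty()) {
            String node = stack.pop();
            if (node.equals(from)) return true;
            if (visited.add(node)) {
                for (String next : successors.getOrDefault(node, Collections.emptySet())) {
                    stack.push(next);
                }
            }
        }
        return false;
    }

    // Refuses an edge that would make some tasks wait for each other forever.
    void addEdge(String from, String to) {
        if (wouldCreateCycle(from, to)) {
            throw new IllegalStateException("cycle (deadlock): " + from + " -> " + to);
        }
        successors.computeIfAbsent(from, k -> new HashSet<>()).add(to);
    }
}
```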

Performance:

  • this pattern is slower than sequential execution if parallel execution is in fact rarely possible, because you need extra administration to do everything almost sequentially anyway.
  • this pattern is fast if many tasks can be performed simultaneously in practice.

Assumptions:

As you may have read between the lines, you must design the tasks so that they don't interfere with each other. There must also be a way to determine the priority of the tasks, and that priority should take into account the data each task handles: two tasks must not modify the same object simultaneously, so either one of them must get priority over the other, or the operations performed on the object must be thread-safe.

Solution 4

To do what you want to do with a threadpool, you might have to create some kind of scheduler.

Something like that:

TaskQueue -> Scheduler -> Queue -> ThreadPool

The Scheduler runs in its own thread, keeping track of dependencies between jobs. When a job is ready to be run, the scheduler just pushes it onto the queue for the thread pool.

The ThreadPool has to signal the Scheduler when a job is done, so that the scheduler can put the jobs that depend on that job into the Queue.

In your case, the dependencies could probably be stored in a linked list.

Let's say you have the following dependencies: 3 -> 4 -> 6 -> 8

Job 3 is running on the thread pool; you still have no idea that job 8 exists.

Job 3 ends. You remove 3 from the linked list and put job 4 on the queue to the thread pool.

Job 8 arrives. You put it at the end of the linked list.

The only constructs that have to be fully synchronized are the Queues before and after the scheduler.
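A Java sketch of this pipeline, under a few assumptions: each job carries a chain key (jobs with the same key form one linked list, like 3 -> 4 -> 6 -> 8 above), a `LinkedBlockingQueue` plays the role of the synchronized queue into the scheduler, and an `ExecutorService` plays the role of the Queue -> ThreadPool stage. Pool workers signal completion by posting an event back to the scheduler, so the per-chain lists are touched only by the scheduler thread and need no locking. `ChainScheduler` is a hypothetical name, and the sketch has no shutdown handling.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical scheduler: one thread owns all bookkeeping; producers and pool
// workers talk to it only through the synchronized "events" queue.
class ChainScheduler {
    private static final class Event {
        final String chain;
        final Runnable job; // null means "the running job of this chain finished"
        Event(String chain, Runnable job) { this.chain = chain; this.job = job; }
    }

    private final BlockingQueue<Event> events = new LinkedBlockingQueue<>();
    private final Map<String, ArrayDeque<Runnable>> chains = new HashMap<>(); // scheduler thread only
    private final ExecutorService pool;
    private final Thread thread = new Thread(this::loop, "scheduler");

    ChainScheduler(ExecutorService pool) {
        this.pool = pool;
        thread.setDaemon(true);
        thread.start();
    }

    // Called by producers: a new job arrives for the given chain.
    void submit(String chain, Runnable job) {
        events.add(new Event(chain, job));
    }

    private void loop() {
        try {
            while (true) {
                Event e = events.take();
                if (e.job != null) {
                    ArrayDeque<Runnable> list = chains.computeIfAbsent(e.chain, k -> new ArrayDeque<>());
                    list.addLast(e.job);                      // append to the end of the chain
                    if (list.size() == 1) start(e.chain, e.job); // chain was idle: run now
                } else {
                    ArrayDeque<Runnable> list = chains.get(e.chain);
                    list.removeFirst();                       // drop the finished job
                    if (list.isEmpty()) chains.remove(e.chain);
                    else start(e.chain, list.peekFirst());    // next job in the chain
                }
            }
        } catch (InterruptedException ignored) {
            // Interrupted: stop scheduling.
        }
    }

    private void start(String chain, Runnable job) {
        pool.execute(() -> {
            try {
                job.run();
            } finally {
                events.add(new Event(chain, null)); // signal the scheduler
            }
        });
    }
}
```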

Solution 5

If I understand the problem correctly, the JDK executors don't have this capability, but it's easy to roll your own. You basically need:

  • a pool of worker threads, each of which has a dedicated queue
  • some abstraction over those queues to which you offer work (c.f. the ExecutorService)
  • some algorithm that deterministically selects a specific queue for each piece of work
  • each piece of work is then offered to the right queue and hence processed in the right order

The difference from the JDK executors is that they have 1 queue with n threads, whereas you want n queues and m threads (where n may or may not equal m).

Edit after reading that each task has a key:

In a bit more detail

  • write some code that transforms a key into an index (an int) in a given range (0 to n-1, where n is the number of threads you want). This could be as simple as Math.floorMod(key.hashCode(), n) (a plain key.hashCode() % n can be negative in Java), or it could be some static mapping of known key values to threads, or whatever you want
  • at startup
    • create n queues, put them in an indexed structure (array, list whatever)
    • start n threads, each thread just does a blocking take from the queue
    • when it receives some work, it knows how to execute work specific to that task/event (you can obviously have some mapping of tasks to actions if you have heterogeneous events)
  • store this behind some facade that accepts the work items
  • when a task arrives, hand it to the facade
    • the facade finds the right queue for the task based on the key, offers it to that queue
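A minimal Java sketch of the steps above, assuming one thread per queue (n == m) and using `Executors.newSingleThreadExecutor()` as a ready-made "thread plus dedicated queue". `KeyedExecutor` is a hypothetical name; the key-to-index mapping uses `Math.floorMod` on `hashCode()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical facade over n single-threaded lanes, each with its own queue.
// Tasks with equal keys land in the same lane and run in submission order;
// tasks with different keys may run in parallel.
class KeyedExecutor {
    private final List<ExecutorService> lanes = new ArrayList<>();

    KeyedExecutor(int n) {
        for (int i = 0; i < n; i++) lanes.add(Executors.newSingleThreadExecutor());
    }

    // Deterministic key -> index; floorMod keeps it in [0, n) even for negative hash codes.
    int laneFor(Object key) {
        return Math.floorMod(key.hashCode(), lanes.size());
    }

    Future<?> submit(Object key, Runnable task) {
        return lanes.get(laneFor(key)).submit(task);
    }

    void shutdown() {
        for (ExecutorService lane : lanes) lane.shutdown();
    }
}
```

With the question's groups as keys, every task of a group goes to the same lane. Two groups that hash to the same lane are serialized with respect to each other, which is safe but costs some parallelism.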

It's easy enough to add auto-restarting worker threads to this scheme. You then need each worker thread to register with some manager to state "I own this queue", plus some housekeeping around that and detection of errors in the thread (an error unregisters the thread's ownership of that queue, returning the queue to a free pool of queues, which triggers starting a new thread).

Author: nc3b

Updated on July 09, 2022

Comments

  • nc3b

    I have been reading about the thread-pool pattern and I can't seem to find the usual solution for the following problem.

    I sometimes want tasks to be executed serially. For example, I read chunks of text from a file and for some reason I need the chunks to be processed in that order. So basically I want to eliminate concurrency for some of the tasks.

    Consider this scenario where the tasks with * need to be processed in the order they were pushed in. The other tasks can be processed in any order.

    push task1
    push task2
    push task3   *
    push task4   *
    push task5
    push task6   *
    ....
    and so on
    

    In the context of a thread-pool, without this constraint, a single queue of pending tasks works fine but clearly here it doesn't.

    I thought about having some of the threads operate on a thread-specific queue and the others on the "global" queue. Then, in order to execute some of the tasks serially, I simply have to push them onto a queue that only a single thread looks at. It does sound a bit clumsy.

    So, the real question in this long story: how would you solve this? How would you ensure those tasks are ordered?

    EDIT

    As a more general problem, suppose the scenario above becomes

    push task1
    push task2   **
    push task3   *
    push task4   *
    push task5
    push task6   *
    push task7   **
    push task8   *
    push task9
    ....
    and so on
    

    What I mean is that the tasks within a group should be executed sequentially, but the groups themselves can mix. So you can have 3-2-5-4-7 for example.

    One other thing to note is that I don't have access to all the tasks in a group upfront (and I can't wait for all of them to arrive before starting the group).