Best way in .NET to manage queue of tasks on a separate (single) thread


Solution 1

To create an asynchronous queue of work with a single degree of parallelism, you can simply create a SemaphoreSlim initialized to one, and have the enqueuing method await the acquisition of that semaphore before starting the requested work.

public class TaskQueue
{
    // Initialized to 1: at most one enqueued work item runs at a time.
    private readonly SemaphoreSlim semaphore = new SemaphoreSlim(1);

    public async Task<T> Enqueue<T>(Func<Task<T>> taskGenerator)
    {
        await semaphore.WaitAsync();
        try
        {
            return await taskGenerator();
        }
        finally
        {
            semaphore.Release();
        }
    }

    public async Task Enqueue(Func<Task> taskGenerator)
    {
        await semaphore.WaitAsync();
        try
        {
            await taskGenerator();
        }
        finally
        {
            semaphore.Release();
        }
    }
}

Of course, to allow a fixed degree of parallelism other than one, simply initialize the semaphore to some other number.
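A minimal usage sketch, assuming the TaskQueue class above (the delays are just stand-ins for real asynchronous work):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

var queue = new TaskQueue();

var first = queue.Enqueue(async () =>
{
    await Task.Delay(100); // simulate slower I/O
    return 1;
});
var second = queue.Enqueue(async () =>
{
    await Task.Delay(10);  // shorter work, but queued second
    return 2;
});

// Even though the second item's work is shorter, it cannot start
// until the first item releases the semaphore, so the items still
// execute one after another.
var results = await Task.WhenAll(first, second);
```

For a fixed parallelism of N, construct the field as `new SemaphoreSlim(N)` instead of `new SemaphoreSlim(1)`.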

Solution 2

Your best option as I see it is using TPL Dataflow's ActionBlock:

var actionBlock = new ActionBlock<string>(address =>
{
    if (!IsDuplicate(address))
    {
        LocateAddress(address);
    }
});

actionBlock.Post(context.Request.UserHostAddress);

TPL Dataflow is a robust, thread-safe, async-ready, and highly configurable actor-based framework (available as the System.Threading.Tasks.Dataflow NuGet package).

Here's a simple example for a more complicated case. Let's assume you want to:

  • Enable concurrency (limited to the available cores).
  • Limit the queue size (so you won't run out of memory).
  • Have both LocateAddress and the queue insertion be async.
  • Cancel everything after an hour.

var actionBlock = new ActionBlock<string>(async address =>
{
    if (!IsDuplicate(address))
    {
        await LocateAddressAsync(address);
    }
}, new ExecutionDataflowBlockOptions
{
    BoundedCapacity = 10000,
    MaxDegreeOfParallelism = Environment.ProcessorCount,
    CancellationToken = new CancellationTokenSource(TimeSpan.FromHours(1)).Token
});

await actionBlock.SendAsync(context.Request.UserHostAddress);
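One detail worth adding: when the producer is done (for example at application shutdown), the block should be drained rather than abandoned. A short sketch, assuming the `actionBlock` configured above:

```csharp
// Signal that no more addresses will be posted, then wait for the
// already-queued items to finish processing (or for the one-hour
// CancellationToken configured above to fire first).
actionBlock.Complete();
try
{
    await actionBlock.Completion;
}
catch (OperationCanceledException)
{
    // The cancellation token fired before the queue drained.
}
```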

Solution 3

Actually you don't need to run the tasks on one thread; you need them to run serially (one after another) in FIFO order. TPL doesn't have a class for that, but here is my very lightweight, non-blocking implementation, with tests: https://github.com/Gentlee/SerialQueue

@Servy's implementation is also benchmarked there; my tests show it is about twice as slow as mine, and it doesn't guarantee FIFO ordering.

Example:

private readonly SerialQueue queue = new SerialQueue();

async Task SomeAsyncMethod()
{
    var result = await queue.Enqueue(DoSomething);
}

Solution 4

Use BlockingCollection<Action> to create a producer/consumer pattern with one consumer (only one thing running at a time like you want) and one or many producers.

First define a shared queue somewhere:

BlockingCollection<Action> queue = new BlockingCollection<Action>();

In your consumer Thread or Task you take from it:

// This will block until there's an item available
Action itemToRun = queue.Take();

Then from any number of producers on other threads, simply add to the queue:

queue.Add(() => LocateAddress(context.Request.UserHostAddress));
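Putting the pieces together, here is a minimal self-contained sketch of both sides, using GetConsumingEnumerable (which combines the blocking Take with the shutdown check):

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

var queue = new BlockingCollection<Action>();

// Single consumer: actions run one at a time, in FIFO order.
var consumer = Task.Run(() =>
{
    // GetConsumingEnumerable blocks while the queue is empty and
    // ends once CompleteAdding() has been called and the queue drains.
    foreach (var action in queue.GetConsumingEnumerable())
    {
        action();
    }
});

// Producers, from any thread:
queue.Add(() => Console.WriteLine("work item 1"));
queue.Add(() => Console.WriteLine("work item 2"));

// At shutdown: stop accepting work and wait for the consumer to drain.
queue.CompleteAdding();
await consumer;
```

To scale to N concurrent workers later, start N copies of the consumer task against the same collection.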

Solution 5

I'm posting a different solution here. To be honest, I'm not sure whether it's a good one.

I used to use BlockingCollection to implement the producer/consumer pattern, with a dedicated thread consuming the items. That works fine when data keeps coming in and the consumer thread doesn't sit there doing nothing.

I ran into a scenario where one of my applications needed to send emails on a different thread, but the total number of emails was not that big. My initial solution was a dedicated consumer thread (created by Task.Run()), but much of the time it just sat there doing nothing.

Old solution:

private readonly BlockingCollection<EmailData> _Emails =
    new BlockingCollection<EmailData>(new ConcurrentQueue<EmailData>());

// producer can add data here
public void Add(EmailData emailData)
{
    _Emails.Add(emailData);
}

public void Run()
{
    // create a consumer thread
    Task.Run(() => 
    {
        foreach (var emailData in _Emails.GetConsumingEnumerable())
        {
            SendEmail(emailData);
        }
    });
}

// sending email implementation
private void SendEmail(EmailData emailData)
{
    throw new NotImplementedException();
}

As you can see, if there are not enough emails to send (and that is my case), the consumer thread spends most of its time sitting idle.

I changed my implementation to:

// start with a completed task so the first ContinueWith runs immediately
private Task _SendEmailTask = Task.CompletedTask;
private readonly object _lock = new object();

// the caller dispatches the email to here; ContinueWith schedules the
// send on a thread pool thread, chained after the previous send
private void Add(EmailData emailData)
{
    // the read-modify-write of _SendEmailTask must be synchronized
    // if Add can be called from multiple threads
    lock (_lock)
    {
        _SendEmailTask = _SendEmailTask.ContinueWith(t =>
        {
            SendEmail(emailData);
        });
    }
}

// actual implementation
private void SendEmail(EmailData emailData)
{
    throw new NotImplementedException();
}

It's no longer a producer/consumer pattern, but there is no thread sitting idle; instead, each time an email needs to be sent, a thread-pool thread does the work.


Updated on October 26, 2020

Comments

  • Josh
    Josh over 3 years

    I know that asynchronous programming has seen a lot of changes over the years. I'm somewhat embarrassed that I let myself get this rusty at just 34 years old, but I'm counting on StackOverflow to bring me up to speed.

    What I am trying to do is manage a queue of "work" on a separate thread, but in such a way that only one item is processed at a time. I want to post work on this thread and it doesn't need to pass anything back to the caller. Of course I could simply spin up a new Thread object and have it loop over a shared Queue object, using sleeps, interrupts, wait handles, etc. But I know things have gotten better since then. We have BlockingCollection, Task, async/await, not to mention NuGet packages that probably abstract a lot of that.

    I know that "What's the best..." questions are generally frowned upon so I'll rephrase it by saying "What is the currently recommended..." way to accomplish something like this using built-in .NET mechanisms preferably. But if a third party NuGet package simplifies things a bunch, it's just as well.

    I considered a TaskScheduler instance with a fixed maximum concurrency of 1, but seems there is probably a much less clunky way to do that by now.

    Background

    Specifically, what I am trying to do in this case is queue an IP geolocation task during a web request. The same IP might wind up getting queued for geolocation multiple times, but the task will know how to detect that and skip out early if it's already been resolved. The request handler is just going to throw these () => LocateAddress(context.Request.UserHostAddress) calls into a queue and let the LocateAddress method handle duplicate work detection. The geolocation API I am using doesn't like to be bombarded with requests, which is why I want to limit the work to a single concurrent task. However, it would be nice if the approach could easily scale to more concurrent tasks with a simple parameter change.