Process queue with multithreading or tasks

Solution 1

If you can use .NET 4.5, I'd suggest looking at Dataflow from the Task Parallel Library (TPL).

The Dataflow documentation links to many example walkthroughs, such as How to: Implement a Producer-Consumer Dataflow Pattern and Walkthrough: Using Dataflow in a Windows Forms Application.

Have a look at that documentation to see if it would help you. It's quite a lot to take in, but I think it would probably be your best approach.
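As a rough idea of what the Dataflow route can look like, here is a minimal sketch using an ActionBlock with a capped degree of parallelism. The class name DataflowSketch, the method ProcessAll, and the item counts are made up for illustration, and Thread.Sleep stands in for real work. On .NET Framework the System.Threading.Tasks.Dataflow assembly comes from a NuGet package.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

public static class DataflowSketch
{
    // Hypothetical helper: posts itemCount ints to an ActionBlock that runs
    // at most maxParallel handlers at once, and returns how many were handled.
    public static int ProcessAll(int itemCount, int maxParallel)
    {
        int processed = 0;

        var block = new ActionBlock<int>(item =>
        {
            Thread.Sleep(10);                       // Simulate work.
            Interlocked.Increment(ref processed);   // Thread-safe counter update.
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxParallel });

        for (int i = 0; i < itemCount; ++i)
            block.Post(i);          // Queue a work item.

        block.Complete();           // Signal that no more items will be posted.
        block.Completion.Wait();    // Wait until every queued item has been handled.
        return processed;
    }

    public static void Main()
    {
        Console.WriteLine("Processed {0} items.", ProcessAll(20, 4));
    }
}
```

The block owns its own input queue, so there is no explicit worker loop to write; whether that is simpler than the BlockingCollection approach depends on how much of the rest of Dataflow you end up using.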

Alternatively, you could look into using a BlockingCollection along with its GetConsumingEnumerable() method to access items in the queue.

The idea is to split the work into objects that you want to process, and use a BlockingCollection to manage the queue.

The sample code below uses ints rather than objects as the work items to demonstrate this.

When a worker thread has finished with its current item, it removes a new item from the work queue, processes that item, then adds it to the output queue.

A separate consumer thread removes completed items from the output queue and does something with them.

At the end we must wait for all the workers to finish (Task.WaitAll(workers)) before we can mark the output queue as completed (outputQueue.CompleteAdding()).

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    class Program
    {
        static void Main(string[] args)
        {
            new Program().run();
        }

        void run()
        {
            int threadCount = 4;
            Task[] workers = new Task[threadCount];

            Task.Factory.StartNew(consumer);

            for (int i = 0; i < threadCount; ++i)
            {
                int workerId = i;
                Task task = new Task(() => worker(workerId));
                workers[i] = task;
                task.Start();
            }

            for (int i = 0; i < 100; ++i)
            {
                Console.WriteLine("Queueing work item {0}", i);
                inputQueue.Add(i);
                Thread.Sleep(50);
            }

            Console.WriteLine("Stopping adding.");
            inputQueue.CompleteAdding();
            Task.WaitAll(workers);
            outputQueue.CompleteAdding();
            Console.WriteLine("Done.");

            Console.ReadLine();
        }

        void worker(int workerId)
        {
            Console.WriteLine("Worker {0} is starting.", workerId);

            foreach (var workItem in inputQueue.GetConsumingEnumerable())
            {
                Console.WriteLine("Worker {0} is processing item {1}", workerId, workItem);
                Thread.Sleep(100);          // Simulate work.
                outputQueue.Add(workItem);  // Output completed item.
            }

            Console.WriteLine("Worker {0} is stopping.", workerId);
        }

        void consumer()
        {
            Console.WriteLine("Consumer is starting.");

            foreach (var workItem in outputQueue.GetConsumingEnumerable())
            {
                Console.WriteLine("Consumer is using item {0}", workItem);
                Thread.Sleep(25);
            }

            Console.WriteLine("Consumer is finished.");
        }

        BlockingCollection<int> inputQueue = new BlockingCollection<int>();
        BlockingCollection<int> outputQueue = new BlockingCollection<int>();
    }
}
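To apply the same pattern to the question's messages, the queue's element type can be changed from int to the message class. The following is only a sketch under assumptions: Message is trimmed to two of the question's properties, RunScript is a stand-in that merely sets Acknowledge, and the names MessageWorkers and Run are invented for the example.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class Message
{
    public Guid MessageID { get; set; }
    public bool Acknowledge { get; set; }
}

public static class MessageWorkers
{
    // Stand-in for the question's RunScript; it only marks the message as handled.
    static void RunScript(Message item)
    {
        Thread.Sleep(10);           // Simulate the call.
        item.Acknowledge = true;
    }

    // Queues messageCount messages, drains them with workerCount tasks,
    // and returns how many messages ended up acknowledged.
    public static int Run(int messageCount, int workerCount)
    {
        var queue = new BlockingCollection<Message>();
        var all = new Message[messageCount];

        Task[] workers = Enumerable.Range(0, workerCount)
            .Select(_ => Task.Factory.StartNew(() =>
            {
                // Blocks while the queue is empty; ends after CompleteAdding().
                foreach (Message m in queue.GetConsumingEnumerable())
                    RunScript(m);
            }, TaskCreationOptions.LongRunning))
            .ToArray();

        for (int i = 0; i < messageCount; ++i)
        {
            all[i] = new Message { MessageID = Guid.NewGuid() };
            queue.Add(all[i]);
        }

        queue.CompleteAdding();     // Lets the workers' foreach loops finish.
        Task.WaitAll(workers);
        return all.Count(m => m.Acknowledge);
    }

    public static void Main()
    {
        Console.WriteLine("Acknowledged {0} messages.", Run(20, 4));
    }
}
```

By default BlockingCollection wraps a ConcurrentQueue, so items are handed to workers in first in, first out order, although with several workers the processing of neighbouring items can still overlap.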

Solution 2

Use Parallel.ForEach from the TPL. It is a parallel version of foreach that runs the loop body for several items concurrently.

Sample (MessageWorkItem changed to a generic Queue<Message>):

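using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class MessageQueue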
{
    public Queue<Message> MessageWorkItem { get; set; }

    public MessageQueue()
    {
        MessageWorkItem = new Queue<Message>();
    }

    public Message GetMessageMetaData()
    {
        try
        {
            // It is just a test, add only one item into the queue
            return new Message()
            {
                MessageID = Guid.NewGuid(),
                NumberToCall = "1111111111",
                FacilityID = "3333",
                NumberToDial = "2222222222",
                CountryCode = "1",
                Acknowledge = false
            };
        }
        catch (Exception)
        {
            return null;
        }
    }

    public void AddingItemToQueue()
    {
        var message = GetMessageMetaData();
        // GetMessageMetaData() returns null on failure, so guard against that.
        if (message != null && !message.Acknowledge)
        {
            lock (MessageWorkItem)
            {
                MessageWorkItem.Enqueue(message);
            }
        }
    }
}

public class Message
{
    public Guid MessageID { get; set; }
    public string NumberToCall { get; set; }
    public string FacilityID { get; set; }
    public string NumberToDial { get; set; }
    public string CountryCode { get; set; }
    public bool Acknowledge { get; set; }
}

class Program
{
    static void Main(string[] args)
    {
        MessageQueue me = new MessageQueue();
        for (int i = 0; i < 10000; i++)
            me.AddingItemToQueue();

        Console.WriteLine(me.MessageWorkItem.Count);

        Parallel.ForEach(me.MessageWorkItem, RunScript);
    }

    static void RunScript(Message item)
    {
        // todo: ...
        Console.WriteLine(item.MessageID);
        Thread.Sleep(300);
    }
}
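If the number of telephone ports limits how many scripts may run at once, ParallelOptions.MaxDegreeOfParallelism can cap the loop. The sketch below is an illustration only: the limit of 4, the int work items, and the names BoundedParallel and Run are assumptions, and the queue is filled completely before the loop starts because Queue<T> cannot be modified while it is being enumerated.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class BoundedParallel
{
    // Runs a simulated script for each item, never more than maxPorts at a time,
    // and returns the highest number of scripts observed running simultaneously.
    public static int Run(Queue<int> items, int maxPorts)
    {
        int running = 0, peak = 0;
        object gate = new object();
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxPorts };

        Parallel.ForEach(items, options, item =>
        {
            lock (gate) { running++; if (running > peak) peak = running; }
            Thread.Sleep(20);       // Simulate the call.
            lock (gate) { running--; }
        });

        return peak;
    }

    public static void Main()
    {
        var items = new Queue<int>();
        for (int i = 0; i < 40; ++i)
            items.Enqueue(i);       // Fill the queue before the loop starts.

        int peak = Run(items, 4);
        Console.WriteLine("Peak concurrency within limit: {0}", peak <= 4);
    }
}
```

Parallel.ForEach does not promise to start items in queue order, so this fits best when strict first in, first out handling is not required.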
Author by

Admin

Updated on August 02, 2022

Comments

  • Admin
    Admin over 1 year

    I have a telephony message application in which there are many messages to be processed. Because telephone ports are limited, the messages will be processed first in, first out. Each message has a flag 'Acknowledge' that indicates whether it has been processed. It is initialized to false, of course.

    I want to put all messages into a queue then process them with multiple threads or tasks.

        public class MessageQueue
        {
            public Queue MessageWorkItem { get; set; }
            public Messages Message { get; set; }
            public MessageQueue()
            {
                MessageWorkItem = new Queue();
                Message = new Messages();
            }
            public void GetMessageMetaData()
            {
                try
                {
                    // It is just a test, add only one item into the queue
                    Message.MessageID = Guid.NewGuid();
                    Message.NumberToCall = "1111111111";
                    Message.FacilityID = "3333";
                    Message.NumberToDial = "2222222222";
                    Message.CountryCode = "1";
                    Message.Acknowledge = false;
                }
                catch (Exception ex)
                {
                }
            }
    
            public void AddingItemToQueue()
            {
                GetMessageMetaData();
                if (!Message.Acknowledge)
                {
                    lock (MessageWorkItem)
                    {
                        MessageWorkItem.Enqueue(Message);
                    }
                }
            }
        }
    
        public class Messages
        {
            public Guid MessageID { get; set; }
            public string NumberToCall { get; set; }
            public string FacilityID { get; set; }
            public string NumberToDial { get; set; }
            public string CountryCode { get; set; }
            public bool Acknowledge { get; set; }
        }
    

    Now my question is how to dequeue items from the queue with multithreading. For each item in the queue, I want to run a script.

            public void RunScript(Message item)
            {
                try
                {
                    PlayMessage(item); 
                    return;
                }
                catch (HangupException hex)
                {
                    Log.WriteWithId("Caller Hungup!", hex.Message);
                }
                catch (Exception ex)
                {
                    Log.WriteException(ex, "Unexpected exception: {0}");
                }
            }
    

    What I thought of was checking if(MessageWorkItem.Count >= 1) and then doing something, but I need help with the code.

  • oleksii
    oleksii about 10 years
    This won't work if consumer keeps working and adding stuff into the queue
  • Servy
    Servy about 10 years
    You're also adding items to a queue while iterating over it, which will throw an exception, and also be subject to all sorts of race conditions due to the fact that it's even being done from different threads. Queue simply isn't built to be accessed from multiple threads simultaneously.
  • Admin
    Admin about 10 years
    @@Mattew, I can use .net 4.5.
  • Matthew Watson
    Matthew Watson about 10 years
    @Love Ah, you had the .Net 4.0 tag on your question (someone has fixed that for you already, I notice! :)
  • Scott Chamberlain
    Scott Chamberlain about 10 years
    @Servy You are 100% correct about the "adding items to a queue while iterating over it" issue, but there is one clarification I want to make on your second statement. Parallel.ForEach will not multithread accesses to the IEnumerable<T> using the default partitioner; it will pull out a bunch of items into an internal queue using a single thread, then hand out that work to several worker threads. But again I stress that Servy is correct that reading from that single thread and writing using one or more other threads is not doable with Queue's enumerator.
  • Matthew Watson
    Matthew Watson about 10 years
    @Love I edited my answer slightly in the light of the above information.
  • Admin
    Admin about 10 years
    --@Mattew, for my particular case, where should I place my RunScript()? I guess in void worker()?
  • Matthew Watson
    Matthew Watson about 10 years
    @Love Yep, that would be the appropriate place. The payload would be workItem, which you would make whatever type you need to hold the information that you want to process as a work unit - probably an instance of Message in your case? In which case you'd use BlockingCollection<Message>
  • Admin
    Admin about 10 years
    --@Mattew, that is a great help. The other thing I am struggling with is that the count in the queue changes dynamically instead of being a static amount. How do I create such a queue? Maybe I should post a new question?
  • Matthew Watson
    Matthew Watson about 10 years
    @Love I'm not sure what you mean there - the number of items in the queue in my sample code is dynamically changed as items are added or removed. The number of tasks (threads) processing items in my example is fixed at 4, but you can change that number of course. The loop adds exactly 100 items, but again you can change that in your code.