How to work threading with ConcurrentQueue<T>


Solution 1

From my understanding of the problem, you are missing a few things.

The concurrent queue is a data structure designed to accept multiple threads reading and writing to the queue without you needing to explicitly lock the data structure. (All that jazz is taken care of behind the scenes, or the collection is implemented in such a way that it doesn't need to take a lock.)

With that in mind, it looks like the pattern you are trying to use is "Producer/Consumer". First, you have some tasks producing work (and adding items to the queue). And second, you have a second task consuming things from the queue (dequeuing items).

So really you want two threads: one adding items and a second removing items. Because you are using a concurrent collection, you can have multiple threads adding items and multiple threads removing items. But obviously the more contention you have on the concurrent queue the quicker that will become the bottleneck.
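A minimal sketch of that producer/consumer shape might look like the following (the class name, the `int` payload, and the item count are all illustrative, not from the question):

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public class ProducerConsumerSketch
{
    public static int Run()
    {
        var queue = new ConcurrentQueue<int>();
        bool done = false;

        // Producer: enqueues ten work items, then signals completion.
        var producer = Task.Run(() =>
        {
            for (int i = 0; i < 10; i++)
                queue.Enqueue(i);
            Volatile.Write(ref done, true);
        });

        // Consumer: dequeues until the producer is finished and the queue drains.
        var consumer = Task.Run(() =>
        {
            int sum = 0;
            while (!Volatile.Read(ref done) || !queue.IsEmpty)
            {
                if (queue.TryDequeue(out int item))
                    sum += item;
                else
                    Thread.Yield();   // back off instead of spinning hard
            }
            return sum;
        });

        Task.WaitAll(producer, consumer);
        return consumer.Result;
    }

    public static void Main() => Console.WriteLine(Run());   // prints 45
}
```

Note the consumer never touches the queue without `TryDequeue`; with a concurrent collection, "check `Count`, then `Dequeue`" is a race, while "try to dequeue and see if it worked" is not.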

Solution 2

I think ConcurrentQueue is useful only in very few cases. Its main advantage is that it is lock free. However, usually the producer thread(s) have to inform the consumer thread(s) somehow that there is data available to process. This signalling between threads needs locks and negates the benefit of using ConcurrentQueue. The fastest way to synchronize threads is using Monitor.Pulse(), which works only within a lock. All other synchronization tools are even slower.

Of course, the consumer can just continuously check whether there is something in the queue, which works without locks but is a huge waste of processor resources. Waiting a little between checks is slightly better, but still trades latency for CPU.
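A blocking queue built from a plain Queue&lt;T&gt;, a lock, and Monitor.Wait()/Monitor.Pulse(), as described above, lets the consumer sleep until data arrives instead of polling. A minimal sketch (all names are illustrative):

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// A minimal blocking queue built on Monitor.Wait/Pulse.
public class BlockingQueue<T>
{
    private readonly Queue<T> _items = new Queue<T>();
    private readonly object _gate = new object();

    public void Enqueue(T item)
    {
        lock (_gate)
        {
            _items.Enqueue(item);
            Monitor.Pulse(_gate);    // wake one waiting consumer
        }
    }

    public T Dequeue()
    {
        lock (_gate)
        {
            // The loop guards against spurious wakeups; Wait releases the
            // lock while blocked and reacquires it before returning.
            while (_items.Count == 0)
                Monitor.Wait(_gate);
            return _items.Dequeue();
        }
    }
}
```

A consumer that calls `Dequeue()` blocks without burning CPU until a producer's `Enqueue()` pulses it awake.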

Raising an event on every write to the queue is a very bad idea. Using ConcurrentQueue to save maybe 1 microsecond is completely wasted by executing the event handler, which might take 1000 times longer.

If all the processing is done in an event handler or an async call anyway, the question is why a queue is still needed. Better to pass the data directly to the handler and not use a queue at all.

Please note that the implementation of ConcurrentQueue is rather complicated in order to allow concurrency. In most cases it is better to use a normal Queue<T> and lock every access to the queue. Since queue access takes only microseconds, it is extremely unlikely that two threads will access the queue in the same microsecond, and there will hardly ever be any delay because of locking. Using a normal Queue<T> with locking will often result in faster code execution than ConcurrentQueue.
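For completeness: the framework's own answer to this trade-off is BlockingCollection&lt;T&gt;, which wraps a ConcurrentQueue&lt;T&gt; by default and supplies the blocking and signalling discussed above. A sketch (the `int` payload and names are illustrative):

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public class BlockingCollectionSketch
{
    public static int Run()
    {
        using (var queue = new BlockingCollection<int>())  // backed by ConcurrentQueue<int>
        {
            var consumer = Task.Run(() =>
            {
                int sum = 0;
                // Blocks while the collection is empty; the loop ends once
                // CompleteAdding() has been called and the queue drains.
                foreach (int item in queue.GetConsumingEnumerable())
                    sum += item;
                return sum;
            });

            for (int i = 0; i < 10; i++)
                queue.Add(i);
            queue.CompleteAdding();

            return consumer.Result;
        }
    }

    public static void Main() => Console.WriteLine(Run());   // prints 45
}
```

This gives the lock-free queue plus proper consumer wakeup, with none of the hand-rolled signalling.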

Solution 3

This is the complete solution I came up with:

public class TableTransporter
{
    private static int _indexer;

    private CustomQueue tableQueue = new CustomQueue();
    private Func<DataTable, String> RunPostProcess;
    private string filename;

    public TableTransporter()
    {
        RunPostProcess = new Func<DataTable, String>(SerializeTable);
        tableQueue.TableQueued += new EventHandler<TableQueuedEventArgs>(tableQueue_TableQueued);
    }

    void tableQueue_TableQueued(object sender, TableQueuedEventArgs e)
    {
        //  do something with table
        //  what I can't figure out is how to pass a custom object in the 3rd parameter
        RunPostProcess.BeginInvoke(e.Table, new AsyncCallback(PostComplete), filename);
    }

    public void ExtractData()
    {
        // perform data extraction
        tableQueue.Enqueue(MakeTable());
        Console.WriteLine("Table count [{0}]", tableQueue.Count);
    }

    private DataTable MakeTable()
    { return new DataTable(String.Format("Table{0}", _indexer++)); }

    private string SerializeTable(DataTable Table)
    {
        string file = Table.TableName + ".xml";

        DataSet dataSet = new DataSet(Table.TableName);

        dataSet.Tables.Add(Table);

        Console.WriteLine("[{0}]Writing {1}", Thread.CurrentThread.ManagedThreadId, file);
        string xmlstream = String.Empty;

        using (MemoryStream memstream = new MemoryStream())
        using (XmlTextWriter xmlWriter = new XmlTextWriter(memstream, Encoding.UTF8))
        {
            XmlSerializer xmlSerializer = new XmlSerializer(typeof(DataSet));

            xmlSerializer.Serialize(xmlWriter, dataSet);
            xmlWriter.Flush();
            xmlstream = UTF8ByteArrayToString(memstream.ToArray());

            using (var fileStream = new FileStream(file, FileMode.Create))
            {
                // write the encoded byte count, not the string length
                byte[] bytes = StringToUTF8ByteArray(xmlstream);
                fileStream.Write(bytes, 0, bytes.Length);
            }
        }
        filename = file;

        return file;
    }

    private void PostComplete(IAsyncResult iasResult)
    {
        string file = (string)iasResult.AsyncState;
        Console.WriteLine("[{0}]Completed: {1}", Thread.CurrentThread.ManagedThreadId, file);

        RunPostProcess.EndInvoke(iasResult);
    }

    public static String UTF8ByteArrayToString(Byte[] ArrBytes)
    { return new UTF8Encoding().GetString(ArrBytes); }

    public static Byte[] StringToUTF8ByteArray(String XmlString)
    { return new UTF8Encoding().GetBytes(XmlString); }
}

public sealed class CustomQueue : ConcurrentQueue<DataTable>
{
    public event EventHandler<TableQueuedEventArgs> TableQueued;

    public CustomQueue()
    { }
    public CustomQueue(IEnumerable<DataTable> TableCollection)
        : base(TableCollection)
    { }

    public new void Enqueue(DataTable Table)
    {
        base.Enqueue(Table);
        OnTableQueued(new TableQueuedEventArgs(Table));
    }

    public void OnTableQueued(TableQueuedEventArgs table)
    {
        EventHandler<TableQueuedEventArgs> handler = TableQueued;

        if (handler != null)
        {
            handler(this, table);
        }
    }
}

public class TableQueuedEventArgs : EventArgs
{
    public TableQueuedEventArgs(DataTable Table)
    { this.Table = Table; }

    public DataTable Table { get; set; }
}

As proof of concept, it seems to work pretty well. At most I saw 4 worker threads.

Author: IAbstract

Updated on July 09, 2022

Comments

  • IAbstract
    IAbstract almost 2 years

    I am trying to figure out the best way of working with a queue. I have a process that returns a DataTable. Each DataTable, in turn, is merged with the previous DataTable. There is one problem: too many records to hold until the final BulkCopy (OutOfMemoryException).

    So, I have determined that I should process each incoming DataTable immediately. Thinking about the ConcurrentQueue<T>...but I don't see how the WriteQueuedData() method would know to dequeue a table and write it to the database.

    For instance:

    public class TableTransporter
    {
        private ConcurrentQueue<DataTable> tableQueue = new ConcurrentQueue<DataTable>();
    
        public TableTransporter()
        {
            tableQueue.OnItemQueued += new EventHandler(WriteQueuedData);   // no events available
        }
    
        public void ExtractData()
        {
            DataTable table;
    
            // perform data extraction
            tableQueue.Enqueue(table);
        }
    
        private void WriteQueuedData(object sender, EventArgs e)
        {
            BulkCopy(e.Table);
        }
    }
    

    My first question is, aside from the fact that I don't actually have any events to subscribe to, if I call ExtractData() asynchronously will this be all that I need? Second, is there something I'm missing about the way ConcurrentQueue<T> functions and needing some form of trigger to work asynchronously with the queued objects?

    Update I have just derived a class from ConcurrentQueue<T> that has an OnItemQueued event handler. Then:

    new public void Enqueue (DataTable Table)
    {
        base.Enqueue(Table);
        OnTableQueued(new TableQueuedEventArgs(Table));
    }
    
    public void OnTableQueued(TableQueuedEventArgs table)
    {
        EventHandler<TableQueuedEventArgs> handler = TableQueued;
    
        if (handler != null)
        {
            handler(this, table);
        }
    }
    

    Any concerns about this implementation?

  • IAbstract
    IAbstract over 13 years
    I thought I had 2 threads. The main thread would basically wait for the event to trigger. The second thread begins as an asynchronous call to ExtractData(). In the async callback I will simply continue the extraction process.
  • IAbstract
    IAbstract over 13 years
    Actually, I think I have it backwards; main thread should be en-queuing datatables; then begin asynchronous writing method via the enqueued item event trigger.
  • Richard Priddy
    Richard Priddy about 8 years
    Looking through this, it's a good implementation. However, having run a quick test, when does an item actually get dequeued?
  • IAbstract
    IAbstract about 8 years
    @RichardPriddy: since this was just over 5 years ago (and I have long since moved on to my 3rd company), I can only assume this was not a complete example. Note the proof of concept remark at the end. ;) That said, depending on requirements you could expose the enqueued event and let something else handle dequeueing. Otherwise, it might be logical to dequeue somewhere in the post process function's AsyncCallback. It would be really difficult to pinpoint anything more specific at this late date.
  • user3085342
    user3085342 over 7 years
    Shame about receiving the down vote. I think it's a valid, pragmatic opinion.
  • jocull
    jocull about 6 years
    Regarding "producer thread(s) have to inform the consumer thread(s) somehow that there is data available to process": how do you typically do this?
  • Peter Huber
    Peter Huber about 6 years
    For an overview of how threads can synchronize with each other, see: Microsoft, Overview of Synchronization Primitives. In those cases, ConcurrentQueue is not helpful, because the synchronization primitives use locking anyway. ConcurrentQueue might be useful for massively parallel problems, when several threads produce something as fast as they can and another thread collects those results and processes them. Once the problem is solved, all threads are released and no waiting is involved.
  • Kamran Bigdely
    Kamran Bigdely about 5 years
    @Chris Smith: I don't know how to message you. You have a malicious website on your profile. Please remove it.
  • Chris Smith
    Chris Smith about 5 years
    @KamranBigdely thank you for pointing that out! Domain of my blog lapsed, and bad actor took it over. Fixed.