Shared Queue in C++


Solution 1

Two easy ways to let the thread end:

  1. Send an end message on the queue.
  2. Add a second condition to the condition-variable wait that tells the thread to end:

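    // Wait until there is data or an end request; the lock is released while waiting
    while (m_queue.empty() && !RequestToEnd) m_cond.wait(lock);
    if (RequestToEnd) { doEndActions(); }  // then return to the caller without an item
    else { T result = m_queue.front(); m_queue.pop(); return result; }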
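
The second option could look roughly like the sketch below. It is only an illustration under stated assumptions: it uses the C++11 standard library (`std::mutex`, `std::condition_variable`, `std::thread`) in place of Boost so that it is self-contained, and the names `StoppableQueue`, `Stop` and `RunStopDemo` are hypothetical. Items still queued when the stop is requested are simply left unprocessed here.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical queue with a stop flag as the extra wait condition.
template <typename T>
class StoppableQueue {
public:
    void Enqueue(const T& data) {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            if (m_stopped) return;  // refuse new work after Stop()
            m_queue.push(data);
        }
        m_cond.notify_one();
    }

    // Blocks until an item arrives or Stop() is called.
    // Returns false once the queue has been stopped.
    bool Dequeue(T& result) {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cond.wait(lock, [this] { return m_stopped || !m_queue.empty(); });
        if (m_stopped) return false;
        result = m_queue.front();
        m_queue.pop();
        return true;
    }

    void Stop() {
        {
            std::lock_guard<std::mutex> lock(m_mutex);  // set the flag under the lock
            m_stopped = true;
        }
        m_cond.notify_all();  // wake every waiting consumer, not just one
    }

private:
    std::queue<T> m_queue;
    std::mutex m_mutex;
    std::condition_variable m_cond;
    bool m_stopped = false;
};

// Demo: two consumers block on an empty queue and Stop() releases both.
// Returns the number of consumers that observed the stop.
int RunStopDemo() {
    StoppableQueue<int> queue;
    int stoppedCount = 0;
    std::mutex countMutex;
    std::vector<std::thread> consumers;
    for (int i = 0; i < 2; ++i) {
        consumers.emplace_back([&] {
            int value = 0;
            while (queue.Dequeue(value)) { /* process value here */ }
            std::lock_guard<std::mutex> lock(countMutex);
            ++stoppedCount;
        });
    }
    queue.Stop();
    for (auto& t : consumers) t.join();
    return stoppedCount;
}
```

Setting the flag while holding the mutex matters: a consumer checks the flag under the same mutex, so it either sees the flag before waiting or is already waiting when the notification arrives. `notify_all` is used on the assumption that more than one consumer may be blocked.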
    

Solution 2

First, do you really need to terminate the thread? If not, don't.

If you do have to, then just queue it a suicide pill. I usually send a NULL cast to T. The thread checks T and, if NULL, cleans up, returns and so dies.

Also, you may need to purge the queue first by removing and delete()ing all the items.
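
A minimal sketch of the end-message idea follows, assuming the queue carries pointers so that a null pointer can act as the end message. It uses the C++11 standard library instead of Boost to stay self-contained, and `BlockingQueue`, `ConsumeUntilPill` and `RunPoisonPillDemo` are hypothetical names. Using `std::unique_ptr` is a choice made for this sketch: it means items left in the queue are freed automatically rather than needing a manual purge and delete.

```cpp
#include <cassert>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical minimal blocking queue, same shape as SynchronisedQueue.
template <typename T>
class BlockingQueue {
public:
    void Enqueue(T data) {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_queue.push(std::move(data));
        }
        m_cond.notify_one();
    }

    // Blocks until an item is available, then returns it.
    T Dequeue() {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cond.wait(lock, [this] { return !m_queue.empty(); });
        T result = std::move(m_queue.front());
        m_queue.pop();
        return result;
    }

private:
    std::queue<T> m_queue;
    std::mutex m_mutex;
    std::condition_variable m_cond;
};

// Consumer loop: a null pointer is the agreed end message.
// Returns the sum of the values processed before the end message.
int ConsumeUntilPill(BlockingQueue<std::unique_ptr<int>>& queue) {
    int sum = 0;
    for (;;) {
        std::unique_ptr<int> item = queue.Dequeue();
        if (!item) break;  // end message received: clean up and return
        sum += *item;
    }
    return sum;
}

// Demo: enqueue 1, 2, 3, then the end message, and join the consumer.
int RunPoisonPillDemo() {
    BlockingQueue<std::unique_ptr<int>> queue;
    int sum = 0;
    std::thread consumer([&] { sum = ConsumeUntilPill(queue); });
    for (int i = 1; i <= 3; ++i) queue.Enqueue(std::unique_ptr<int>(new int(i)));
    queue.Enqueue(nullptr);  // the end message
    consumer.join();
    return sum;
}
```

Because the end message travels through the queue in order, everything enqueued before it is still processed. With several consumers, one end message per consumer would be needed.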

Author by

Novalis

It is just me:-)

Updated on June 04, 2022

Comments

  • Novalis
    Novalis almost 2 years

    I simply receive packets from the network and Enqueue them in one thread, then consume these packets (Dequeue) in another thread.

    So I decided to use the Boost library to build a shared queue based on https://www.quantnet.com/cplusplus-multithreading-boost/

    template <typename T>
    class SynchronisedQueue
    {
    private:
        std::queue<T> m_queue;  // Use STL queue to store data
        boost::mutex m_mutex;   // The mutex to synchronise on
        boost::condition_variable m_cond;// The condition to wait for
    
    public:
    
        // Add data to the queue and notify others
        void Enqueue(const T& data)
        {
            // Acquire lock on the queue
            boost::unique_lock<boost::mutex> lock(m_mutex);
    
            // Add the data to the queue
            m_queue.push(data);
    
            // Notify others that data is ready
            m_cond.notify_one();
    
        } // Lock is automatically released here
    
        // Get data from the queue. Wait for data if not available
        T Dequeue()
        {
    
            // Acquire lock on the queue
            boost::unique_lock<boost::mutex> lock(m_mutex);
    
            // When there is no data, wait till someone fills it.
            // Lock is automatically released in the wait and obtained 
            // again after the wait
            while (m_queue.size()==0) m_cond.wait(lock);
    
            // Retrieve the data from the queue
            T result=m_queue.front(); m_queue.pop();
            return result;
    
        } // Lock is automatically released here
    };
    

    The problem is that, while no data is arriving, Dequeue() blocks my consumer thread, and when I want to terminate the consumer thread I sometimes cannot end or stop it.

    What is the suggested way to end the blocking in Dequeue() so that I can safely terminate the thread that consumes packets? Any ideas or suggestions?

    PS: The site https://www.quantnet.com/cplusplus-multithreading-boost/ uses "boost::this_thread::interruption_point();" to stop the consumer thread. Because of my legacy code structure, this is not possible for me.

    Based on the answers, I updated the shared queue like this:

    #include <queue>
    #include <boost/thread.hpp>
    
    template <typename T>
    class SynchronisedQueue
    {
    public:
    
        SynchronisedQueue()
        {
            RequestToEnd = false;  
            EnqueueData = true;
        }
        void Enqueue(const T& data)
        {
            boost::unique_lock<boost::mutex> lock(m_mutex);
    
            if(EnqueueData)
            {
                m_queue.push(data);
                m_cond.notify_one();
            }
    
        } 
    
    
        bool TryDequeue(T& result)
        {
            boost::unique_lock<boost::mutex> lock(m_mutex);
    
            while (m_queue.empty() && (! RequestToEnd)) 
            { 
                m_cond.wait(lock);
            }
    
            if( RequestToEnd )
            {
                 DoEndActions();
                 return false;
            }
    
            result= m_queue.front(); m_queue.pop();
    
            return true;
        }
    
        void StopQueue()
        {
            RequestToEnd =  true;
            Enqueue(NULL);        
        }
    
        int Size()
        {
            boost::unique_lock<boost::mutex> lock(m_mutex);
            return m_queue.size();
    
        }
    
    private:
    
        void DoEndActions()
        {
            EnqueueData = false;
    
            while (!m_queue.empty())  
            {
                m_queue.pop();
            }
        }
    
    
    
        std::queue<T> m_queue;              // Use STL queue to store data
        boost::mutex m_mutex;               // The mutex to synchronise on
        boost::condition_variable m_cond;            // The condition to wait for
    
        bool RequestToEnd;
        bool EnqueueData;
    };
    

    And here is my test driver:

    #include <iostream>
    #include <string>
    
    #include "SynchronisedQueue.h"
    
    using namespace std;
    
    SynchronisedQueue<int> MyQueue;
    
    void InsertToQueue()
    {
        int i= 0;
    
        while(true)
        {
            MyQueue.Enqueue(++i);
        }
    
    }
    
    void ConsumeFromQueue()
    {
        while(true)
        {
            int number;
    
            cout << "Now try to dequeue" << endl;
    
            bool success = MyQueue.TryDequeue(number);
    
            if(success)
            {
    
                cout << "value is " << number << endl;
    
            }
    
            else
            {
                cout << " queue is stopped" << endl;
                break;
    
            }
        }
    
    
        cout << "Queue size is : " << MyQueue.Size() <<  endl;
    }
    
    
    
    int main()
    {
    
        cout << "Test Started" << endl;
    
        boost::thread startInsertIntoQueue = boost::thread(InsertToQueue);
        boost::thread consumeFromQueue = boost::thread(ConsumeFromQueue);
    
        boost::this_thread::sleep(boost::posix_time::seconds(5)); //After 5 seconds
    
        MyQueue.StopQueue();
    
        int endMain;
    
        cin >> endMain;
    
    
        return 0;
    }
    

    For now it seems to work. Based on new suggestions, I changed the stop method to:

    void StopQueue()
    {
        boost::unique_lock<boost::mutex> lock(m_mutex);
        RequestToEnd = true;
        m_cond.notify_one();
    }
    
  • Martin James
    Martin James about 12 years
    I would go with the end message. It may well be quickest to purge the queue first, (eg. by sucking all the entries out in a loop, perhaps at a high priority). A purge would require a 'bool tryPop()' or a bool pop() method that takes a timeout parameter.
  • stefaanv
    stefaanv about 12 years
    Good point, you either use a temporary thread that runs as needed and dies, or a permanent thread that you can command via thread communication.
  • stefaanv
    stefaanv about 12 years
    @MartinJames: There are probably even more solutions, but these popped up in my head and I guess there are arguments for both. Cleaning up is definitely an argument for the end message.
  • Martin James
    Martin James about 12 years
    @stefaanv - such operations are usually not frequent. I would probably just temporarily raise the priority of whatever thread was trying to terminate the target thread. OK, it's two kernel calls, but if you aren't doing it that often, it's better than fiddling with a completely separate thread.
  • Martin James
    Martin James about 12 years
    The cleanest solutions are usually the best. If there is a solution that means using, instead of changing, the working code of a class that is as inherently prone to disaster as a producer-consumer queue, then go with using <g>
  • Novalis
    Novalis about 12 years
    I updated the shared queue code based on your feedback. I also wrote a test driver to check that it works; you can see it above. For now it seems to work. Multithreading is hard, so let me know if my implementation above may have problems.
  • Novalis
    Novalis about 12 years
    I updated the shared queue code based on your feedback. You can check my implementation above. Let me know if you see any potential problems with it.
  • Martin James
    Martin James about 12 years
    Well, with a templated queue like this, an int or a pointer to anything might be supplied by the type. 'number' should really be of type T and, in the thread, after popping it back from the queue into a result variable of whatever class pointer is being used, compare it to NULL.
  • Martin James
    Martin James about 12 years
    Actually, this is getting near to rant moment:) There doesn't seem to be any easy way, in C++, of restricting templates to pointer types - an insane decision.
  • Ben Bryant
    Ben Bryant almost 9 years
    Why not use wait_for instead of wait_until, since you just want to give a timeout?
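
The timeout-based pop mentioned in the comments could be sketched as below. This is an assumption-laden illustration, not code from the thread: it uses `std::condition_variable::wait_for` from the C++11 standard library rather than Boost, and `TimedQueue`, `TryDequeueFor` and `RunTimeoutDemo` are hypothetical names.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <queue>

// Hypothetical queue whose pop gives up after a relative timeout.
template <typename T>
class TimedQueue {
public:
    void Enqueue(const T& data) {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_queue.push(data);
        }
        m_cond.notify_one();
    }

    // Waits at most `timeout` for an item. Returns false on timeout.
    template <typename Rep, typename Period>
    bool TryDequeueFor(T& result,
                       const std::chrono::duration<Rep, Period>& timeout) {
        std::unique_lock<std::mutex> lock(m_mutex);
        // The predicate overload handles spurious wakeups and returns
        // the predicate's final value, so false means "still empty".
        if (!m_cond.wait_for(lock, timeout,
                             [this] { return !m_queue.empty(); })) {
            return false;
        }
        result = m_queue.front();
        m_queue.pop();
        return true;
    }

private:
    std::queue<T> m_queue;
    std::mutex m_mutex;
    std::condition_variable m_cond;
};

// Demo: the first pop times out on an empty queue, the second succeeds.
bool RunTimeoutDemo() {
    TimedQueue<int> queue;
    int value = 0;
    bool timedOut = !queue.TryDequeueFor(value, std::chrono::milliseconds(20));
    queue.Enqueue(42);
    bool gotItem = queue.TryDequeueFor(value, std::chrono::milliseconds(20));
    return timedOut && gotItem && value == 42;
}
```

A consumer loop built on this can check its own "should I stop" flag each time the call times out, which avoids touching the queue's internals at the cost of waking up periodically. Passing a zero timeout makes the call effectively non-blocking, which is one way to drain the queue in a purge loop.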