Shared Queue in C++
Solution 1
2 easy solutions to let the thread end:
- send an end message on the queue.
add another condition to the condition variable to command to end
while(queue.empty() && (! RequestToEnd)) m_cond.wait(lock); if (RequestToEnd) { doEndActions(); } else { T result=m_queue.front(); m_queue.pop(); return result; }
Solution 2
First, do you really need to terminate the thread? If not, don't.
If you do have to, then just queue it a suicide pill. I usually send a NULL cast to T. The thread checks T and, if NULL, cleans up, returns and so dies.
Also, you may need to purge the queue first by removing and delete()ing all the items.
Comments
-
Novalis almost 2 years
I just simply get packets from network, and Enqueue them in one thread and then consume this packets (Dequeue) in an other thread.
So i decide to use boost library to make a shared queue based on https://www.quantnet.com/cplusplus-multithreading-boost/
template <typename T> class SynchronisedQueue { private: std::queue<T> m_queue; // Use STL queue to store data boost::mutex m_mutex; // The mutex to synchronise on boost::condition_variable m_cond;// The condition to wait for public: // Add data to the queue and notify others void Enqueue(const T& data) { // Acquire lock on the queue boost::unique_lock<boost::mutex> lock(m_mutex); // Add the data to the queue m_queue.push(data); // Notify others that data is ready m_cond.notify_one(); } // Lock is automatically released here // Get data from the queue. Wait for data if not available T Dequeue() { // Acquire lock on the queue boost::unique_lock<boost::mutex> lock(m_mutex); // When there is no data, wait till someone fills it. // Lock is automatically released in the wait and obtained // again after the wait while (m_queue.size()==0) m_cond.wait(lock); // Retrieve the data from the queue T result=m_queue.front(); m_queue.pop(); return result; } // Lock is automatically released here };
The problem is , while not getting any data, Dequeue() method blocks my consumer thread, and when i want to terminate consumer thread i can not able to end it or stop it sometimes.
What is the suggested way to end blocking of Dequeue(), so that i can safely terminate the thread that consume packets? Any ideas suggestions?
PS: The site https://www.quantnet.com/cplusplus-multithreading-boost/ use "boost::this_thread::interruption_point();" for stopping consumer thread ... Because of my legacy code structure this is not possible for me...
Based on Answer I update Shared Queue like this:
#include <queue> #include <boost/thread.hpp> template <typename T> class SynchronisedQueue { public: SynchronisedQueue() { RequestToEnd = false; EnqueueData = true; } void Enqueue(const T& data) { boost::unique_lock<boost::mutex> lock(m_mutex); if(EnqueueData) { m_queue.push(data); m_cond.notify_one(); } } bool TryDequeue(T& result) { boost::unique_lock<boost::mutex> lock(m_mutex); while (m_queue.empty() && (! RequestToEnd)) { m_cond.wait(lock); } if( RequestToEnd ) { DoEndActions(); return false; } result= m_queue.front(); m_queue.pop(); return true; } void StopQueue() { RequestToEnd = true; Enqueue(NULL); } int Size() { boost::unique_lock<boost::mutex> lock(m_mutex); return m_queue.size(); } private: void DoEndActions() { EnqueueData = false; while (!m_queue.empty()) { m_queue.pop(); } } std::queue<T> m_queue; // Use STL queue to store data boost::mutex m_mutex; // The mutex to synchronise on boost::condition_variable m_cond; // The condition to wait for bool RequestToEnd; bool EnqueueData; };
And Here is my Test Drive:
#include <iostream> #include <string> #include "SynchronisedQueue.h" using namespace std; SynchronisedQueue<int> MyQueue; void InsertToQueue() { int i= 0; while(true) { MyQueue.Enqueue(++i); } } void ConsumeFromQueue() { while(true) { int number; cout << "Now try to dequeue" << endl; bool success = MyQueue.TryDequeue(number); if(success) { cout << "value is " << number << endl; } else { cout << " queue is stopped" << endl; break; } } cout << "Que size is : " << MyQueue.Size() << endl; } int main() { cout << "Test Started" << endl; boost::thread startInsertIntoQueue = boost::thread(InsertToQueue); boost::thread consumeFromQueue = boost::thread(ConsumeFromQueue); boost::this_thread::sleep(boost::posix_time::seconds(5)); //After 5 seconds MyQueue.StopQueue(); int endMain; cin >> endMain; return 0; }
For now it seems to work...Based on new suggestions:
i change Stop Method as:
void StopQueue() { boost::unique_lock<boost::mutex> lock(m_mutex); RequestToEnd = true; m_cond.notify_one(); }
-
Martin James about 12 yearsI would go with the end message. It may well be quickest to purge the queue first, (eg. by sucking all the entries out in a loop, perhaps at a high priority). A purge would require a 'bool tryPop()' or a bool pop() method that takes a timeout parameter.
-
stefaanv about 12 yearsGood point, you either use a temporary thread that runs as needed and dies, or a permanent thread that you can command via thread communication.
-
stefaanv about 12 years@MartinJames: There are probably even more solutions, but these popped up in my head and I guess there are arguments for both. Cleaning up is definitely an argument for the end message.
-
Martin James about 12 years@stefaanv - such operations are usually not frequent. I would probably just temporarily raise the priority of whatever thread was trying to terminate the target thread. OK. it's two kernel calls, but if you aren't doing it that often, it's better that fiddling with a completely separate thread.
-
Martin James about 12 yearsThe cleanest solutions are usually the best. If there is a solution that means using, instead of changing, the working code of a class that is as inherently prone to disaster as a producer-consumer queue, then go with using <g>
-
Novalis about 12 yearsI update shared queue code based on your feedbacks...I also write test driver to check if it works...Can check it above...For now it seems to work...Multithreading is hard...So let me know if my above implementation may have problems...
-
Novalis about 12 yearsI update shared queue code based on your feedbacks...Can check my implementation above...Letme know if you see any potantial problems with above implementation
-
Martin James about 12 yearsWell, with a templated queue like this, an int or a pointer to anything might be supplied by the type. 'number' should really be of type T and, in the thread, after popping it back from the queue into a result variable of whatever class pointer is being used, compare it to NULL.
-
Martin James about 12 yearsActually, this is getting near to rant moment:) There doesn't seem to be any easy way, in C++, of restricting templates to pointer types - an insane decision.
-
Ben Bryant almost 9 yearswhy not use wait_for instead of wait_until since you just want to give a timeout?