C++ - Multi-threading - Communication between threads

Solution 1

There is no need for a second std::condition_variable; just reuse the one you have. As others have mentioned, you should consider using std::atomic<bool> instead of a plain bool. That said, I must admit that in my tests g++ with -O3 did not optimize the plain bool away.

#include <iostream>
#include <thread>
#include <condition_variable>
#include <queue>
#include <cstdlib>
#include <chrono>
#include <ctime>
#include <random>
#include <atomic>

//counts every number that is added to the queue
static long long producer_count = 0;
//counts every number that is taken out of the queue
static long long consumer_count = 0;

void generateNumbers(std::queue<int> & numbers, std::condition_variable & cv, std::mutex & m, std::atomic<bool> & workdone)
{
    while(!workdone.load())
    {
        std::unique_lock<std::mutex> lk(m);
        int rndNum = rand() % 100;
        numbers.push(rndNum);
        producer_count++;
        cv.notify_one(); // Notify worker
        cv.wait(lk); // Wait for worker to complete
    }
}

void work(std::queue<int> & numbers, std::condition_variable & cv, std::mutex & m, std::atomic<bool> & workdone)
{
    while(!workdone.load())
    {
        std::unique_lock<std::mutex> lk(m);
        cv.notify_one(); // Notify generator (placed here to avoid waiting for the lock)
        cv.wait(lk); // Wait for the generator to complete
        std::cout << numbers.front() << std::endl;
        numbers.pop();
        consumer_count++;
    }
}

int main() {
    std::condition_variable cv;
    std::mutex m;
    std::atomic<bool> workdone(false);
    std::queue<int> numbers;

    //start threads
    std::thread producer(generateNumbers, std::ref(numbers), std::ref(cv), std::ref(m), std::ref(workdone));
    std::thread consumer(work, std::ref(numbers), std::ref(cv), std::ref(m), std::ref(workdone));


    //wait for 3 seconds, then join the threads
    std::this_thread::sleep_for(std::chrono::seconds(3));
    workdone = true;
    cv.notify_all(); // To prevent dead-lock

    producer.join();
    consumer.join();

    //output the counters
    std::cout << producer_count << std::endl;
    std::cout << consumer_count << std::endl;

    return 0;
}

EDIT:

To avoid the sporadic off-by-one error you could use this:

#include <iostream>
#include <thread>
#include <condition_variable>
#include <queue>
#include <cstdlib>
#include <chrono>
#include <ctime>
#include <random>
#include <atomic>

//counts every number that is added to the queue
static long long producer_count = 0;
//counts every number that is taken out of the queue
static long long consumer_count = 0;

void generateNumbers(std::queue<int> & numbers, std::condition_variable & cv, std::mutex & m, std::atomic<bool> & workdone)
{
    while(!workdone.load())
    {
        std::unique_lock<std::mutex> lk(m);
        int rndNum = rand() % 100;
        numbers.push(rndNum);
        producer_count++;
        cv.notify_one(); // Notify worker
        cv.wait(lk); // Wait for worker to complete
    }
}

void work(std::queue<int> & numbers, std::condition_variable & cv, std::mutex & m, std::atomic<bool> & workdone)
{
    while(!workdone.load() or !numbers.empty())
    {
        std::unique_lock<std::mutex> lk(m);
        cv.notify_one(); // Notify generator (placed here to avoid waiting for the lock)
        if (numbers.empty())
            cv.wait(lk); // Wait for the generator to complete
        if (numbers.empty())
            continue;
        std::cout << numbers.front() << std::endl;
        numbers.pop();
        consumer_count++;
    }
}

int main() {
    std::condition_variable cv;
    std::mutex m;
    std::atomic<bool> workdone(false);
    std::queue<int> numbers;

    //start threads
    std::thread producer(generateNumbers, std::ref(numbers), std::ref(cv), std::ref(m), std::ref(workdone));
    std::thread consumer(work, std::ref(numbers), std::ref(cv), std::ref(m), std::ref(workdone));


    //wait for 1 second, then signal the threads to stop and join them
    std::this_thread::sleep_for(std::chrono::seconds(1));
    workdone = true;
    cv.notify_all(); // To prevent dead-lock

    producer.join();
    consumer.join();

    //output the counters
    std::cout << producer_count << std::endl;
    std::cout << consumer_count << std::endl;

    return 0;
}

Solution 2

Note that this code may not work properly. The workdone variable is defined as a regular bool, and it is perfectly legitimate for the compiler to assume that the check can be safely optimized away, because the variable never changes inside the loop body.

If your knee-jerk reaction is to just add volatile: no, that won't work either. You need to properly synchronize access to the workdone variable, since both worker threads read it while another thread (the main thread) writes it. An alternative would be to use something like an event instead of a simple variable.

Now for the explanation of your problem: both threads have the same termination condition (!workdone), but their loop iterations take different amounts of time, so there is currently nothing guaranteeing that the producer and the consumer run a similar number of iterations.

Author: cmplx96

Updated on June 04, 2022

Comments

  • cmplx96
    cmplx96 almost 2 years
    #include <iostream>
    #include <thread>
    #include <condition_variable>
    #include <queue>
    #include <cstdlib>
    #include <chrono>
    #include <ctime>
    #include <random>
    
    using namespace std;
    
    //counts every number that is added to the queue
    static long long producer_count = 0;
    //counts every number that is taken out of the queue
    static long long consumer_count = 0;
    
    void generateNumbers(queue<int> & numbers, condition_variable & cv, mutex & m, bool & workdone){
        while(!workdone) {
            unique_lock<std::mutex> lk(m);
            int rndNum = rand() % 100;
            numbers.push(rndNum);
            producer_count++;
            cv.notify_one();
         }
    }
    
    void work(queue<int> & numbers, condition_variable & cv, mutex & m, bool & workdone) {
        while(!workdone) {
            unique_lock<std::mutex> lk(m);
            cv.wait(lk);
            cout << numbers.front() << endl;
            numbers.pop();
            consumer_count++;
    
         }
    }
    
    int main() {
        condition_variable cv;
        mutex m;
        bool workdone = false;
        queue<int> numbers;
    
        //start threads
        thread producer(generateNumbers, ref(numbers), ref(cv), ref(m), ref(workdone));
        thread consumer(work, ref(numbers), ref(cv), ref(m), ref(workdone));
    
        //wait for 3 seconds, then join the threads
        this_thread::sleep_for(std::chrono::seconds(3));
        workdone = true;
    
        producer.join();
        consumer.join();
    
        //output the counters
        cout << producer_count << endl;
        cout << consumer_count << endl;
    
        return 0;
    }
    

    Hello Everyone, I tried to implement the Producer-Consumer-Pattern with C++. The producer thread generates random integers, adds them to a queue and then notifies the consumer thread that a new number was added.

    The consumer thread waits for the notification and then prints the first element of the queue to the console and deletes it.

    I incremented a counter for every number that is added to the queue and another counter for every number that is taken out of the queue.

    I expected the two counters to hold the same value after the program is finished, however the difference is huge. The counter that represents the addition to the queue is always in the million range (3871876 in my last test) and the counter that represents the consumer which takes numbers out of the queue is always below 100k (89993 in my last test).

    Can someone explain to me why there is such a huge difference? Do I have to add another condition variable so that the producer thread waits for the consumer thread as well? Thanks!

    • Admin
      Admin over 7 years
      Shouldn't workdone be atomic<bool> or such?
    • nwp
      nwp over 7 years
      Try while (!(workdone && numbers.empty())) to make the consumer continue until it is supposed to quit and the queue is empty. Maybe that is the behavior you want.
  • nwp
    nwp over 7 years
    While this fixes a potential deadlock, it does not make the consumer consume all the numbers which I believe is intended.
  • Jonas
    Jonas over 7 years
    @nwp It may be off-by-one every now and then, yes.
  • cmplx96
    cmplx96 over 7 years
    Thank you for the answer! I ran some tests and the counters have been holding the same value!
  • Jonas
    Jonas over 7 years
    You're welcome. The first solution will almost surely never deadlock. Consider this sequence of events: the worker notifies, the generator gets the lock, the generator generates, the generator notifies, the worker waits. If that were to happen (it almost surely won't), it would deadlock. The second case is even less likely... For the definition of "almost surely", see en.m.wikipedia.org/wiki/Almost_surely