Using std::condition_variable with atomic<bool>

Solution 1

I looked through your code that manipulates the condition variable and the atomic, and it appears to be correct and should not cause problems.

Why you should protect writes to the shared variable with the mutex even if it is atomic:

Problems can occur if the write to the shared variable happens between the waiting thread checking it in the predicate and that thread actually blocking on the condition variable. Consider the following sequence:

  1. The waiting thread wakes spuriously, acquires the mutex, checks the predicate and evaluates it to false, so it must wait on the cv again.

  2. The controlling thread sets the shared variable to true.

  3. The controlling thread sends a notification, which nobody receives because no thread is waiting on the condition variable yet.

  4. The waiting thread waits on the condition variable. Since the notification was already sent, it waits until the next spurious wakeup or the next time the controlling thread sends a notification, potentially indefinitely.

Holding the mutex during the write rules this out: the waiting thread holds the mutex from the predicate check until wait() atomically releases it and blocks, so the write cannot fall into that gap.
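The interleaving above can be avoided with the usual write-under-mutex pattern. The following is only a minimal sketch of that pattern, not the asker's code: the names PauseGate, wait_until_unpaused, unpause and run_gate_demo are hypothetical and chosen for illustration.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical type: the same mutex/cv/flag trio as in the question.
struct PauseGate {
    std::mutex m;
    std::condition_variable cv;
    std::atomic<bool> paused{true};
};

// Waiter: the predicate is evaluated while the mutex is held, and
// cv.wait() releases the mutex and blocks as a single atomic step.
void wait_until_unpaused(PauseGate& g) {
    std::unique_lock<std::mutex> lk(g.m);
    g.cv.wait(lk, [&g] { return !g.paused.load(); });
}

// Controller: because the write happens under the mutex, it cannot land
// between the waiter's predicate check and its blocking inside wait().
void unpause(PauseGate& g) {
    {
        std::lock_guard<std::mutex> lk(g.m);
        g.paused.store(false);
    }
    g.cv.notify_all();  // notifying after releasing the mutex is allowed
}

// Runs one waiter and one controller; returns true once the waiter
// has passed the gate.
bool run_gate_demo() {
    PauseGate g;
    bool passed = false;
    std::thread waiter([&g, &passed] {
        wait_until_unpaused(g);
        passed = true;
    });
    unpause(g);
    waiter.join();  // join() makes the write to `passed` visible here
    assert(!g.paused.load());
    return passed;
}
```

Whichever thread gets the mutex first, the waiter either sees the flag already cleared in its predicate or is already blocked when notify_all() runs.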

Reading shared atomic variables without locking is generally safe, unless it introduces TOCTOU (time-of-check to time-of-use) problems.

In your case you read the shared variable first to avoid unnecessary locking, and then check it again after taking the lock (in the condition variable wait call). This is a valid optimisation, called double-checked locking, and I do not see any potential problems here.
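A rough sketch of that check-then-lock shape, assuming a flag that is only ever written under the mutex. PauseFlag, set_paused and pause_point are hypothetical names, not taken from the question.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical type for illustration.
struct PauseFlag {
    std::mutex m;
    std::condition_variable cv;
    std::atomic<bool> paused{false};
};

// Writer side: always modifies the flag under the mutex and
// notifies waiters when unpausing.
void set_paused(PauseFlag& f, bool value) {
    {
        std::lock_guard<std::mutex> lk(f.m);
        f.paused.store(value);
    }
    if (!value) f.cv.notify_all();
}

// Called once per loop iteration by a worker.
// Returns true if the slow path (lock + wait) was taken.
bool pause_point(PauseFlag& f) {
    // First check, without the lock. If it misses a pause request that
    // is just being set, the worker simply runs one more iteration.
    if (!f.paused.load(std::memory_order_acquire)) return false;

    // Second check, under the lock, as the wait predicate.
    std::unique_lock<std::mutex> lk(f.m);
    f.cv.wait(lk, [&f] { return !f.paused.load(); });
    return true;
}
```

The unlocked read only decides whether to enter the slow path. The decision to block is still made under the mutex, which is what keeps the pattern safe.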

You might want to check whether atomic<bool> is lock-free on your platform. If it is not, you will end up with more locking than you would have without the atomic.
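One way to check this, sketched with the C++11 facilities is_lock_free() and the ATOMIC_BOOL_LOCK_FREE macro. The helper name atomic_bool_is_lock_free is hypothetical.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical helper: asks a std::atomic<bool> object whether its
// operations are implemented without an internal lock.
bool atomic_bool_is_lock_free() {
    std::atomic<bool> flag{false};
    return flag.is_lock_free();
}

// Compile-time view of the same property:
//   0 = never lock-free, 1 = sometimes lock-free, 2 = always lock-free.
constexpr int atomic_bool_lock_free_level() {
    return ATOMIC_BOOL_LOCK_FREE;
}
```

On C++17 and later, std::atomic<bool>::is_always_lock_free gives the same answer as a compile-time constant.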

Solution 2

In general, treat the fact that the variable is atomic as independent of how it is used with a condition variable.

If all code that interacts with the condition variable follows the usual pattern of locking the mutex before querying or modifying the shared state, and that code does not rely on code that bypasses the mutex, it will continue to be correct even if the state it guards is an atomic.

From a quick read of your pseudo-code, this appears to be correct. However, pseudo-code is often a poor substitute for real code when it comes to multi-threading.

The "optimization" of locking the mutex and waiting on the condition variable only when an atomic read says you might need to may or may not actually be faster. You need to profile throughput.
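As a starting point only, here is a sketch that times the two ways of polling the pause flag in a single thread with no contention. All names are hypothetical, and the numbers it produces say nothing about the real send/receive loops, which have to be profiled in place.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstdint>
#include <mutex>

// Hypothetical result type: how many iterations ran and how long they took.
struct Timing {
    std::uint64_t iterations_run;
    std::chrono::nanoseconds elapsed;
};

// Polls the flag with an atomic load only (the fast path of the question).
Timing time_atomic_check(std::uint64_t iters) {
    std::atomic<bool> paused{false};
    std::uint64_t ran = 0;
    auto t0 = std::chrono::steady_clock::now();
    for (std::uint64_t i = 0; i < iters; ++i) {
        if (!paused.load(std::memory_order_acquire)) ++ran;
    }
    auto t1 = std::chrono::steady_clock::now();
    return {ran, std::chrono::duration_cast<std::chrono::nanoseconds>(t1 - t0)};
}

// Polls a plain bool under the mutex on every iteration.
Timing time_mutex_check(std::uint64_t iters) {
    std::mutex m;
    bool paused = false;
    std::uint64_t ran = 0;
    auto t0 = std::chrono::steady_clock::now();
    for (std::uint64_t i = 0; i < iters; ++i) {
        std::lock_guard<std::mutex> lk(m);
        if (!paused) ++ran;
    }
    auto t1 = std::chrono::steady_clock::now();
    return {ran, std::chrono::duration_cast<std::chrono::nanoseconds>(t1 - t0)};
}
```

Comparing the two elapsed values for a large iteration count gives a first impression of the uncontended cost. An optimizing compiler may simplify the atomic-only loop, so treat the result with caution.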

Author: Erik Alapää

LinkedIn: https://www.linkedin.com/in/alapaa/

Updated on October 20, 2022

Comments

  • Erik Alapää
    Erik Alapää about 1 year

    There are several questions on SO dealing with atomic, and others that deal with std::condition_variable. But my question is whether my use below is correct.

    Three threads: one ctrl thread that does preparation work before unpausing the two other threads. The ctrl thread is also able to pause the worker threads (sender/receiver) while they are in their tight send/receive loops. The idea behind using the atomic is to make the tight loops faster when the boolean for pausing is not set.

    class SomeClass
    {
    
    public:
        //...
        // Disregard that data is public...

        std::condition_variable cv; // UDP threads will wait on this cv until allowed
                                    // to run by ctrl thread.
        std::mutex cv_m;
        std::atomic<bool> pause_test_threads;
    };
    
    void do_pause_test_threads(SomeClass *someclass)
    {
        if (!someclass->pause_test_threads)
        {
            // Even though we use an atomic, mutex must be held during
            // modification. See documentation of condition variable
            // notify_all/wait. Mutex does not need to be held for the actual
            // notify call.
            std::lock_guard<std::mutex> lk(someclass->cv_m);
            someclass->pause_test_threads = true;
        }
    }
    
    void unpause_test_threads(SomeClass *someclass)
    {
        if (someclass->pause_test_threads)
        {
            {
                // Even though we use an atomic, mutex must be held during
                // modification. See documentation of condition variable
                // notify_all/wait. Mutex does not need to be held for the actual
                // notify call.
                std::lock_guard<std::mutex> lk(someclass->cv_m);
                someclass->pause_test_threads = false;
            }
            someclass->cv.notify_all(); // Allow send/receive threads to run.
        }
    }
    
    void wait_to_start(SomeClass *someclass)
    {
        std::unique_lock<std::mutex> lk(someclass->cv_m); // RAII, no need for unlock.
        auto not_paused = [someclass](){return someclass->pause_test_threads == false;};
        someclass->cv.wait(lk, not_paused);
    }
    
    void ctrl_thread(SomeClass *someclass)
    {
        // Do startup work
        // ...
        unpause_test_threads(someclass);
    
        for (;;)
        {
            // ... check for end-program etc, if so, break;
            if (lost ctrl connection to other endpoint)
            {
                do_pause_test_threads(someclass);
            }
            else
            {
                unpause_test_threads(someclass);
            }
            sleep(SLEEP_INTERVAL);
    
        }
    
        unpause_test_threads(someclass);
    }
    
    void sender_thread(SomeClass *someclass)
    {
        wait_to_start(someclass);
        ...
        for (;;)
        {
            // ... check for end-program etc, if so, break;
            if (someclass->pause_test_threads) wait_to_start(someclass);
            ...
        }
    }
    
    void receiver_thread(SomeClass *someclass)
    {
        wait_to_start(someclass);
        ...
        for (;;)
        {
            // ... check for end-program etc, if so, break;
            if (someclass->pause_test_threads) wait_to_start(someclass);
            ...
        }
    }
  • Erik Alapää
    Erik Alapää over 7 years
    In general, yes, but in this case, no. Check the documentation for usage of C++11 condition variable, see en.cppreference.com/w/cpp/thread/condition_variable
  • Revolver_Ocelot
    Revolver_Ocelot over 7 years
    It is not a problem of simultaneous modification (assignment to an atomic cannot cause a data race). The mutex is required because, if a spurious wakeup occurs and the threads' execution interleaves just right, the waiting thread might miss the notification and wait forever.