classical producer consumer threading

Solution 1

In pseudo-code the producer is something like:

void producer_thread()
{
    while(true)
        queue.push( produce() );
}

So consider the queue's push method (I've used pthreads here, but the same logic applies with other libraries):

void SynchronizedQueue::push(Item const &i)
{
    pthread_mutex_lock(&mutex);

    // queue is full, so wait for consumer
    while (queue.size() == BUFFER_SIZE)
        pthread_cond_wait(&condition, &mutex);

    // when we get here, the queue has space
    this->queue.push_back(i);

    // make sure we wake a sleeping consumer
    if (queue.size() == 1)
        pthread_cond_signal(&condition);

    pthread_mutex_unlock(&mutex);
}

and the pop method used by the consumer:

Item SynchronizedQueue::pop()
{
    pthread_mutex_lock(&mutex);

    // wait for something to do
    while (queue.size() == 0)
        pthread_cond_wait(&condition, &mutex);

    // if we get here, we have some work
    Item tmp = queue.front();

    // make sure we wake a sleeping producer
    if (queue.size() == BUFFER_SIZE)
        pthread_cond_signal(&condition);

    queue.pop_front();
    pthread_mutex_unlock(&mutex);
    return tmp;
}
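The push and pop above share one condition variable and signal only on the empty-to-one and full-to-not-full transitions, which is enough for a single producer and a single consumer. With several producers or consumers, that conditional signal could leave a waiting thread asleep even though it has work to do. A common variation, sketched below under that assumption, uses two condition variables and signals after every push and pop. The names BoundedQueue, ProducerArgs, producer_main and run_pthread_demo are hypothetical and not part of the answer above, and the item type is fixed to int to keep the sketch short.

```cpp
#include <pthread.h>
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical bounded queue of ints with separate "not full" / "not empty" conditions.
class BoundedQueue {
public:
    explicit BoundedQueue(std::size_t capacity) : capacity_(capacity) {
        assert(capacity > 0);
        pthread_mutex_init(&mutex_, nullptr);
        pthread_cond_init(&not_full_, nullptr);
        pthread_cond_init(&not_empty_, nullptr);
    }
    ~BoundedQueue() {
        pthread_cond_destroy(&not_empty_);
        pthread_cond_destroy(&not_full_);
        pthread_mutex_destroy(&mutex_);
    }
    BoundedQueue(const BoundedQueue&) = delete;
    BoundedQueue& operator=(const BoundedQueue&) = delete;

    void push(int item) {
        pthread_mutex_lock(&mutex_);
        // Producer sleeps here while the queue is full; the loop guards
        // against spurious wakeups.
        while (items_.size() == capacity_)
            pthread_cond_wait(&not_full_, &mutex_);
        items_.push_back(item);
        // Wake one consumer, if any is waiting for data.
        pthread_cond_signal(&not_empty_);
        pthread_mutex_unlock(&mutex_);
    }

    int pop() {
        pthread_mutex_lock(&mutex_);
        // Consumer sleeps here while the queue is empty.
        while (items_.empty())
            pthread_cond_wait(&not_empty_, &mutex_);
        int item = items_.front();
        items_.pop_front();
        // This is how a sleeping producer learns that space is available.
        pthread_cond_signal(&not_full_);
        pthread_mutex_unlock(&mutex_);
        return item;
    }

private:
    std::size_t capacity_;
    std::deque<int> items_;
    pthread_mutex_t mutex_;
    pthread_cond_t not_full_;
    pthread_cond_t not_empty_;
};

// Arguments for the hypothetical producer thread.
struct ProducerArgs {
    BoundedQueue* queue;
    int count;
};

// Producer thread: pushes the integers 1..count.
void* producer_main(void* arg) {
    ProducerArgs* args = static_cast<ProducerArgs*>(arg);
    for (int i = 1; i <= args->count; ++i)
        args->queue->push(i);
    return nullptr;
}

// Starts one producer, consumes `count` items in the calling thread,
// and returns the sum of everything consumed.
long run_pthread_demo(int count, std::size_t capacity) {
    BoundedQueue queue(capacity);
    ProducerArgs args{&queue, count};
    pthread_t producer;
    pthread_create(&producer, nullptr, producer_main, &args);
    long sum = 0;
    for (int i = 0; i < count; ++i)
        sum += queue.pop();
    pthread_join(producer, nullptr);
    return sum;
}
```

Signalling unconditionally costs a little when nobody is waiting, but it avoids reasoning about exactly which size transitions need a wakeup.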

Solution 2

You need condition variables.

A typical usage of a condition variable is this:

//lock the mutex first!
scoped_lock myLock(myMutex); 

//wait till a condition is met
myConditionalVariable.wait(myLock, CheckCondition);

//Execute this code only if the condition is met

where CheckCondition is a function (or functor) that checks the condition, i.e. whether the thread should stop waiting. wait() calls it internally each time it wakes up, including spurious wakeups, and if the condition has not been met yet, wait() goes back to sleep. Before going to sleep, wait() atomically releases the mutex; it reacquires the mutex before returning.
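As a minimal sketch of this pattern with C++11's std::condition_variable, a bounded buffer might look like the following. Note that std::condition_variable::wait takes a std::unique_lock<std::mutex> rather than a scoped lock. The names BoundedBuffer and run_cv_demo are hypothetical, and the item type is fixed to int for brevity.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical bounded buffer of ints built on std::condition_variable.
class BoundedBuffer {
public:
    explicit BoundedBuffer(std::size_t capacity) : capacity_(capacity) {
        assert(capacity > 0);
    }

    void push(int item) {
        std::unique_lock<std::mutex> lock(mutex_);
        // The predicate plays the role of CheckCondition: wait() returns
        // only once there is free space, re-checking after every wakeup.
        not_full_.wait(lock, [this] { return items_.size() < capacity_; });
        items_.push(item);
        lock.unlock();
        not_empty_.notify_one();  // wake a consumer waiting for data
    }

    int pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        not_empty_.wait(lock, [this] { return !items_.empty(); });
        int item = items_.front();
        items_.pop();
        lock.unlock();
        not_full_.notify_one();  // wake a producer waiting for space
        return item;
    }

private:
    std::size_t capacity_;
    std::queue<int> items_;
    std::mutex mutex_;
    std::condition_variable not_full_;
    std::condition_variable not_empty_;
};

// Starts one producer pushing 1..count, consumes in the calling thread,
// and returns the sum of the consumed items.
long run_cv_demo(int count, std::size_t capacity) {
    BoundedBuffer buffer(capacity);
    std::thread producer([&buffer, count] {
        for (int i = 1; i <= count; ++i)
            buffer.push(i);
    });
    long sum = 0;
    for (int i = 0; i < count; ++i)
        sum += buffer.pop();
    producer.join();
    return sum;
}
```

The producer never polls the item count: it blocks inside wait() and is woken by the notify_one() call that the consumer makes after removing an item.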


If your compiler supports std::condition_variable, introduced in C++11, then you can see this for details:

If your compiler doesn't support it, and you work with win32 threads, then see this:

And here is a complete example.

And if you work with POSIX threads, then see this:


You can see my implementation of conditional_variable using win32 primitives here:

Scroll down and see its implementation first, then see how it is used in the concurrent queue implementation.

Author by

Dipro Sen

Updated on June 17, 2022

Comments

  • Dipro Sen, almost 2 years

    In the classical producer-consumer problem, the producer sleeps when itemCount == BUFFER_SIZE and wakes up again when it goes down. But once itemCount reaches BUFFER_SIZE, the producer thread is put to sleep. How can it know that itemCount has gone down and that it needs to wake up?