Thread condition variables: un-acquired lock

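A Condition is a wrapper around an underlying Lock that adds wait/notify functionality. wait() releases that lock under the hood, and a lock has to be acquired before it can be released, so you must hold the lock when you call wait(). Once the thread is re-awoken, wait() reacquires the lock before returning. Mutual exclusion is thus ensured between acquiring and releasing, with wait() "yielding" control of the lock while the thread is blocked. The same rule applies to notify() and notifyAll(), so the producer has to hold the lock as well.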
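To make the rule concrete, here is a minimal sketch (not taken from the question) that calls wait() once without the lock and once with it. The timeout value and the variable names error_message and notified are only there for illustration.

```python
import threading

cond = threading.Condition()

# Calling wait() without holding the lock fails immediately.
try:
    cond.wait(timeout=0.1)
    error_message = None
except RuntimeError as exc:
    error_message = str(exc)

# Holding the lock makes the same call legal. Nobody notifies here,
# so wait() times out and returns False.
with cond:
    notified = cond.wait(timeout=0.1)

print(error_message)
print(notified)
```

The first call raises the same RuntimeError the question reports; the second one blocks briefly, gives up, and leaves the lock held until the with block exits.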

Instead of doing the acquiring/releasing manually, just use the Condition as a context manager:

def consumer(cond):
    with cond:
        cond.wait()

    logging.debug('Resource is available to consumer')

If for whatever reason you're stuck on a version of Python without context managers, this is equivalent to:

def consumer(cond):
    # acquire outside the try block, so release() only runs if acquire() succeeded
    cond.acquire()
    try:
        cond.wait()
    finally:
        cond.release()

    logging.debug('Resource is available to consumer')
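Putting it together, the program from the question might be fixed roughly as follows. This is a sketch rather than the only way to do it: the woken list and the timeout on wait() are additions made here so the result can be checked and a missed notification cannot hang the script, and the sleep is shortened to one second.

```python
import logging
import threading
import time

logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s (%(threadName)-2s) %(message)s')

woken = []  # illustration only: names of consumers that were notified

def consumer(cond):
    logging.debug('Starting consumer thread')
    with cond:                        # hold the lock before waiting
        # wait() releases the lock while blocked and reacquires it on wake-up;
        # it returns False if the (assumed) 5 second timeout expires first.
        if cond.wait(timeout=5):
            woken.append(threading.current_thread().name)
    logging.debug('Resource is available to consumer')

def producer(cond):
    logging.debug('Starting producer thread')
    logging.debug('Making resource available')
    with cond:                        # notifying also requires the lock
        cond.notify_all()

condition = threading.Condition()
c1 = threading.Thread(name='c1', target=consumer, args=(condition,))
c2 = threading.Thread(name='c2', target=consumer, args=(condition,))
p = threading.Thread(name='p', target=producer, args=(condition,))

c1.start()
c2.start()
time.sleep(1)   # give both consumers time to reach wait()
p.start()

for t in (c1, c2, p):
    t.join()
```

Note that this still relies on the sleep to make sure the consumers are waiting before the producer notifies; a notification sent while nobody is waiting is simply lost.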

Often you want to make sure that only one consumer proceeds for each available item, so the following idiom is frequently used:

with cond:
    while some_queue.empty():
        cond.wait()
    # get one from queue

This way you can notify any number of consumers, and the extra ones go straight back to sleep as soon as they find the queue empty.
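A runnable sketch of that idiom, assuming a collections.deque stands in for the queue (the names items and taken are made up for this example):

```python
import threading
from collections import deque

items = deque()               # hypothetical shared queue, guarded by cond
cond = threading.Condition()
taken = []                    # items received by the consumers

def consumer():
    with cond:
        while not items:      # re-check the predicate after every wake-up
            cond.wait()
        taken.append(items.popleft())

def producer(n):
    for i in range(n):
        with cond:
            items.append(i)
            # Wakes every waiter; those that find the queue empty wait again.
            cond.notify_all()

consumers = [threading.Thread(target=consumer) for _ in range(3)]
for t in consumers:
    t.start()
producer(3)
for t in consumers:
    t.join()

print(sorted(taken))  # [0, 1, 2]
```

Because the predicate is checked while holding the lock, a consumer that starts late still sees an item that was already queued, so this version does not depend on timing the way a bare wait() does.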

Author: cpx

Updated on July 04, 2022

Comments

  • cpx
    cpx almost 2 years

    I have this example in Python which demonstrates the use of condition variables.

    import logging
    import threading
    import time
    
    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s (%(threadName)-2s) %(message)s',)
    
    def consumer(cond):
    
        # wait for the condition and use the resource
    
        logging.debug('Starting consumer thread')
    
        t = threading.currentThread()
    
        cond.wait()
    
        logging.debug('Resource is available to consumer')
    
    def producer(cond):
    
        # set up the resource to be used by the consumer
    
        logging.debug('Starting producer thread')
    
        logging.debug('Making resource available')
    
        cond.notifyAll()
    
    
    condition = threading.Condition()
    
    # pass each thread a 'condition'
    c1 = threading.Thread(name='c1', target=consumer, args=(condition,))
    c2 = threading.Thread(name='c2', target=consumer, args=(condition,))
    p = threading.Thread(name='p', target=producer, args=(condition,))
    
    
    # start two threads and put them into 'wait' state
    c1.start()
    c2.start()
    
    # after two seconds or after some operation notify them to free or step over the wait() function
    time.sleep(2)
    p.start()
    

    However, it raises a RuntimeError about an un-acquired lock in the threads. I have an idea that I need to use the acquire and release functions, but I'm not sure how to use them or what exactly they do.