How to solve the producer-consumer using semaphores?

15,441

Solution 1

It seems that you are using a mutex not a semaphore?

In using a mutex you have only binary synchronisation - locking and unlocking one resource. Sempahores have a value that you can signal or acquire.

You are trying to lock/unlock the entire buffer but that is the wrong way to go because, as you are seeing, either the producer or consumer locks, and when the reader has locked it the producer can't fill the buffer (because it has to lock first).

You should instead create a Sempahore, then when the producer writes one packet or block of data it can signal the semaphore. The consumers can then be trying to acquire the semaphore so they will be waiting until the producer has signalled a packet has been written. Upon signalling a written packet, one of the consumers will be woken and it will know it can read one packet. It can read a packet, then go back to trying to acquire on the semaphore. If in that time the producer has written another packet it has signalled again and either of the consumers will then go on to read another packet. Etc...

For example:

(Producer) - Write one packet - Semaphore.release(1)

(Consumer xN) - Semaphore.acquire(1) - Read one packet

If you have multiple consumers then the consumers (not the producer) should lock the buffer when reading the packet (but not when acquiring the semaphore) to prevent race conditions. In the example below the producer also locks the list since everything is on the same JVM.

import java.util.LinkedList;
import java.util.concurrent.Semaphore;

public class Semaphores {

    static Object LOCK = new Object();

    static LinkedList list = new LinkedList();
    static Semaphore sem = new Semaphore(0);
    static Semaphore mutex = new Semaphore(1);

    static class Consumer extends Thread {
        String name;
        public Consumer(String name) {
            this.name = name;
        }
        public void run() {
            try {

                while (true) {
                    sem.acquire(1);
                    mutex.acquire();
                    System.out.println("Consumer \""+name+"\" read: "+list.removeFirst());
                    mutex.release();
                }
            } catch (Exception x) {
                x.printStackTrace();
            }
        }
    }

    static class Producer extends Thread {
        public void run() {
            try {

                int N = 0;

                while (true) {
                    mutex.acquire();
                    list.add(new Integer(N++));
                    mutex.release();
                    sem.release(1);
                    Thread.sleep(500);
                }
            } catch (Exception x) {
                x.printStackTrace();
            }
        }
    }

    public static void main(String [] args) {
        new Producer().start();
        new Consumer("Alice").start();
        new Consumer("Bob").start();
    }
}

Solution 2

One of the most common usage pattern of Multi threaded application is to create an asynchronous communication network. Several real world applications require this. There are 2 ways of achieving this :-

  1. The producer and consumer are tightly coupled. This is not asynchronous and each producer waits for a consumer and vice versa. The throughput of the application also becomes the minimum of the 2 entities. This is generally never a good design.
  2. The better (and more complicated) way of doing this is by introducing a shared buffer between the producer and consumer. This way, a faster producer or faster consumer are not throttled due to a slower counterpart. It also allows for multiple producers and multiple consumers to connect via the shared buffer.

enter image description here

There are several ways to create a Producer-Consumer pattern.

  1. Using wait/notify/nofityAll which was covered in the earlier module on "Locking Fundamentals"
  2. Using the API provided by Java - java.util.concurrent.BlockingQueue. We will cover more on this in a subsequent module.
  3. Using Semaphores : This is a very convenient way of creating the producer-consumer pattern.

    public class ProducerConsumerSemaphore {
    
    private static final int BUFFER_SIZE = 10;
    private static final int MAX_VALUE = 10000;
    private final Stack<Integer> buffer = new Stack<Integer>();
    private final Semaphore writePermits = new Semaphore(BUFFER_SIZE);
    private final Semaphore readPermits = new Semaphore(0);
    private final Random random = new Random();
    
    class Producer implements Runnable {
        @Override
        public void run() {
            while (true) {
                writePermits.acquireUninterruptibly();
                buffer.push(random.nextInt(MAX_VALUE));
                readPermits.release();
            }
        }
    }
    
    class Consumer implements Runnable {
        @Override
        public void run() {
            while (true) {
                readPermits.acquireUninterruptibly();
                System.out.println(buffer.pop());
                writePermits.release();
            }
        }
    }
    
    public static void main(String[] args) {
    
        ProducerConsumerSemaphore obj = new ProducerConsumerSemaphore();
        Producer p1 = obj.new Producer();
        Producer p2 = obj.new Producer();
        Producer p3 = obj.new Producer();
        Consumer c1 = obj.new Consumer();
        Consumer c2 = obj.new Consumer();
        Consumer c3 = obj.new Consumer();
        Thread t1 = new Thread(p1);
        Thread t2 = new Thread(p2);
        Thread t3 = new Thread(p3);
        Thread t4 = new Thread(c1);
        Thread t5 = new Thread(c2);
        Thread t6 = new Thread(c3);
        t1.start();
        t2.start();
        t3.start();
        t4.start();
        t5.start();
        t6.start();
    }
    

We use 2 semaphores - 1 for consumers and 1 for producers.

The number of permits allowed for the producer are set to maximum buffer size.

Each producer consumes 1 write permit and releases 1 read permit on producing 1 message.

Each consumer consumes 1 read permit and releases 1 write permit for consumption of each message.

Imagine the permit to be piggy banked on the actual message. Write permit flows from the Producer to Consumer (and back to the Producer). Read permit flows from the Consumer to Producer (and back to the Consumer). Total messages in the buffer at any given point of time will be exactly equal to the number of read permits issued. If the rate of producing messages is greater than the rate of consuming messages, then at a certain point, number of write permits available would be exhausted and all the producer threads would be blocked until a consumer reads from the buffer and releases a write permit. The same logic exists the other way round as well.

enter image description here

Above is a more visual articulation of flow of messages and permits in the system. By using Semaphores, we are only abstracting away the gory details and care required to write a piece of code using wait/notify/notifyAll. The above code can be compared with the wait et. al approach :

When a thread is blocked for lack of permits, it is equivalent to a wait() call on that semaphore.

When a thread releases a permit, it is equivalent to a notifyAll() call on that particular semaphore.

Share:
15,441
Victor
Author by

Victor

Coding for fun. GERALD WEINBERG IS THE BEST!!!!!! PSYCHOLOGY OF COMPUTER PROGRAMMING IS THE BEST BOOK EVER PUBLISHED!!!

Updated on November 21, 2022

Comments

  • Victor
    Victor over 1 year

    I need to code a problem similar to producer-consumer, that must use semaphores. I tried a couple of solutions and none of them worked. First I tried a solution on Wikipedia and it didn't worked. My current code is something like that:

    Method run of the consumer:

        public void run() {
        int i=0;
        DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
        String s = new String();
        while (1!=2){
            Date datainicio = new Date();
            String inicio=dateFormat.format(datainicio);
            try {
                Thread.sleep(1000);///10000
            } catch (InterruptedException e) {
                System.out.println("Excecao InterruptedException lancada.");
            }
            //this.encheBuffer.down();
            this.mutex.down();
            // RC
            i=0;
            while (i<buffer.length) {
                if (buffer[i] == null) {
                    i++;
                } else {
                    break;
                }
            }
            if (i<buffer.length) {
                QuantidadeBuffer.quantidade--;
                Date datafim = new Date();
                String fim=dateFormat.format(datafim);
                int identificador;
                identificador=buffer[i].getIdentificador()[0];
                s="Consumidor Thread: "+Thread.currentThread()+" Pedido: "+identificador+" Inicio: "+inicio+" Fim: "+fim+" posicao "+i;
                //System.out.println("Consumidor Thread: "+Thread.currentThread()+" Pedido: "+identificador+" Inicio: "+inicio+" Fim: "+fim+" posicao "+i);
                buffer[i]= null;
            }
            // RC
            this.mutex.up();
            //this.esvaziaBuffer.up();
            System.out.println(s);
      //            lock.up();
        }
    }
    

    Method run of the producer:

        public void run() {
        DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
        int i=0;
        while (1!=2){
            Date datainicio = new Date();
            String inicio=dateFormat.format(datainicio);
            // Produz Item
            try {
                Thread.sleep(500);//50000
            } catch (InterruptedException e) {
                System.out.println("Excecao InterruptedException lancada.");
            }
            //this.esvaziaBuffer.down();
            this.mutex.down();
            // RC
            i=0;
            while (i<buffer.length) {
                if (buffer[i]!=null) {
                    i++;
                } else {
                    break;
                }
            }
            if (i<buffer.length) {
                int identificador[]=new int[Pedido.getTamanho_identificador()];
                identificador[0]=i;
                buffer[i]=new Pedido();
                Produtor.buffer[i].setIdentificador(identificador);
                Produtor.buffer[i].setTexto("pacote de dados");
                QuantidadeBuffer.quantidade++;
                Date datafim = new Date();
                String fim=dateFormat.format(datafim);
                System.out.println("Produtor Thread: "+Thread.currentThread()+" Pedido: "+identificador[0]+" Inicio: "+inicio+" Fim: "+fim+" posicao "+i);
                i++;
            }
            // RC
            this.mutex.up();
            //this.encheBuffer.up();
        }
        //this.encheBuffer.up();
    }
    

    In the above code it happened of a consumer thread to read a position and then, another thread read the same position without a producer fill that position, something like this:

    Consumidor Thread: Thread[Thread-17,5,main] Pedido: 1 Inicio: 2011/11/27 17:23:33 Fim: 2011/11/27 17:23:34 posicao 1
    Consumidor Thread: Thread[Thread-19,5,main] Pedido: 1 Inicio: 2011/11/27 17:23:33 Fim: 2011/11/27 17:23:34 posicao 1
    
    • PhD
      PhD over 12 years
      Did you try implementing this using wait and notify methods in Java? It'd make your life A LOT easier
    • Victor
      Victor over 12 years
      Yes I did that in the semaphore class.
    • Martin James
      Martin James over 12 years
      @Nupul - I find semaphore queues easier to understand - no weird 'wait' inside a lock, (yes, I know it releases the lock but it looks strange), no anti-spurious-wakeup loop. The producer pushes and then signals, the consumer waits and then pops. No need to look at the queue count at all.
  • AntonyM
    AntonyM over 12 years
    Sure, added example code to my answer. The output will be that the consumers will print out as and when they get packets (ever 500msec determined by the producer - note that there are no sleeps in the consumers).
  • AntonyM
    AntonyM over 12 years
    The above example uses a LinkedList to make the example clear but you can of course use a finite array and then use the nulling method you use in your code in substitute of it.
  • Voo
    Voo over 12 years
    Using synchronized seems like a bit of cheating although I can understand the argument. But just as a reminder for @Victor You can replace the synchronized statements with an additional semaphore quite easily. And then you could think about how you'd do this for limited data structures (i.e. you only have 10 storage spaces for the producer and can't add an 11th one).
  • AntonyM
    AntonyM over 12 years
    I've edited this to use semaphores rather than synchronized. Note that the semaphore to be used as a mutex has to be initialised to 1 to ensure that at most 1 thread may hold the lock (like a mutex).