Why are Python multiprocessing Pipes unsafe?

Solution 1

Essentially, the problem is that Pipe is a thin wrapper around a platform-defined pipe object. recv repeatedly reads chunks of bytes until a complete pickled Python object has been reassembled. If two threads or processes call recv on the same pipe end, the reads may interleave, leaving each process with half a pickled object and thus corrupting the data. Queues do proper synchronization between processes, at the expense of more complexity.

As the multiprocessing documentation puts it:

Note that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. Of course there is no risk of corruption from processes using different ends of the pipe at the same time.
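
As a minimal sketch of the safe pattern the documentation describes (the process and variable names here are illustrative), each Connection returned by Pipe() is used by exactly one process:

    from multiprocessing import Pipe, Process

    def child(conn):
        conn.send('pong')                    # the child is the only user of this end
        conn.close()

    if __name__ == '__main__':
        parent_conn, child_conn = Pipe()
        p = Process(target=child, args=(child_conn,))
        p.start()
        print(parent_conn.recv())            # the parent is the only user of this end
        p.join()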

You don't have to endlessly send poison pills; one per worker is all you need. Each worker picks up exactly one poison pill before exiting, so there's no danger that a worker will somehow miss the message.
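
Applied to the code in the question, a rough Queue-based sketch might look like this (the None sentinel and the names are illustrative choices, not the only way to do it):

    from multiprocessing import Process, Queue

    POISON = None                            # sentinel; any value the workers never receive otherwise

    def job(name, q_in, q_out):
        while True:
            x = q_in.get()                   # safe with several concurrent readers
            if x is POISON:
                break                        # each worker consumes exactly one pill
            q_out.put(x)

    if __name__ == '__main__':
        q_in, q_out = Queue(), Queue()
        procs = [Process(target=job, args=(str(i), q_in, q_out)) for i in range(3)]
        for p in procs:
            p.start()
        for x in range(10):
            q_in.put(chr(97 + x))
        for _ in procs:
            q_in.put(POISON)                 # one poison pill per worker
        for _ in range(10):                  # drain the results before joining
            print(q_out.get())
        for p in procs:
            p.join()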

You should also consider using multiprocessing.Pool instead of reimplementing the "worker process" model -- Pool has many methods that make distributing work across multiple processes very easy.
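
For instance, the question's "send ten letters, collect the results" pattern shrinks to a few lines with Pool (echo here is just a stand-in for real per-item work):

    from multiprocessing import Pool

    def echo(x):
        return x                             # stand-in for real per-item work

    if __name__ == '__main__':
        with Pool(processes=3) as pool:
            print(pool.map(echo, [chr(97 + i) for i in range(10)]))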

Solution 2

I don't understand why Pipes are said to be unsafe when there are multiple senders and receivers.

Imagine that you pour water into a pipe from sources A and B simultaneously. On the other end of the pipe, it will be impossible for you to tell which part of the water came from A and which from B, right? :)

A pipe transports a data stream on the byte level. Without a communication protocol on top of it, it does not know what a message is and therefore cannot ensure message integrity. Therefore, it is not only 'unsafe' to use pipes with multiple senders; it is a major design flaw and will most likely lead to communication problems.

Queues, however, are implemented on a higher level. They are designed for communicating messages (or even abstract objects) and are made to keep each message/object self-contained. Multiple sources can put objects into a queue and multiple consumers can pull these objects while being 100% sure that whatever got into the queue as a unit also comes out of it as a unit.

Edit after quite a while:

I should add that, in the byte stream, all bytes are guaranteed to be retrieved in the same order in which they were sent. The issue with multiple senders is that the sending order (the order of input) may already be unclear or random, i.e. multiple streams may mix in an unpredictable fashion.

A common queue implementation guarantees that single messages are kept intact, even if there are multiple senders. Messages are also retrieved in the order in which they were sent. With multiple competing senders and without further synchronization mechanisms, however, there is again no guarantee about the order of the input messages.
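
Both properties can be seen in a small sketch (illustrative names, two competing senders): every message arrives intact and in per-sender order, but the interleaving of A's and B's messages varies from run to run:

    from multiprocessing import Process, Queue

    def producer(q, source):
        for i in range(5):
            q.put({'source': source, 'seq': i})   # each put() transfers one whole object

    if __name__ == '__main__':
        q = Queue()
        procs = [Process(target=producer, args=(q, name)) for name in 'AB']
        for p in procs:
            p.start()
        for _ in range(10):
            msg = q.get()                    # always a complete dict, never half of one
            print(msg['source'], msg['seq'])
        for p in procs:
            p.join()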

Comments

  • ElBidoule about 2 years

    I don't understand why Pipes are said to be unsafe when there are multiple senders and receivers.

    How can the following code be turned into code using Queues, if that is the case? Queues don't throw EOFError when closed, so my processes can't stop. Should I endlessly send 'poison' messages to tell them to stop (this way, I'm sure all my processes receive at least one poison)?

    I would like to keep the pipe p1 open until I decide otherwise (here, until I have sent the 10 messages).


    from multiprocessing import Pipe, Process
    from random import randint, random
    from time import sleep
    
    def job(name, p_in, p_out):
        print(name + ' starting')
        nb_msg = 0
        try:
            while True:
                x = p_in.recv()    # all three workers recv() from the same pipe end
                print(name + ' receives ' + x)
                nb_msg = nb_msg + 1
                p_out.send(x)
                sleep(random())
        except EOFError:
            pass
        print(name + ' ending ... ' + str(nb_msg) + ' message(s)')
    
    if __name__ == '__main__':
        p1_in, p1_out = Pipe()
        p2_in, p2_out = Pipe()
    
        proc = []
    
        for i in range(3):
            p = Process(target=job, args=(str(i), p1_out, p2_in))    # every worker shares the same two pipe ends
            p.start()
            proc.append(p)
    
        for x in range(10):
            p1_in.send(chr(97+x))
        p1_in.close()
        for p in proc:
            p.join()
        p1_out.close()
        p2_in.close()
    
        try:
            while True:
                print(p2_out.recv())
        except EOFError:
            pass
    
        p2_out.close()
    
  • Daniel about 8 years
    What if I use multiprocessing.Lock() when using recv and send of a pipe? Will it become safe (and efficient)?
  • nneonneo about 8 years
    If you do that, you'll basically end up with a Queue -- multiprocessing.Queue is a Pipe with a pair of locks attached (one for each direction). So, it would be safe and reasonably efficient, but you'd also be directly reinventing the wheel -- why not just use Queue?
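
    A rough sketch of that lock-wrapped idea, for illustration (the names are made up; this is, in effect, rebuilding one direction of Queue by hand):

    from multiprocessing import Lock, Pipe, Process

    def worker(name, conn, read_lock):
        while True:
            with read_lock:                  # serialize recv(): only one reader at a time
                msg = conn.recv()
            if msg is None:                  # one sentinel per worker, as in Solution 1
                break
            print(name, 'received', msg)

    if __name__ == '__main__':
        parent_end, child_end = Pipe()
        read_lock = Lock()                   # with multiple senders, a send lock would be needed too
        procs = [Process(target=worker, args=(str(i), child_end, read_lock)) for i in range(3)]
        for p in procs:
            p.start()
        for x in range(10):
            parent_end.send(chr(97 + x))
        for _ in procs:
            parent_end.send(None)            # one poison pill per worker
        for p in procs:
            p.join()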