Using Python multiprocessing pipes

Yep, that is surprising behaviour indeed.

However, if you look at the lsof output for the two parallel child processes, it is easy to see that the second child process has more file descriptors open.

What happens is that when the two parallel child processes are started, the second child inherits the pipes of the parent. So when the parent calls self.parent_conn.close(), the second child still has that pipe file descriptor open, and the underlying pipe file description never gets closed in the kernel (its reference count stays above zero). The effect is that self.child_conn.recv_bytes() in the first parallel child never read()s EOF, and EOFError is never raised.
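The kernel-level behaviour can be reproduced without multiprocessing at all. In this sketch, os.dup() stands in for the extra copy of the write end that a second forked child implicitly inherits:

```python
import os

# A pipe reader sees EOF only when *every* file descriptor referring to
# the write end has been closed.  Here os.dup() plays the role of the
# copy of the write end that a second forked child would inherit.
r, w = os.pipe()
w_copy = os.dup(w)           # the "second child's" inherited descriptor

os.write(w, b"data")
os.close(w)                  # the "parent" closes its write end

first = os.read(r, 4)        # buffered data still arrives fine
os.close(w_copy)             # drop the last reference to the write end
eof = os.read(r, 4)          # only now does read() return b"" (EOF)
os.close(r)
```

As long as w_copy stays open, the kernel keeps the pipe's file description alive, and a blocking read() would hang instead of returning EOF.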

You may need to send an explicit shutdown message rather than just closing file descriptors, because there seems to be little control over which file descriptors get shared between which processes (there is no close-on-fork file descriptor flag).

Author: Mats Ekberg

Updated on June 04, 2022

Comments

  • Mats Ekberg, about 2 years ago

    I am trying to write a class that calculates checksums using multiple processes, thereby taking advantage of multiple cores. I have a quite simple class for this, and it works great in a simple case. But whenever I create two or more instances of the class, the worker never exits. It seems like it never gets the message that the pipe has been closed by the parent.

    All the code can be found below. I first calculate the md5 and sha1 checksums separately, which works, and then I try to perform both calculations in parallel, at which point the program locks up when it is time to close the pipe.

    What is going on here? Why aren't the pipes working as I expect? I guess I could do a workaround by sending a "Stop" message on the queue and make the child quit that way, but I'd really like to know why this isn't working as it is.

    import hashlib
    import multiprocessing

    class ChecksumPipe(multiprocessing.Process):
        def __init__(self, csname):
            multiprocessing.Process.__init__(self, name=csname)
            self.summer = getattr(hashlib, csname)()  # safer than eval()
            self.child_conn, self.parent_conn = multiprocessing.Pipe(duplex=False)
            self.result_queue = multiprocessing.Queue(1)
            self.daemon = True
            self.start()
            self.child_conn.close()  # This is the parent. Close the unused end.

        def run(self):
            self.parent_conn.close()  # This is the child. Close the unused end.
            while True:
                try:
                    print("Waiting for more data...", self)
                    block = self.child_conn.recv_bytes()
                    print("Got some data...", self)
                except EOFError:
                    print("Finished work", self)
                    break
                self.summer.update(block)
            self.result_queue.put(self.summer.hexdigest())
            self.result_queue.close()
            self.child_conn.close()

        def update(self, block):
            self.parent_conn.send_bytes(block)

        def hexdigest(self):
            self.parent_conn.close()
            return self.result_queue.get()


    def main():
        # Calculating the first checksum works
        md5 = ChecksumPipe("md5")
        md5.update(b"hello")
        print("md5 is", md5.hexdigest())

        # Calculating the second checksum works
        sha1 = ChecksumPipe("sha1")
        sha1.update(b"hello")
        print("sha1 is", sha1.hexdigest())

        # Calculating both checksums in parallel causes a lockup!
        md5, sha1 = ChecksumPipe("md5"), ChecksumPipe("sha1")
        md5.update(b"hello")
        sha1.update(b"hello")
        print("md5 and sha1 is", md5.hexdigest(), sha1.hexdigest())  # Lockup here!

    if __name__ == "__main__":
        main()
    

    P.S. This problem has been solved. Here is a working version of the above code, if anyone is interested:

    import hashlib
    import multiprocessing

    class ChecksumPipe(multiprocessing.Process):

        # Shared registry of every parent-side connection, so each child
        # can close the write ends it inherited but does not need.
        all_open_parent_conns = []

        def __init__(self, csname):
            multiprocessing.Process.__init__(self, name=csname)
            self.summer = getattr(hashlib, csname)()  # safer than eval()
            self.child_conn, self.parent_conn = multiprocessing.Pipe(duplex=False)
            ChecksumPipe.all_open_parent_conns.append(self.parent_conn)
            self.result_queue = multiprocessing.Queue(1)
            self.daemon = True
            self.start()
            self.child_conn.close()  # This is the parent. Close the unused end.

        def run(self):
            for conn in ChecksumPipe.all_open_parent_conns:
                conn.close()  # This is the child. Close all unused ends.
            while True:
                try:
                    print("Waiting for more data...", self)
                    block = self.child_conn.recv_bytes()
                    print("Got some data...", self)
                except EOFError:
                    print("Finished work", self)
                    break
                self.summer.update(block)
            self.result_queue.put(self.summer.hexdigest())
            self.result_queue.close()
            self.child_conn.close()

        def update(self, block):
            self.parent_conn.send_bytes(block)

        def hexdigest(self):
            self.parent_conn.close()
            return self.result_queue.get()

    def main():
        # Calculating the first checksum works
        md5 = ChecksumPipe("md5")
        md5.update(b"hello")
        print("md5 is", md5.hexdigest())

        # Calculating the second checksum works
        sha1 = ChecksumPipe("sha1")
        sha1.update(b"hello")
        print("sha1 is", sha1.hexdigest())

        # Calculating both checksums also works fine now
        md5, sha1 = ChecksumPipe("md5"), ChecksumPipe("sha1")
        md5.update(b"hello")
        sha1.update(b"hello")
        print("md5 and sha1 is", md5.hexdigest(), sha1.hexdigest())

    if __name__ == "__main__":
        main()
    
  • Mats Ekberg, over 12 years ago
    Thanks! That cleared things up for me. I solved it in my example by using a shared class variable containing all open connections across all instances, so that the children can close all the connections they don't need.