python multiprocessing - process hangs on join for large queue


Solution 1

The qout queue in the subprocess gets full. The data that foo() puts into it doesn't fit in the buffer of the OS pipe used internally, so the subprocess blocks trying to write more data. But the parent process is not reading this data: it too is blocked, waiting for the subprocess to finish. This is a typical deadlock.
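
A minimal sketch of one way out of that deadlock, based on the asker's example (the full question is reproduced in the comments below): have the parent drain qout before calling join(), so the child's writes to the pipe are eventually consumed. The fixed count of 100000 results simply mirrors the original loop.

from multiprocessing import Process, Queue

def foo(qin, qout):
    while True:
        bar = qin.get()
        if bar is None:
            break
        qout.put({'bar': bar})

if __name__ == '__main__':
    qin = Queue()
    qout = Queue()
    worker = Process(target=foo, args=(qin, qout))
    worker.start()

    for i in range(100000):
        qin.put(i ** 2)
    qin.put(None)

    # Read the results *before* joining: this is what unblocks the child's
    # qout.put() calls once the underlying pipe buffer fills up.
    results = [qout.get() for _ in range(100000)]

    worker.join()
    print(len(results))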

Solution 2

In practice there is a limit on how much unread data a queue can hold before its writer blocks. Consider the following modification:

from multiprocessing import Process, Queue

def foo(qin,qout):
    while True:
        bar = qin.get()
        if bar is None:
            break
        #qout.put({'bar':bar})

if __name__=='__main__':
    import sys

    qin=Queue()
    qout=Queue()   ## POSITION 1
    for i in range(100):
        #qout=Queue()   ## POSITION 2
        worker=Process(target=foo,args=(qin,qout))
        worker.start()
        for j in range(1000):
            x=i*100+j
            print x
            sys.stdout.flush()
            qin.put(x**2)

        qin.put(None)
        worker.join()

    print 'Done!'

This works as-is (with the qout.put line commented out). If you try to save all 100,000 results, qout becomes too large: if I uncomment qout.put({'bar':bar}) in foo and leave the definition of qout at POSITION 1, the code hangs. If, however, I move the qout definition to POSITION 2, the script finishes.

So in short, you have to be careful that neither qin nor qout grows too large. (See also: Multiprocessing Queue maxsize limit is 32767.)
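
If you do need to keep all 100,000 results, a hedged variation of the POSITION 2 code above drains each batch's qout before joining, so neither queue ever holds much unread data; the batch sizes here are just the ones used in the snippet.

from multiprocessing import Process, Queue

def foo(qin, qout):
    while True:
        bar = qin.get()
        if bar is None:
            break
        qout.put({'bar': bar})

if __name__ == '__main__':
    results = []
    qin = Queue()
    for i in range(100):
        qout = Queue()                      # fresh, small queue per batch (POSITION 2)
        worker = Process(target=foo, args=(qin, qout))
        worker.start()
        for j in range(1000):
            qin.put((i * 1000 + j) ** 2)
        qin.put(None)
        # Drain this batch before joining so the child never blocks
        # on a full pipe buffer.
        for _ in range(1000):
            results.append(qout.get())
        worker.join()

    print(len(results))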

Solution 3

I had the same problem on Python 3 when I tried to put strings into a queue with a total size of about 5000 characters.

In my project there was a host process that sets up a queue and starts a subprocess, then joins. After the join, the host process reads from the queue. When the subprocess produces too much data, the host hangs on join. I fixed this by using the following function to wait for the subprocess in the host process:

from multiprocessing import Process, Queue
from queue import Empty

def yield_from_process(q: Queue, p: Process):
    while p.is_alive():
        p.join(timeout=1)                  # wait a bit, but never block for good
        while True:
            try:
                yield q.get(block=False)   # drain whatever has arrived so far
            except Empty:
                break

I read from the queue as soon as it fills, so it never gets very large.
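
A hypothetical host-side usage of that helper, assuming yield_from_process from above is in scope; produce() and the final non-blocking drain are illustrative additions, not part of the original fix.

from multiprocessing import Process, Queue
from queue import Empty

def produce(q):
    # hypothetical worker: writes far more than a pipe buffer can hold
    for n in range(100000):
        q.put(str(n) * 10)

if __name__ == '__main__':
    q = Queue()
    p = Process(target=produce, args=(q,))
    p.start()

    results = list(yield_from_process(q, p))   # drains while waiting for exit

    # items written just before the child exited may still be in the pipe,
    # so finish with one last non-blocking drain
    try:
        while True:
            results.append(q.get(block=False))
    except Empty:
        pass

    print(len(results))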

Solution 4

I was trying to .get() an async result after the pool had closed: an indentation error had left the retrieval loop outside of the with block.

I had this:

with multiprocessing.Pool() as pool:
    async_results = list()
    for job in jobs:
        async_results.append(
            pool.apply_async(
                _worker_func,
                (job,),
            )
        )
# wrong
for async_result in async_results:
    yield async_result.get()

I needed this:

with multiprocessing.Pool() as pool:
    async_results = list()
    for job in jobs:
        async_results.append(
            pool.apply_async(
                _worker_func,
                (job,),
            )
        )
    # right
    for async_result in async_results:
        yield async_result.get()
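
If you do want the results after the with block closes (the question raised in the comments below), one sketch is to materialize them inside the block and return a plain list; _worker_func here is just a stand-in for the real work.

import multiprocessing

def _worker_func(job):
    # stand-in for the real computation
    return job * job

def run_jobs(jobs):
    with multiprocessing.Pool() as pool:
        async_results = [pool.apply_async(_worker_func, (job,)) for job in jobs]
        # collect while the pool is still alive; the returned list
        # survives after the pool is terminated on block exit
        return [r.get() for r in async_results]

if __name__ == '__main__':
    print(run_jobs(range(10)))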

Comments

  • user545424 almost 4 years

    I'm running python 2.7.3 and I noticed the following strange behavior. Consider this minimal example:

    from multiprocessing import Process, Queue
    
    def foo(qin, qout):
        while True:
            bar = qin.get()
            if bar is None:
                break
            qout.put({'bar': bar})
    
    if __name__ == '__main__':
        import sys
    
        qin = Queue()
        qout = Queue()
        worker = Process(target=foo,args=(qin,qout))
        worker.start()
    
        for i in range(100000):
            print i
            sys.stdout.flush()
            qin.put(i**2)
    
        qin.put(None)
        worker.join()
    

    When I loop over 10,000 or more, my script hangs on worker.join(). It works fine when the loop only goes to 1,000.

    Any ideas?

  • Matteo over 6 years
    It'd be great if you also offered a code solution to the problem. I.e. how to empty the buffer so the subprocess does not block.
  • Cuhrazatee about 4 years
    What is Empty?
  • MSS over 3 years
    Can you elaborate on this, please? I want to use the result from get() after the with block.
  • ThorSummoner over 3 years
    @MSS, exiting the with block destroys the pool, meaning all the job results are deleted, so you can copy them into a variable/list that will exist outside the with block, or you can use them inside the with block (or don't use a with block at all, and close the pool manually when you are done).
  • Dilettant almost 2 years
    @Cuhrazatee Empty is an import from queue and the docs state that: "Exception raised when non-blocking get() (or get_nowait()) is called on a Queue object which is empty."