How do you pass a Queue reference to a function managed by pool.map_async()?


Solution 1

The following code seems to work:

import multiprocessing, time

def task(args):
    count = args[0]
    queue = args[1]
    for i in xrange(count):
        queue.put("%d mississippi" % i)
    return "Done"


def main():
    manager = multiprocessing.Manager()
    q = manager.Queue()
    pool = multiprocessing.Pool()
    result = pool.map_async(task, [(x, q) for x in range(10)])
    time.sleep(1)
    while not q.empty():
        print q.get()
    print result.get()

if __name__ == "__main__":
    main()

Note that the Queue is obtained from manager.Queue() rather than multiprocessing.Queue(). Thanks Alex for pointing me in this direction.
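For anyone on Python 3, a sketch of the same approach (only the print/xrange syntax and the pool clean-up differ; waiting on result.get() replaces the sleep):

```python
import multiprocessing

def task(args):
    # Unpack the (count, queue) tuple passed through map_async.
    count, queue = args
    for i in range(count):
        queue.put("%d mississippi" % i)
    return "Done"

def run_demo():
    # A manager-provided Queue proxy is picklable, so it can be passed
    # to pool workers as an ordinary argument.
    manager = multiprocessing.Manager()
    q = manager.Queue()
    pool = multiprocessing.Pool()
    result = pool.map_async(task, [(x, q) for x in range(10)])
    results = result.get()   # wait for all tasks instead of sleeping
    pool.close()
    pool.join()
    messages = []
    while not q.empty():
        messages.append(q.get())
    return messages, results

if __name__ == "__main__":
    messages, results = run_demo()
    print(len(messages), results)
```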

Solution 2

Making q global works...:

import multiprocessing, time

q = multiprocessing.Queue()

def task(count):
    for i in xrange(count):
        q.put("%d mississippi" % i)
    return "Done"

def main():
    pool = multiprocessing.Pool()
    result = pool.map_async(task, range(10))
    time.sleep(1)
    while not q.empty():
        print q.get()
    print result.get()

if __name__ == "__main__":
    main()

If you need multiple queues, e.g. to avoid mixing up the progress of the various pool processes, a global list of queues should work (of course, each process will then need to know what index in the list to use, but that's OK to pass as an argument;-).
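That suggestion might be sketched like this (my own illustration, not code from the answer; it relies on the workers inheriting the globals, i.e. on the fork start method that is the default on Unix):

```python
import multiprocessing

# One global queue per task; with the fork start method (the Unix
# default), pool workers inherit these along with the other globals.
NUM_TASKS = 4
queues = [multiprocessing.Queue() for _ in range(NUM_TASKS)]

def task(args):
    # Each task knows its index and reports progress on its own queue,
    # so progress from different tasks is never mixed up.
    index, count = args
    for i in range(count):
        queues[index].put("task %d: %d mississippi" % (index, i))
    return "Done %d" % index

def run_demo():
    pool = multiprocessing.Pool()
    result = pool.map_async(task, [(i, 3) for i in range(NUM_TASKS)])
    results = result.get()   # wait for all tasks to finish
    pool.close()
    pool.join()              # workers flush their queues on exit
    progress = []
    for q in queues:
        msgs = []
        while not q.empty():
            msgs.append(q.get())
        progress.append(msgs)
    return progress, results
```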

Author: David

Updated on July 10, 2022

Comments

  • David
    David almost 2 years

    I want a long-running process to return its progress over a Queue (or something similar) which I will feed to a progress bar dialog. I also need the result when the process is completed. A test example here fails with a RuntimeError: Queue objects should only be shared between processes through inheritance.

    import multiprocessing, time
    
    def task(args):
        count = args[0]
        queue = args[1]
        for i in xrange(count):
            queue.put("%d mississippi" % i)
        return "Done"
    
    def main():
        q = multiprocessing.Queue()
        pool = multiprocessing.Pool()
        result = pool.map_async(task, [(x, q) for x in range(10)])
        time.sleep(1)
        while not q.empty():
            print q.get()
        print result.get()
    
    if __name__ == "__main__":
        main()
    

    I've been able to get this to work using individual Process objects (where I am allowed to pass a Queue reference) but then I don't have a pool to manage the many processes I want to launch. Any advice on a better pattern for this?

  • David
    David almost 14 years
    Will this work if the "task" is defined in a different module or package? The example code is very simplified. The real program has an MVC architecture where a producer-consumer pipeline is set up across multiple cores (the model) and it needs to send progress updates to the wxPython GUI (the View).
  • Alex Martelli
    Alex Martelli almost 14 years
    @David, you can try; if your real code doesn't work in this simple way, you'll need to move up a notch in complexity and go for a Manager (which can give you proxies to Queues, etc).
  • David
    David almost 14 years
    This doesn't seem to work at all; q never returns anything and q.empty() is always True on my machine. Even if I increase the sleep call to 10 seconds, which should be ample time for the task to put a few messages on the queue, q.empty() always returns True.
  • Alex Martelli
    Alex Martelli almost 14 years
    @David, by "This", do you mean the code I posted in my A? Because that code works fine for me on a dual-core macbook with OSX 10.5, Python 2.6.5 or 2.7. What's your platform?
  • David
    David almost 14 years
    Yes, I copied the code you posted and while the results are returned (I get 10 "Done" in a list) nothing is ever returned by the Queue. The debugger shows that q always returns q.empty() == True. Windows 7, ActivePython 2.6.5.12
  • Alex Martelli
    Alex Martelli almost 14 years
    @David, it does work fine on the Mac, as I said (no Windows to check there). Ah well, then it looks like a Manager is the only viable approach for you.
  • David
    David almost 14 years
    @Alex Thanks. I'll look into Managers.
  • winwaed
    winwaed over 13 years
    +1 and Just a quick note that your question helped me in a problem I had today. I'd found the Manager version of the queue, but my code wasn't working because I was relying on a global. It needs to be passed as a parameter, like you are doing.
  • micsthepick
    micsthepick about 6 years
    There is a good reason this works on Macs and not on Windows: the default context for creating new processes on Windows is spawn, as fork is not available.
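Given the spawn-vs-fork difference noted in the last comment, one start-method-agnostic pattern (my own sketch, not from the answers above) is to hand a regular multiprocessing.Queue to each worker through the pool's initializer: initargs are transferred when the worker process is created, which is allowed even where queues cannot be pickled as per-task arguments:

```python
import multiprocessing

_queue = None  # worker-side global, set once per worker

def init_worker(q):
    # Runs in each worker process as it starts; works under both
    # fork and spawn because initargs are sent at process creation.
    global _queue
    _queue = q

def task(count):
    for i in range(count):
        _queue.put("%d mississippi" % i)
    return "Done"

def run_demo():
    q = multiprocessing.Queue()
    pool = multiprocessing.Pool(initializer=init_worker, initargs=(q,))
    result = pool.map_async(task, range(10))
    results = result.get()   # wait for all tasks to finish
    pool.close()
    pool.join()
    messages = []
    while not q.empty():
        messages.append(q.get())
    return messages, results

if __name__ == "__main__":
    messages, results = run_demo()
    print(len(messages), results)
```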