How do you pass a Queue reference to a function managed by pool.map_async()?
Solution 1
The following code seems to work:
import multiprocessing, time

def task(args):
    count = args[0]
    queue = args[1]
    for i in xrange(count):
        queue.put("%d mississippi" % i)
    return "Done"

def main():
    manager = multiprocessing.Manager()
    q = manager.Queue()
    pool = multiprocessing.Pool()
    result = pool.map_async(task, [(x, q) for x in range(10)])
    time.sleep(1)
    while not q.empty():
        print q.get()
    print result.get()

if __name__ == "__main__":
    main()
Note that the Queue comes from manager.Queue() rather than from multiprocessing.Queue(). Thanks Alex for pointing me in this direction.
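Since the point of the queue is to drive a progress bar, a minimal sketch of the same manager-based approach can poll the queue until the workers finish, instead of sleeping for a fixed second. The polling loop below is illustrative only and is not part of the original answer:

import multiprocessing, time

def task(args):
    count = args[0]
    queue = args[1]
    for i in xrange(count):
        queue.put("%d mississippi" % i)
    return "Done"

def main():
    manager = multiprocessing.Manager()
    q = manager.Queue()
    pool = multiprocessing.Pool()
    result = pool.map_async(task, [(x, q) for x in range(10)])
    # Drain progress messages while the workers are still running,
    # then pick up anything that arrived after the last check.
    while not result.ready():
        while not q.empty():
            print q.get()  # in the real program, update the progress bar here
        time.sleep(0.1)
    while not q.empty():
        print q.get()
    print result.get()

if __name__ == "__main__":
    main()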
Solution 2
Making q global works...:
import multiprocessing, time

q = multiprocessing.Queue()

def task(count):
    for i in xrange(count):
        q.put("%d mississippi" % i)
    return "Done"

def main():
    pool = multiprocessing.Pool()
    result = pool.map_async(task, range(10))
    time.sleep(1)
    while not q.empty():
        print q.get()
    print result.get()

if __name__ == "__main__":
    main()
If you need multiple queues, e.g. to avoid mixing up the progress of the various pool processes, a global list of queues should work (of course, each process will then need to know what index in the list to use, but that's OK to pass as an argument;-).
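For example, a minimal sketch of that multi-queue idea, with one queue per task and the index passed in the task arguments (illustrative only; like Solution 2 it relies on the workers inheriting the module-level globals, so it assumes a fork-based start method):

import multiprocessing, time

# One queue per task; each worker picks its own queue by index.
queues = [multiprocessing.Queue() for _ in range(10)]

def task(args):
    index, count = args
    for i in xrange(count):
        queues[index].put("%d mississippi" % i)
    return "Done"

def main():
    pool = multiprocessing.Pool()
    result = pool.map_async(task, [(x, x) for x in range(10)])
    time.sleep(1)
    for index, queue in enumerate(queues):
        while not queue.empty():
            print index, queue.get()
    print result.get()

if __name__ == "__main__":
    main()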
David
Updated on July 10, 2022
Comments
-
David almost 2 years
I want a long-running process to return its progress over a Queue (or something similar) which I will feed to a progress bar dialog. I also need the result when the process is completed. A test example here fails with a RuntimeError: Queue objects should only be shared between processes through inheritance.

import multiprocessing, time

def task(args):
    count = args[0]
    queue = args[1]
    for i in xrange(count):
        queue.put("%d mississippi" % i)
    return "Done"

def main():
    q = multiprocessing.Queue()
    pool = multiprocessing.Pool()
    result = pool.map_async(task, [(x, q) for x in range(10)])
    time.sleep(1)
    while not q.empty():
        print q.get()
    print result.get()

if __name__ == "__main__":
    main()
I've been able to get this to work using individual Process objects (where I am allowed to pass a Queue reference), but then I don't have a pool to manage the many processes I want to launch. Any advice on a better pattern for this?
-
David almost 14 years
Will this work if the "task" is defined in a different module or package? The example code is very simplified. The real program has an MVC architecture where a producer-consumer pipeline is set up across multiple cores (the model) and it needs to send progress updates to the wxPython GUI (the view).
-
Alex Martelli almost 14 years
@David, you can try; if your real code doesn't work in this simple way, you'll need to move up a notch in complexity and go for a Manager (which can give you proxies to Queues, etc.).
-
David almost 14 years
This doesn't seem to work at all. q never returns anything; q.empty() is always True on my machine. Even if I increase the sleep call to 10 seconds, which should be more than enough time for the task to put a few messages on the queue, q.empty() always returns True.
-
Alex Martelli almost 14 years
@David, by "This", do you mean the code I posted in my answer? Because that code works fine for me on a dual-core MacBook with OS X 10.5, Python 2.6.5 or 2.7. What's your platform?
-
David almost 14 years
Yes, I copied the code you posted, and while the results are returned (I get 10 "Done" in a list), nothing ever comes out of the Queue. The debugger shows that q.empty() is always True. Windows 7, ActivePython 2.6.5.12.
-
Alex Martelli almost 14 years
@David, it does work fine on the Mac, as I said (no Windows to check there). Ah well, then it looks like a Manager is the only viable approach for you.
-
David almost 14 years
@Alex Thanks. I'll look into Managers.
-
winwaed over 13 years
+1, and just a quick note that your question helped me with a problem I had today. I'd found the Manager version of the queue, but my code wasn't working because I was relying on a global. It needs to be passed as a parameter, like you are doing.
-
micsthepick about 6 years
There is a good reason that this works on Macs and not Windows: the default context for creating new processes on Windows is spawn, since fork is not available, so each worker re-imports the module and ends up with its own fresh q rather than the parent's.
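For anyone hitting this on Windows: a commonly used way to keep a Pool and still share a plain multiprocessing.Queue through inheritance is to hand the queue to each worker via the pool's initializer rather than through the task arguments. This is only a sketch of that pattern, not something from the original thread, and init_worker is an illustrative name:

import multiprocessing

def init_worker(queue):
    # Runs once in every worker process; the queue arrives through the
    # process-creation machinery (inheritance), so this also works when
    # the start method is spawn, as it is on Windows.
    global q
    q = queue

def task(count):
    for i in xrange(count):
        q.put("%d mississippi" % i)
    return "Done"

def main():
    queue = multiprocessing.Queue()
    pool = multiprocessing.Pool(initializer=init_worker, initargs=(queue,))
    result = pool.map_async(task, range(10))
    # 0 + 1 + ... + 9 = 45 messages in total, so block on exactly that many.
    for _ in xrange(sum(range(10))):
        print queue.get()
    print result.get()

if __name__ == "__main__":
    main()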