Maximum size for multiprocessing.Queue item?
Solution 1
It seems the underlying pipe is full, so the feeder thread blocks on the write to the pipe (more precisely, while trying to acquire the lock protecting the pipe from concurrent access).
See this issue: http://bugs.python.org/issue8237
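A minimal sketch of the behaviour this answer describes, written in Python 3 (the "fork" start method and a 2-second timeout are assumptions for illustration): a large item leaves the feeder thread blocked on the full pipe, so join() hangs until the parent drains the queue.

```python
import multiprocessing

ctx = multiprocessing.get_context("fork")  # "fork" start method assumed (Unix)

def put_item(queue, size):
    # put() returns immediately; a hidden feeder thread then writes the
    # pickled item to the pipe and blocks once the pipe's buffer is full.
    queue.put(list(range(size)))

def join_outcome(size, timeout=2):
    """Return True if the worker joined within `timeout`, False if it hung."""
    queue = ctx.Queue()
    p = ctx.Process(target=put_item, args=(queue, size))
    p.start()
    p.join(timeout)      # with a large item, the feeder is still blocked here
    stuck = p.is_alive()
    queue.get()          # draining the queue unblocks the feeder...
    p.join()             # ...and now the join completes
    return not stuck
```

With a small item the join succeeds within the timeout; with an item much larger than the pipe's buffer it does not, until the parent calls get().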
Solution 2
By default, the maxsize of a Queue is infinite. In your case, worker_p is putting an item into the queue; the queue should be drained before calling join. Refer to the programming guidelines for details: https://docs.python.org/2/library/multiprocessing.html#programming-guidelines
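Concretely, the fix for the question's code is to reorder the get and the join. A sketch in Python 3 (the worker class is the question's, lightly modernized; the "fork" start method is an assumption, i.e. a Unix host):

```python
import multiprocessing

ctx = multiprocessing.get_context("fork")  # assumes a Unix host

class WorkerProcess(ctx.Process):
    def __init__(self, queue, size):
        super().__init__()
        self.queue = queue
        self.size = size

    def run(self):
        self.queue.put(list(range(self.size)))

def run_worker(size=100000):
    queue = ctx.Queue()
    worker = WorkerProcess(queue, size)
    worker.start()
    result = queue.get()   # drain the queue first...
    worker.join()          # ...then join: the feeder thread can finish
    return len(result)
```

Because the parent consumes the item while the worker is still alive, the feeder thread is never left blocked on a full pipe, and the join completes for any item size.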
Solution 3
The answer to python multiprocessing: some functions do not return when they are complete (queue material too big) implements what you probably mean by "dequeuing before joining", in a parallel execution of an arbitrary set of functions whose return values get queued.
This allows items of any size to be put into the queues, so the limit you found does not get in the way.
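A self-contained sketch of that pattern in Python 3 (the helper names are hypothetical, and the "fork" start method is assumed): each worker queues its function's return value, and the parent dequeues all results before joining anything.

```python
import multiprocessing

ctx = multiprocessing.get_context("fork")  # assumes a Unix host

def _call(queue, index, func, args):
    # Tag each result with its task index so order can be restored.
    queue.put((index, func(*args)))

def run_parallel(tasks):
    """Run (func, args) pairs in parallel processes; return results in order."""
    queue = ctx.Queue()
    procs = []
    for i, (func, args) in enumerate(tasks):
        p = ctx.Process(target=_call, args=(queue, i, func, args))
        p.start()
        procs.append(p)
    results = [None] * len(tasks)
    for _ in procs:              # one get() per worker, BEFORE any join():
        i, value = queue.get()   # draining keeps the feeder threads unblocked
        results[i] = value
    for p in procs:
        p.join()
    return results
```

Since the queue is drained while the workers are still alive, return values of any size can pass through it without deadlocking the joins.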
Brendan Wood
Updated on July 10, 2022

Comments

- Brendan Wood, almost 2 years ago:
I'm working on a fairly large project in Python that requires one of the compute-intensive background tasks to be offloaded to another core, so that the main service isn't slowed down. I've come across some apparently strange behaviour when using multiprocessing.Queue to communicate results from the worker process. Using the same queue for both a threading.Thread and a multiprocessing.Process for comparison purposes, the thread works just fine but the process fails to join after putting a large item in the queue. Observe:

import threading
import multiprocessing

class WorkerThread(threading.Thread):
    def __init__(self, queue, size):
        threading.Thread.__init__(self)
        self.queue = queue
        self.size = size

    def run(self):
        self.queue.put(range(self.size))

class WorkerProcess(multiprocessing.Process):
    def __init__(self, queue, size):
        multiprocessing.Process.__init__(self)
        self.queue = queue
        self.size = size

    def run(self):
        self.queue.put(range(self.size))

if __name__ == "__main__":
    size = 100000
    queue = multiprocessing.Queue()
    worker_t = WorkerThread(queue, size)
    worker_p = WorkerProcess(queue, size)

    worker_t.start()
    worker_t.join()
    print 'thread results length:', len(queue.get())

    worker_p.start()
    worker_p.join()
    print 'process results length:', len(queue.get())
I've seen that this works fine for size = 10000, but hangs at worker_p.join() for size = 100000. Is there some inherent size limit to what multiprocessing.Process instances can put in a multiprocessing.Queue? Or am I making some obvious, fundamental mistake here?

For reference, I am using Python 2.6.5 on Ubuntu 10.04.