Dumping a multiprocessing.Queue into a list
Try this:
import Queue
import time

def dump_queue(queue):
    """
    Empties all pending items in a queue and returns them in a list.
    """
    result = []
    for i in iter(queue.get, 'STOP'):
        result.append(i)
    time.sleep(.1)
    return result

import multiprocessing
q = multiprocessing.Queue()
for i in range(100):
    q.put([range(200) for j in range(100)])
q.put('STOP')
l=dump_queue(q)
print len(l)
A multiprocessing queue has an internal buffer and a feeder thread that pulls items off that buffer and flushes them to the underlying pipe. If not all of the objects have been flushed yet, I could see a case where Empty is raised prematurely. Using a sentinel to mark the end of the queue is safe and reliable. Also, the iter(get, sentinel) idiom is simply cleaner than relying on Empty.
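To make the flush-timing issue concrete, here is a minimal sketch, assuming Python 3 (where the module is named `queue` rather than `Queue`). The helper names `drain_nonblocking` and `demo` are made up for illustration. It drains the queue with non-blocking gets until Empty is raised, then collects whatever was still in flight with blocking gets. How many items land in the first batch depends on timing, so no particular split should be expected.

```python
import multiprocessing
import queue  # this module is named Queue in Python 2


def drain_nonblocking(q):
    """Collect items with non-blocking gets until queue.Empty is raised."""
    items = []
    while True:
        try:
            items.append(q.get(block=False))
        except queue.Empty:
            # Empty only means nothing was readable from the pipe right now;
            # the feeder thread may still be holding unflushed items.
            return items


def demo(n=50):
    """Return (count drained before Empty, count recovered afterwards)."""
    q = multiprocessing.Queue()
    for _ in range(n):
        q.put([list(range(200)) for _ in range(100)])
    early = drain_nonblocking(q)
    # Blocking gets wait for the feeder thread to flush the remainder.
    rest = [q.get(timeout=5) for _ in range(n - len(early))]
    return len(early), len(rest)


if __name__ == "__main__":
    early, rest = demo()
    print("drained before Empty: %d, recovered afterwards: %d" % (early, rest))
```

The two counts always add up to the number of items put on the queue, but the first one may well be smaller than the total, which matches the behaviour described in the question.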
I don't like that it could raise Empty because of flush timing. I added the time.sleep(.1) to allow a context switch to the feeder thread; you may not need it, since it works without it. It's just a habit of mine to release the GIL.
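For readers on Python 3, the sentinel approach might look like the sketch below. The `SENTINEL` constant, the `dump_until_sentinel` name, and the `timeout` parameter are illustrative assumptions and not part of the code above. The timeout is only a guard so that a missing sentinel raises `queue.Empty` instead of blocking forever. Note that items are pickled on their way through the queue, so the sentinel has to be something that still compares equal after a round trip, such as a string or None, rather than a bare `object()`.

```python
import multiprocessing

SENTINEL = "STOP"  # assumed marker; must still compare equal after pickling


def dump_until_sentinel(q, sentinel=SENTINEL, timeout=5.0):
    """Return every item received before the sentinel, in order.

    Each get blocks for up to `timeout` seconds, so no sleep is needed;
    queue.Empty propagates if the sentinel never arrives in time.
    """
    return list(iter(lambda: q.get(timeout=timeout), sentinel))


def demo(n=100):
    q = multiprocessing.Queue()
    for _ in range(n):
        q.put(list(range(200)))
    q.put(SENTINEL)
    return dump_until_sentinel(q)


if __name__ == "__main__":
    print(len(demo()))
```

Because every get blocks until an item is actually readable, the result does not depend on how quickly the feeder thread flushes its buffer.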
Comments
Ram Rachum over 2 years
I wish to dump a multiprocessing.Queue into a list. For that task I've written the following function:

import Queue

def dump_queue(queue):
    """
    Empties all pending items in a queue and returns them in a list.
    """
    result = []

    # START DEBUG CODE
    initial_size = queue.qsize()
    print("Queue has %s items initially." % initial_size)
    # END DEBUG CODE
    while True:
        try:
            thing = queue.get(block=False)
            result.append(thing)
        except Queue.Empty:
            # START DEBUG CODE
            current_size = queue.qsize()
            total_size = current_size + len(result)
            print("Dumping complete:")
            if current_size == initial_size:
                print("No items were added to the queue.")
            else:
                print("%s items were added to the queue." % \
                      (total_size - initial_size))
            print("Extracted %s items from the queue, queue has %s items "
                  "left" % (len(result), current_size))
            # END DEBUG CODE
            return result
But for some reason it doesn't work.
Observe the following shell session:
>>> import multiprocessing
>>> q = multiprocessing.Queue()
>>> for i in range(100):
...     q.put([range(200) for j in range(100)])
...
>>> q.qsize()
100
>>> l=dump_queue(q)
Queue has 100 items initially.
Dumping complete:
0 items were added to the queue.
Extracted 1 items from the queue, queue has 99 items left
>>> l=dump_queue(q)
Queue has 99 items initially.
Dumping complete:
0 items were added to the queue.
Extracted 3 items from the queue, queue has 96 items left
>>> l=dump_queue(q)
Queue has 96 items initially.
Dumping complete:
0 items were added to the queue.
Extracted 1 items from the queue, queue has 95 items left
>>>
What's happening here? Why aren't all the items being dumped?