Dumping a multiprocessing.Queue into a list


Try this:

import multiprocessing
import time

def dump_queue(queue):
    """
    Empties all pending items in a queue and returns them in a list.
    """
    result = []
    for i in iter(queue.get, 'STOP'):
        result.append(i)
    time.sleep(.1)
    return result

q = multiprocessing.Queue()
for i in range(100):
    q.put([range(200) for j in range(100)])
q.put('STOP')
l = dump_queue(q)
print len(l)

Multiprocessing queues have an internal buffer: a feeder thread pulls work off the buffer and flushes it to the underlying pipe. If not all of the objects have been flushed yet, a non-blocking get can raise Empty prematurely, even though items are still "in" the queue. Using a sentinel to mark the end of the queue is safe and reliable, and the iter(get, sentinel) idiom is simply better than relying on Empty.
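The race is inherently timing-dependent, so it cannot be reproduced deterministically, but the shape of the problem (and the fix) can be sketched in a few lines. This is Python 3, where the stdlib module is named `queue` rather than `Queue`:

```python
import multiprocessing
import queue  # Python 3 name for the Queue module

q = multiprocessing.Queue()
q.put('item')

# Immediately after put(), the feeder thread may not have flushed the
# object into the underlying pipe yet, so a non-blocking get() can
# raise queue.Empty even though the item is "in" the queue.
try:
    item = q.get(block=False)
except queue.Empty:
    item = q.get()  # a blocking get() waits for the flush and succeeds

print(item)
```

Either branch ends up with the same item; the point is that only the blocking get() is guaranteed to see it.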

I don't like that a non-blocking get could raise Empty purely due to flush timing. The time.sleep(.1) is there to allow a context switch to the feeder thread; you may not need it, and the code works without it, but releasing the GIL there is a habit of mine.
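For completeness, the sentinel idiom in Python 3 form (a minimal sketch; the 'STOP' value and the SENTINEL name are just conventions of this example, not part of the multiprocessing API):

```python
import multiprocessing

SENTINEL = 'STOP'  # any unique value the producers never put as real data

def dump_queue(q, sentinel=SENTINEL):
    """Drain q with blocking gets until the sentinel is seen; return the items."""
    # iter(callable, sentinel) calls q.get() repeatedly and stops when it
    # returns the sentinel, so flush timing in the feeder thread is irrelevant.
    return list(iter(q.get, sentinel))

if __name__ == '__main__':
    q = multiprocessing.Queue()
    for i in range(100):
        q.put(i)
    q.put(SENTINEL)
    items = dump_queue(q)
    print(len(items))  # 100
```

Because every get blocks until an item arrives, the function never terminates early, no matter when the feeder thread gets around to flushing.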

Ram Rachum

Israeli Python developer.

Updated on September 09, 2021

Comments

  • Ram Rachum over 2 years

    I wish to dump a multiprocessing.Queue into a list. For that task I've written the following function:

    import Queue
    
    def dump_queue(queue):
        """
        Empties all pending items in a queue and returns them in a list.
        """
        result = []
    
        # START DEBUG CODE
        initial_size = queue.qsize()
        print("Queue has %s items initially." % initial_size)
        # END DEBUG CODE
    
        while True:
            try:
                thing = queue.get(block=False)
                result.append(thing)
            except Queue.Empty:
    
                # START DEBUG CODE
                current_size = queue.qsize()
                total_size = current_size + len(result)
                print("Dumping complete:")
                if current_size == initial_size:
                    print("No items were added to the queue.")
                else:
                print("%s items were added to the queue." %
                      (total_size - initial_size))
            print("Extracted %s items from the queue, "
                  "queue has %s items left" % (len(result), current_size))
                # END DEBUG CODE
    
                return result
    

    But for some reason it doesn't work.

    Observe the following shell session:

    >>> import multiprocessing
    >>> q = multiprocessing.Queue()
    >>> for i in range(100):
    ...     q.put([range(200) for j in range(100)])
    ... 
    >>> q.qsize()
    100
    >>> l=dump_queue(q)
    Queue has 100 items initially.
    Dumping complete:
    0 items were added to the queue.
    Extracted 1 items from the queue, queue has 99 items left
    >>> l=dump_queue(q)
    Queue has 99 items initially.
    Dumping complete:
    0 items were added to the queue.
    Extracted 3 items from the queue, queue has 96 items left
    >>> l=dump_queue(q)
    Queue has 96 items initially.
    Dumping complete:
    0 items were added to the queue.
    Extracted 1 items from the queue, queue has 95 items left
    >>> 
    

    What's happening here? Why aren't all the items being dumped?