Get all items from thread Queue

Solution 1

I'd be very surprised if the get_nowait() call caused the pause by not returning if the list was empty.

Could it be that you're posting a large number of (maybe big?) items between checks which means the receiving thread has a large amount of data to pull out of the Queue? You could try limiting the number you retrieve in one batch:

from queue import Empty  # Python 3; on Python 2 use: from Queue import Empty

def queue_get_all(q):
    items = []
    maxItemsToRetrieve = 10
    # Pull at most maxItemsToRetrieve items, stopping early if the queue runs dry.
    for numOfItemsRetrieved in range(maxItemsToRetrieve):
        try:
            items.append(q.get_nowait())
        except Empty:
            break
    return items

This would limit the receiving thread to pulling up to 10 items at a time.
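As a rough illustration of how the cap behaves, the sketch below fills a queue with 25 items and drains it in batches. It assumes Python 3 (where the module is named `queue`), and the names `queue_get_batch` and `max_items` are hypothetical, chosen here only to make the limit a parameter.

```python
import queue


def queue_get_batch(q, max_items=10):
    """Return up to max_items items without blocking (hypothetical helper)."""
    items = []
    for _ in range(max_items):
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            break  # queue ran dry before the cap was reached
    return items


q = queue.Queue()
for i in range(25):
    q.put(i)

# Drain in batches until nothing is left, recording each batch size.
batch_sizes = []
while True:
    batch = queue_get_batch(q)
    if not batch:
        break
    batch_sizes.append(len(batch))

print(batch_sizes)  # [10, 10, 5]
```

Each call returns quickly even when the producer has posted a lot of data, at the cost of needing several calls to catch up.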

Solution 2

If you're always pulling all available items off the queue, is there any real point in using a queue rather than just a list with a lock? For example:

from __future__ import with_statement  # only needed on Python 2.5
import threading

class ItemStore(object):
    def __init__(self):
        self.lock = threading.Lock()
        self.items = []

    def add(self, item):
        with self.lock:
            self.items.append(item)

    def getAll(self):
        with self.lock:
            items, self.items = self.items, []
        return items

If you're also pulling them individually, and making use of the blocking behaviour for empty queues, then you should use Queue, but your use case looks much simpler, and might be better served by the above approach.

[Edit2] I'd missed the fact that you're polling the queue from an idle loop, and from your update, I see that the problem isn't related to contention, so the below approach isn't really relevant to your problem. I've left it in in case anyone finds a blocking variant of this useful:

For cases where you do want to block until you get at least one result, you can modify the above code to wait until the producer thread signals that data is available. For example:

class ItemStore(object):
    def __init__(self):
        self.cond = threading.Condition()
        self.items = []

    def add(self, item):
        with self.cond:
            self.items.append(item)
            self.cond.notify() # Wake 1 thread waiting on cond (if any)

    def getAll(self, blocking=False):
        with self.cond:
            # If blocking is true, always return at least 1 item
            while blocking and len(self.items) == 0:
                self.cond.wait()
            items, self.items = self.items, []
        return items
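A minimal usage sketch of the blocking variant follows, assuming Python 3. The class is repeated so the example runs on its own; the `consumer` function and the `received` list are hypothetical names used only for this demonstration.

```python
import threading


class ItemStore(object):
    def __init__(self):
        self.cond = threading.Condition()
        self.items = []

    def add(self, item):
        with self.cond:
            self.items.append(item)
            self.cond.notify()  # wake one waiting consumer, if any

    def getAll(self, blocking=False):
        with self.cond:
            # wait() releases the condition's lock while sleeping,
            # so add() can still get in and signal us.
            while blocking and len(self.items) == 0:
                self.cond.wait()
            items, self.items = self.items, []
        return items


store = ItemStore()
received = []


def consumer():
    # Blocks until the producer has added at least one item.
    received.extend(store.getAll(blocking=True))


t = threading.Thread(target=consumer)
t.start()
store.add("result")  # producer side: signals the waiting consumer
t.join(timeout=5)

print(received)        # ['result']
print(store.getAll())  # [] (non-blocking call on an empty store)
```

The `while` loop around `wait()` matters: it rechecks the list after every wake-up, so a consumer that is woken after another thread has already taken the items simply goes back to waiting.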

Solution 3

I think the easiest way of getting all items out of the queue is the following:

def get_all_queue_result(queue):

    result_list = []
    while not queue.empty():
        result_list.append(queue.get())

    return result_list

Solution 4

The simplest method is using a list comprehension:

items = [q.get() for _ in range(q.qsize())]

Looping over range() purely as a counter is sometimes considered unidiomatic, but I haven't found a simpler method yet.
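One possible alternative that avoids both the counter and the reliance on qsize() is a small generator that stops when the queue reports it is empty. This is only a sketch, assuming Python 3; `drain` is a hypothetical name, not part of the `queue` module.

```python
import queue


def drain(q):
    """Yield items from q until get_nowait() raises queue.Empty."""
    while True:
        try:
            yield q.get_nowait()
        except queue.Empty:
            return


q = queue.Queue()
for n in (1, 2, 3):
    q.put(n)

items = list(drain(q))
print(items)  # [1, 2, 3]
```

Because it never calls a blocking get(), this version cannot hang if another consumer takes items while the loop is running.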

Solution 5

If you're done writing to the queue, qsize should do the trick without needing to check the queue for each iteration.

responseList = []
for _ in range(q.qsize()):
    responseList.append(q.get_nowait())
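The "done writing" condition can be made explicit by joining the producer thread before draining. The sketch below assumes Python 3 and a single consumer; the `producer` function and the squared values are made up for illustration.

```python
import queue
import threading

q = queue.Queue()


def producer():
    for n in range(5):
        q.put(n * n)


worker = threading.Thread(target=producer)
worker.start()
worker.join()  # after this returns, nothing else writes to q

# With no writers and no other readers, qsize() is exact at this point.
response_list = [q.get_nowait() for _ in range(q.qsize())]
print(response_list)  # [0, 1, 4, 9, 16]
```

If other threads could still be reading from or writing to the queue, qsize() is only approximate and this pattern is no longer safe.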
Author: Eli Bendersky

Updated on December 17, 2020

Comments

  • Eli Bendersky
    Eli Bendersky over 3 years

    I have one thread that writes results into a Queue.

    In another thread (GUI), I periodically (in the IDLE event) check if there are results in the queue, like this:

    from queue import Empty  # Python 3; on Python 2 use: from Queue import Empty

    def queue_get_all(q):
        items = []
        while True:
            try:
                items.append(q.get_nowait())
            except Empty:
                break
        return items
    

    Is this a good way to do it?

    Edit:

    I'm asking because sometimes the waiting thread gets stuck for a few seconds without taking out new results.

    The "stuck" problem turned out to be because I was doing the processing in the idle event handler, without making sure that such events are actually generated by calling wx.WakeUpIdle, as is recommended.

  • Eli Bendersky
    Eli Bendersky over 15 years
    I am not using get() because I don't want to block the GUI. It's OK if the call thinks there's nothing in the queue just when a new item is being put in, because I call it periodically.
  • Brian
    Brian over 15 years
    It could cause long waits if he's calling it in a while 1: busyloop. If so, the consumer would not only be reducing the producer's timeslice, but would also spend a lot of time holding the Queue's lock, potentially livelocking the producer. (This is more likely if he has multiple consumer threads.)
  • Jon Cage
    Jon Cage over 15 years
    Good point, Brian. I got the impression that eliben wasn't calling this all that frequently, but I could be wrong - eliben?
  • Brian
    Brian over 15 years
    Oops, you're right - I missed the part about calling it in an idle event. That wouldn't be enough to cause such problems.
  • fantabolous
    fantabolous over 8 years
    This could still give you an exception at some point. From the docs: If empty() returns True it doesn’t guarantee that a subsequent call to put() will not block. Similarly, if empty() returns False it doesn’t guarantee that a subsequent call to get() will not block. docs.python.org/2/library/queue.html
  • fantabolous
    fantabolous over 8 years
    qsize is similar to empty in that it's not guaranteed. From the docs: Return the approximate size of the queue. Note, qsize() > 0 doesn’t guarantee that a subsequent get() will not block, nor will qsize() < maxsize guarantee that put() will not block. docs.python.org/2/library/queue.html
  • fantabolous
    fantabolous about 6 years
    @kaptan the problem is if it does NOT block, and another thread tries to do something with it at the same time, resulting in exceptions or other bad behaviour.
  • fantabolous
    fantabolous about 6 years
    @kaptan specifically in the example above another thread might empty out the queue between this thread's empty() and get() calls, in which case get() would block indefinitely (and a non-blocking get_nowait() would raise Empty)
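The race described in these comments can be reproduced deterministically in a single thread by playing the part of the "other" consumer by hand. This is a sketch, assuming Python 3; the variable names are hypothetical, and get_nowait() is used instead of get() so that the stale check shows up as an exception rather than a hang.

```python
import queue

q = queue.Queue()
q.put("only item")

# Step 1: this thread checks the queue and sees that it is not empty.
saw_items = not q.empty()

# Step 2: simulate another consumer taking the item in between.
taken_elsewhere = q.get_nowait()

# Step 3: this thread acts on its now-stale check.
try:
    q.get_nowait()
    raced = False
except queue.Empty:
    raced = True

print(saw_items, raced)  # True True
```

With a plain get() in step 3 the call would wait for an item that may never arrive, which is why the try/except around get_nowait() is the safer pattern when several threads read from the same queue.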