How to use a queue with concurrent.futures ThreadPoolExecutor in Python 3?
Solution 1
I would suggest something like this:
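import concurrent.futures
import logging
from queue import Queue

log = logging.getLogger(__name__)  # stands in for self.__log, which only exists inside a class

def run(queue):
    # Each task takes exactly one item; get() blocks until one is available.
    item = queue.get()
    log.info(str(item))
    return True

queue = Queue()
# <queue filled here>
worker_threads_to_start = 10  # must not exceed the number of queued items, or get() blocks forever
with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
    future_items = {executor.submit(run, queue): index for index in range(worker_threads_to_start)}
    for future in concurrent.futures.as_completed(future_items):
        f = future_items[future]
        print(f)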
The problem you will run into is that a queue is meant to be an open-ended stream: a medium that decouples the threads that put items into it from the threads that take items out of it.
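To illustrate that decoupling, here is a minimal sketch of a producer thread feeding a pool of consumers through a queue. The names producer, consumer, demo and SENTINEL are made up for this example, and the "work" (doubling a number) is only a placeholder. It assumes one end-of-stream marker is put on the queue per consumer.

```python
import queue
import threading
from concurrent.futures import ThreadPoolExecutor

SENTINEL = None  # hypothetical end-of-stream marker, not part of any API


def producer(q, n_items, n_consumers):
    # Put the real items first, then one sentinel per consumer
    # so that every consumer loop eventually terminates.
    for i in range(n_items):
        q.put(i)
    for _ in range(n_consumers):
        q.put(SENTINEL)


def consumer(q):
    # Take items until the sentinel shows up; placeholder work: double each item.
    handled = []
    while True:
        item = q.get()
        if item is SENTINEL:
            return handled
        handled.append(item * 2)


def demo(n_items=20, n_consumers=3):
    q = queue.Queue()
    with ThreadPoolExecutor(max_workers=n_consumers) as executor:
        futures = [executor.submit(consumer, q) for _ in range(n_consumers)]
        t = threading.Thread(target=producer, args=(q, n_items, n_consumers))
        t.start()
        t.join()
        results = [f.result() for f in futures]
    # Which consumer handled which item is not deterministic, so sort the merged output.
    return sorted(x for part in results for x in part)
```

The producer never needs to know how many items the consumers have handled so far, which is the situation a queue is designed for.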
When
- you have a finite number of items or
- you compute all items at once
and afterwards process them in parallel, a queue makes no sense. A ThreadPoolExecutor makes a queue obsolete in these cases.
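For the finite case described above, a sketch without any explicit queue might look like this. The run body (upper-casing a string) and the helper name process_all are placeholders for illustration only.

```python
from concurrent.futures import ThreadPoolExecutor


def run(item):
    # Placeholder work; replace with the real per-item processing.
    return str(item).upper()


def process_all(items, max_workers=4):
    # executor.map hands each item to the pool and yields results in input order.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(run, items))
```

Calling process_all(["a", "b", "c"]) hands the three items to the pool directly; the executor does the queuing itself.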
I had a look at the ThreadPoolExecutor source:
def submit(self, fn, *args, **kwargs): # line 94
    self._work_queue.put(w) # line 102
A Queue is used inside.
Solution 2
As noted in the comments, you can use the two-argument form of iter() to run a ThreadPoolExecutor over a queue object. A very general version would look something like this:
with concurrent.futures.ThreadPoolExecutor() as executor:
    executor.map(run, iter(queue.get, None))
Here run performs the desired work on each item of the queue. Note that iter(queue.get, None) only stops once None is taken from the queue, so a None sentinel has to be put in after the last item.
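A self-contained sketch of this approach, assuming None is never a legitimate item and is put on the queue as the end marker. The run body (squaring a number) and the helpers process_queue and demo are hypothetical.

```python
import queue
from concurrent.futures import ThreadPoolExecutor


def run(item):
    # Placeholder work on one queue item.
    return item * item


def process_queue(q, max_workers=4):
    # iter(q.get, None) calls q.get() repeatedly and stops when it returns None.
    # executor.map reads the whole iterable while submitting, so this call
    # blocks until the sentinel has been taken from the queue.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(run, iter(q.get, None)))


def demo():
    q = queue.Queue()
    for i in range(5):
        q.put(i)
    q.put(None)  # sentinel: without it, q.get() would block forever
    return process_queue(q)
```

Because the sentinel is compared with ==, an item such as 0 does not end the iteration; only None does.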
user2433024
Updated on June 18, 2022
Comments
-
user2433024 almost 2 years
I am using the simple threading module to run concurrent jobs. Now I would like to take advantage of the concurrent.futures module. Can someone give me an example of using a queue with the concurrent.futures library?
I am getting TypeError: 'Queue' object is not iterable. I don't know how to iterate over a queue.
code snippet:
def run(item):
    self.__log.info(str(item))
    return True

<queue filled here>

with concurrent.futures.ThreadPoolExecutor(max_workers = 100) as executor:
    furtureIteams = { executor.submit(run, item): item for item in list(queue)}
    for future in concurrent.futures.as_completed(furtureIteams):
        f = furtureIteams[future]
        print(f)
-
User almost 11 years
Usually you would use a Queue for the producer-consumer problem: en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem
-
user2433024 almost 11 years
I am looking for some sample code that reads a queue using ThreadPoolExecutor.
-
jfs almost 11 years
+1, a queue might be redundant here. In general, you can convert a queue into an iterable using the two-argument iter() function: for item in iter(queue.get, sentinel): # get items until sentinel found
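A minimal sketch of that two-argument iter() idiom, written as a plain loop without an executor. The helper names drain and demo and the _SENTINEL object are assumptions for this example; a dedicated object() is used as the sentinel so that None can still be a normal item.

```python
import queue

_SENTINEL = object()  # hypothetical unique end marker, cannot clash with real items


def drain(q, sentinel=_SENTINEL):
    # iter(callable, sentinel) keeps calling q.get() until it returns the sentinel.
    return [item for item in iter(q.get, sentinel)]


def demo():
    q = queue.Queue()
    for word in ("a", None, "c"):
        q.put(word)
    q.put(_SENTINEL)  # must be enqueued, otherwise q.get() blocks forever
    return drain(q)
```

This also explains the TypeError in the question: a Queue has no __iter__, so it has to be wrapped like this (or drained by explicit get() calls) before it can be used in a for loop or comprehension.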