Multithreading with concurrent.futures in Python not working
Concurrency is hard. Luckily, Python's concurrent.futures
module manages to nicely decouple the workers (result producers) from the main program (result consumers) via the concept of futures. Executors (thread pool or process pool) also handle all the dirty details of thread/process spawning, synchronization, and joining for you.
After you submit a job to your executor, the future you receive holds a handle to the result. Non-blocking status checks are done with future.done(), and blocking retrieval of the result with future.result().
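As a minimal sketch of that life cycle (the worker function work and its argument are made up for illustration):

```python
from concurrent.futures import ThreadPoolExecutor

def work(x):
    # example worker, not from the original question
    return x * x

with ThreadPoolExecutor(max_workers=2) as executor:
    future = executor.submit(work, 7)  # returns immediately with a Future
    print(future.done())    # non-blocking: True or False, depending on timing
    print(future.result())  # blocking: waits until the result is ready (49)
```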
The exact order in which futures are resolved and results become available is not deterministic (unless you're running a real-time OS) and depends on the operating system's thread/process scheduler. Also, since you check .done()
immediately after you submit a job, it is possible the job will not actually have finished by that time. And if your worker function is more complex than in the example, it's highly likely it won't have. When I run your example, the job finishes in time in only ~50% of the runs (true
gets printed a couple of times).
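You can see this effect reliably with an artificially slow worker (slow_work and the 0.5 s sleep below are made up for illustration):

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_work(x):
    # stand-in for a long-running worker function
    time.sleep(0.5)
    return x

with ThreadPoolExecutor(max_workers=2) as executor:
    future = executor.submit(slow_work, 1)
    print(future.done())    # almost certainly False: the worker is still sleeping
    print(future.result())  # blocks for ~0.5 s, then prints 1
    print(future.done())    # True: the result is now available
```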
However, to wait for results and iterate over them in the order they become available (not necessarily the order they were submitted), you can use concurrent.futures.as_completed.
For example:
from concurrent.futures import ThreadPoolExecutor, as_completed

def function(param):
    return "function result for param: %s" % param

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(function, x) for x in [1, 2, 3]]
    for future in as_completed(futures):
        print(future.result())
This outputs:
function result for param: 1
function result for param: 3
function result for param: 2
The alternative is to submit all tasks, and then iterate over all futures, doing a blocking result fetch on each, in the order they were submitted:
# submit all
...
# read results in order
for future in futures:
    print(future.result())
but this somewhat defeats the purpose of asynchronous futures. Not entirely, assuming that by the time you have enqueued all 8000 of your tasks, the first ones will already be finishing. But that's a lot of unnecessary assumptions. Use as_completed.
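Spelled out, with the same function as in the earlier example, that submit-all-then-fetch-in-order pattern might look like this sketch:

```python
from concurrent.futures import ThreadPoolExecutor

def function(param):
    return "function result for param: %s" % param

with ThreadPoolExecutor(max_workers=2) as executor:
    # submit all tasks up front
    futures = [executor.submit(function, x) for x in [1, 2, 3]]
    # read results in submission order; each .result() blocks until ready
    for future in futures:
        print(future.result())
```

Unlike the as_completed version, the printed order here is deterministic (1, 2, 3), but the loop may sit blocked on an early slow future while later ones are already done.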
Comments
-
KnakworstKoning almost 2 years
I'm trying to get multithreading to work in my program, but it never executes my worker function functionName. future.done() always returns False and the text "function entered" is never printed.

def functionName(par1, par2, par3):
    print("function entered")

with ThreadPoolExecutor(max_workers=2) as executor:
    for x in items:  # this loops 8000 times
        future = executor.submit(functionName, parameter1, parameter2, parameter3)
        if future.done():
            print("true")
-
Sraw about 6 years
Because your future is never waited on.
-
KnakworstKoning about 6 years
Do you know how I can fix this problem?
-
KnakworstKoning about 6 years
I indeed simplified my example. In my real code, the function that I want my workers to execute takes about 20 seconds to complete.
-
randomir about 6 years
Yes, in that case it's clear why you're not seeing the output conditional on the done status.
-
KnakworstKoning about 6 years
I also checked len(future) and that is indeed going up to 8000. So I need to add the for-loop that checks as_completed.
-
KnakworstKoning about 6 years
Is it required to add the part for x in [1,2,3]?
-
randomir about 6 years
In your example that would be "for x in items". Or, instead of a list comprehension, you can use a standard loop and append futures to a list.
-
KnakworstKoning about 6 years
If I put "for x in items" after my submit, should I remove the for-loop then?
-
randomir about 6 years
Yes. We're using a so-called list comprehension instead of a loop in that case. Try: [x*2 for x in range(5)].
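To make the equivalence in the last comment concrete, here is a small sketch (function and items are placeholders standing in for the asker's real worker and 8000-element list):

```python
from concurrent.futures import ThreadPoolExecutor

def function(param):
    # placeholder worker, not from the original question
    return param * 2

items = range(5)  # stands in for the asker's 8000-element list

with ThreadPoolExecutor(max_workers=2) as executor:
    # list comprehension...
    futures = [executor.submit(function, x) for x in items]

    # ...is equivalent to a plain loop with append:
    futures_loop = []
    for x in items:
        futures_loop.append(executor.submit(function, x))

    assert [f.result() for f in futures] == [f.result() for f in futures_loop]
```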