Multithreading with concurrent.futures in Python not working

Concurrency is hard. Luckily, Python's concurrent.futures module nicely decouples the workers (result producers) from the main program (result consumer) via the concept of futures. Executors (thread pool or process pool) also handle all the messy details of spawning, synchronizing and joining threads or processes for you.

After you submit a job to your executor, the future you receive holds a handle to the result. future.done() checks the status without blocking, and future.result() blocks until the result is available.
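
To make the difference between the two calls concrete, here is a minimal sketch. The worker slow_job and the threading.Event that holds it back are illustrative assumptions, not part of the question's code; the event is there only to make the timing predictable.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

release = threading.Event()  # hypothetical gate that keeps the worker busy

def slow_job(param):
    # Block until the main thread allows the job to finish.
    release.wait()
    return "result for %s" % param

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(slow_job, 1)

    # Non-blocking check: the worker is still waiting, so this is False.
    done_before = future.done()

    release.set()  # let the worker finish

    # Blocking fetch: waits until the job has completed.
    value = future.result()
    done_after = future.done()

print(done_before, value, done_after)
```

Checking done() right after submit() only tells you whether the job happened to finish already; result() is the call that actually waits.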

The exact order in which futures are resolved and results become available is not deterministic (unless you're running some real-time OS); it depends on the operating system's thread/process scheduler. Also, since you check .done() immediately after you submit a job, it is quite possible that the job has not finished by then. And if your worker function is more complex than the one in the example, it almost certainly hasn't. When I ran your example, it finished in time in ~50% of cases (I got "true" printed a couple of times).

To wait for results and iterate over them in the order they become available (not necessarily the order in which they were submitted), you can use concurrent.futures.as_completed.

For example:

from concurrent.futures import ThreadPoolExecutor, as_completed

def function(param):
    return "function result for param: %s" % param

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(function, x) for x in [1, 2, 3]]

    # Iterate inside the with block: leaving it waits for all jobs to finish.
    for future in as_completed(futures):
        print(future.result())

This outputs (the order may differ between runs):

function result for param: 1
function result for param: 3
function result for param: 2
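
Because as_completed yields futures in completion order, it can be useful to remember which input each future belongs to. One common way to do that, sketched here with an illustrative dict named future_to_param (an assumption, not something from the question), is to key the futures by the parameter they were submitted with:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def function(param):
    return "function result for param: %s" % param

results = {}
with ThreadPoolExecutor(max_workers=2) as executor:
    # Map each future back to the parameter it was submitted with.
    future_to_param = {executor.submit(function, x): x for x in [1, 2, 3]}

    # Iterating a dict yields its keys, i.e. the futures.
    for future in as_completed(future_to_param):
        param = future_to_param[future]
        results[param] = future.result()

# Print in parameter order, regardless of the order in which jobs finished.
for param in sorted(results):
    print(param, "->", results[param])
```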

The alternative is to submit all tasks, then iterate over the futures in the order they were submitted, doing a blocking result fetch on each:

# submit all
...
# read results in order
for future in futures:
    print(future.result())

but this somewhat defeats the purpose of asynchronous futures. Not entirely, since by the time you have enqueued all 8000 of your tasks, the first ones will probably have finished. But that relies on an assumption you don't need to make. Use as_completed.
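
Applied to the loop from the question, the pattern might look like the sketch below. The names function_name and items and the 20-element range are placeholders standing in for the asker's real worker and data, which are not shown.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def function_name(item):
    # Placeholder for the real, long-running worker.
    return item * 2

items = range(20)  # stand-in for the 8000 items in the question

finished = []
with ThreadPoolExecutor(max_workers=2) as executor:
    # Submit everything first and keep every future.
    futures = []
    for x in items:
        futures.append(executor.submit(function_name, x))

    # Then handle each result as soon as its job completes.
    for future in as_completed(futures):
        finished.append(future.result())

print(len(finished), "jobs finished")
```

The key change from the original loop is that no future is checked with done() right after submission; every future is kept and waited on later.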

Author: KnakworstKoning

Updated on June 04, 2022

Comments

  • KnakworstKoning
    KnakworstKoning almost 2 years

    I'm trying to get multithreading to work in my program. But it never executes my worker function functionName. future.done() is always returning False and the text "function entered" is never printed.

    def functionName(par1, par2, par3):
        print("function entered")
    
    with ThreadPoolExecutor(max_workers=2) as executor: 
        for x in items:      #this loops 8000 times
            future = executor.submit(functionName, parameter1, parameter2, parameter3)
            if future.done():
                print("true")
    
    • Sraw
      Sraw about 6 years
      Because your future is never waited on.
    • KnakworstKoning
      KnakworstKoning about 6 years
      Do you know how I can fix this problem?
  • KnakworstKoning
    KnakworstKoning about 6 years
    I did indeed simplify my example. In my real code, the function that I want my workers to execute takes about 20 seconds to complete.
  • randomir
    randomir about 6 years
    Yes, in that case it's clear why you're not seeing the output conditional on done status.
  • KnakworstKoning
    KnakworstKoning about 6 years
    I also checked the len(future) and that is indeed going up to 8000. So I need to add the for-loop that checks as_completed.
  • KnakworstKoning
    KnakworstKoning about 6 years
    Is it required to add the part for x in [1,2,3]?
  • randomir
    randomir about 6 years
    In your example that would be “for x in items”. Or, instead of a list comprehension, you can use a standard loop and append the futures to a list.
  • KnakworstKoning
    KnakworstKoning about 6 years
    If I put "for x in items" after my submit, should I remove the for-loop then?
  • randomir
    randomir about 6 years
    Yes. We’re using a so-called list comprehension instead of a loop in that case. Try: [x*2 for x in range(5)].
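
For readers unfamiliar with list comprehensions, here is a small sketch of the equivalence mentioned in the comments. The variable names doubled and doubled_loop are illustrative only.

```python
# List comprehension, as suggested in the comment above.
doubled = [x * 2 for x in range(5)]

# Equivalent standard loop that appends to a list.
doubled_loop = []
for x in range(5):
    doubled_loop.append(x * 2)

print(doubled)       # [0, 2, 4, 6, 8]
print(doubled_loop)  # [0, 2, 4, 6, 8]
```

Both forms build the same list, so either one works for collecting the futures returned by executor.submit.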