How to make future calls and wait until complete with Python?

13,750

Solution 1

(This answer currently ignores HTML parsing your code does ... you can queue that into a pool identically to how this approach queues the net user calls)

First, lets define a function that takes a tuple of (user, group) and returns the desired information.

# a function that calls net user to find info on a (user, group)
def get_group_info(usr_grp):
    # unpack the arguments
    usr, grp = usr_grp

    try:
        return (usr, grp, 
                check_output(
                    "net user /domain " + usr + "| find \"" + grp + "\"", 
                    shell=True, 
                    stderr=subprocess.STDOUT
                    ).strip().decode("utf-8")))
    except Exception:
        return (usr, grp, None)

Now, we can run this in a thread pool using multiprocessing.dummy.Pool

from multiprocessing.dummy import Pool
import itertools

# create a pool with four worker threads
pool = Pool(4)

# run get_group_info for every user, group
async_result = pool.map_async(get_group_info, itertools.product(user_list, groups))

# now do some other work we care about
...

# and then wait on our results
results = async_result.get()

The results are a list of (user, group, data) tuples and can be processed as you desire.

Note: This code is currently untested due to a difference in platforms

Solution 2

In python 3, a more simple and convenient solution is to use concurrent.futures.

The concurrent.futures module provides a high-level interface for asynchronously executing callables. Reference...

import concurrent.futures


# Get a list containing all groups of a user
def get_groups(username):
    # Do the request and check here
    # And return the groups of current user with a list
    return list()

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Mark each future with its groups
    future_to_groups = {executor.submit(get_groups, user): user
                        for user in user_list}

    # Now it comes to the result of each user
    for future in concurrent.futures.as_completed(future_to_groups):
        user = future_to_groups[future]
        try:
            # Receive the returned result of current user
            groups = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (user, exc))
        else:
            # Here you do anything you need on `groups`
            # Output or collect them
            print('%r is in %d groups' % (user, len(groups)))

Notice that max_workers here means the max number of threads.

See here where this example comes from.

EDIT:

If you need to do each check in seperate thread:

import concurrent.futures


# Check if a `user` is in a `group`
def check(user, group):
    # Do the check here
    # And return True if user is in this group, False if not
    return True

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Mark each future with its user and group
    future_to_checks = {executor.submit(check, user, group): (user, group)
                        for user in user_list for group in group_list}

    # Now it comes to the result of each check
    # The try-except-else clause is omitted here
    for future in concurrent.futures.as_completed(future_to_checks):
        user, group = future_to_checks[future]
        in_group = future.result()
        if in_group is True:
            print('%r is in %r' % (user, group))

Inspired by @donkopotamus, itertools.product could be used here to generate all targets.

And if you don't need to handle the exceptions, it would be much more simple:

import concurrent.futures
from itertools import product
from collections import defaultdict


def check(target):
    user, group = target
    return True

with concurrent.futures.ThreadPoolExecutor() as executor:
    results = defaultdict(list)
    targets = list(product(user_list, group_list))
    for (user, group), in_group in zip(targets, executor.map(check, targets)):
        if in_group is True:
            results[user].append(group)

    print(results)

Solution 3

The main thread should generate the tasks

class Task:
    def Task(self,user,group)
        self.user  = user
        self.group = group
    def run(self):
        pass # call command with self.user and self.group and process results
        
twp = TaskWorkerPool(4)
for group in groups:
    twp.add( Task(user,group) )
twp.wait()
Share:
13,750
orange
Author by

orange

Updated on June 11, 2022

Comments

  • orange
    orange almost 2 years

    I have the following code where I have a list of usernames and I try and check if the users are in a specific Windows Usergroup using net user \domain | find somegroup.

    The problem is that I run that command for about 8 usergroups per username and it is slow. I would like to send off these calls using futures and even separate threads (if it makes it quicker).

    I just have to wait at the end before i do anything else. How do I go about doing it in Python?

    for one_username in user_list:
        response = requests.get(somecontent)
    
        bs_parsed = BeautifulSoup(response.content, 'html.parser')
    
        find_all2 = bs_parsed.find("div", {"class": "QuickLinks"})
        name = re.sub("\s\s+", ' ', find_all2.find("td", text="Name").find_next_sibling("td").text)
    
        find_all = bs_parsed.find_all("div", {"class": "visible"})
        all_perms = ""
        d.setdefault(one_username + " (" + name + ")", [])
        for value in find_all:
            test = value.find("a", {"onmouseover": True})
            if test is not None:
                if "MyAppID" in test.text:
                    d[one_username + " (" + name + ")"].append(test.text)
    
        for group in groups:
            try:
                d[one_username + " (" + name + ")"].append(check_output("net user /domain " + one_username + "| find \"" + group + "\"", shell=True, stderr=subprocess.STDOUT).strip().decode("utf-8"))
            except Exception:
                pass