how to keep track of asynchronous results returned from a multiprocessing pool

23,312

Without seeing actual code, I can only answer in generalities. But there are two general solutions.

First, instead of using a callback and ignoring the AsyncResults, store them in some kind of collection. Then you can just use that collection. For example, if you want to be able to look up the results for a function using that function as a key, just create a dict keyed with the functions:

def in_parallel(funcs):
    results = {}
    pool = mp.Pool()
    for func in funcs:
        results[func] = pool.apply_async(func)
    pool.close()
    pool.join()
    return {func: result.get() for func, result in results.items()}

Alternatively, you can change the callback function to store the results in your collection by key. For example:

def in_parallel(funcs):
    results = {}
    pool = mp.Pool()
    for func in funcs:
        def callback(result, func=func):
            results[func] = result
        pool.apply_async(func, callback=callback)
    pool.close()
    pool.join()
    return results

I'm using the function itself as a key. But you want to use the index instead, that's just as easy. Any value you have, you can use as a key.


Meanwhile, the example you linked is really just calling the same function on a bunch of arguments, waiting for all of them to finish, and leaving the results in some iterable in arbitrary order. That's exactly what imap_unordered does, but a lot more simply. You could replace the whole complicated thing from the linked code with this:

pool = mp.Pool()
results = list(pool.imap_unordered(foo_pool, range(10)))
pool.close()
pool.join()

And then, if you want the results in their original order instead of in arbitrary order, you can just switch to imap or map instead. So:

pool = mp.Pool()
results = pool.map(foo_pool, range(10))
pool.close()
pool.join()

If you need something similar but too complicated to fit into the map paradigm, concurrent.futures will probably make your life easier than multiprocessing. If you're on Python 2.x, you will have to install the backport. But then you can do things that are much harder to do with AsyncResults or callbacks (or map), like composing a whole bunch of futures into one big future. See the examples in the linked docs.


One last note:

The important points to emphasise are that I can not modify the existing functions…

If you can't modify a function, you can always wrap it. For example, let's say I have a function that returns the square of a number, but I'm trying to build a dict mapping numbers to their squares asynchronously, so I need to have the original number as part of the result as well. That's easy:

def number_and_square(x):
    return x, square(x)

And now, I can just apply_async(number_and_square) instead of just square, and get the results I want.

I didn't do that in the examples above because in the first case I stored the key into the collection from the calling side, and in the second place I bound it into the callback function. But binding it into a wrapper around the function is just as easy as either of these, and can be appropriate when neither of these is.

Share:
23,312
d3pd
Author by

d3pd

Updated on July 10, 2022

Comments

  • d3pd
    d3pd almost 2 years

    I am trying to add multiprocessing to some code which features functions that I can not modify. I want to submit these functions as jobs to a multiprocessing pool asynchronously. I am doing something much like the code shown here. However, I am not sure how to keep track of results. How can I know to which applied function a returned result corresponds?

    The important points to emphasise are that I can not modify the existing functions (other things rely on them remaining as they are) and that results can be returned in an order different to the order in which the function jobs are applied to the pool.

    Thanks for any thoughts on this!

    EDIT: Some attempt code is below:

    import multiprocessing
    from multiprocessing import Pool
    import os
    import signal
    import time
    import inspect
    
    def multiply(multiplicand1=0, multiplicand2=0):
        return multiplicand1*multiplicand2
    
    def workFunctionTest(**kwargs):
        time.sleep(3)
        return kwargs
    
    def printHR(object):
        """
        This function prints a specified object in a human readable way.
        """
        # dictionary
        if isinstance(object, dict):
            for key, value in sorted(object.items()):
                print u'{a1}: {a2}'.format(a1=key, a2=value)
        # list or tuple
        elif isinstance(object, list) or isinstance(object, tuple):
            for element in object:
                print element
        # other
        else:
            print object
    
    class Job(object):
        def __init__(
            self,
            workFunction=workFunctionTest,
            workFunctionKeywordArguments={'testString': "hello world"},
            workFunctionTimeout=1,
            naturalLanguageString=None,
            classInstance=None,
            resultGetter=None,
            result=None
            ):
            self.workFunction=workFunction
            self.workFunctionKeywordArguments=workFunctionKeywordArguments
            self.workFunctionTimeout=workFunctionTimeout
            self.naturalLanguageString=naturalLanguageString
            self.classInstance=self.__class__.__name__
            self.resultGetter=resultGetter
            self.result=result
        def description(self):
            descriptionString=""
            for key, value in sorted(vars(self).items()):
                descriptionString+=str("{a1}:{a2} ".format(a1=key, a2=value))
            return descriptionString
        def printout(self):
            """
            This method prints a dictionary of all data attributes.
            """
            printHR(vars(self))
    
    class JobGroup(object):
        """
        This class acts as a container for jobs. The data attribute jobs is a list of job objects.
        """
        def __init__(
            self,
            jobs=None,
            naturalLanguageString="null",
            classInstance=None,
            result=None
            ):
            self.jobs=jobs
            self.naturalLanguageString=naturalLanguageString
            self.classInstance=self.__class__.__name__
            self.result=result
        def description(self):
            descriptionString=""
            for key, value in sorted(vars(self).items()):
                descriptionString+=str("{a1}:{a2} ".format(a1=key, a2=value))
            return descriptionString
        def printout(self):
            """
            This method prints a dictionary of all data attributes.
            """
            printHR(vars(self))
    
    def initialise_processes():
        signal.signal(signal.SIGINT, signal.SIG_IGN)
    
    def execute(
            jobObject=None,
            numberOfProcesses=multiprocessing.cpu_count()
            ):
            # Determine the current function name.
        functionName=str(inspect.stack()[0][3])
        def collateResults(result):
            """
            This is a process pool callback function which collates a list of results returned.
            """
            # Determine the caller function name.
            functionName=str(inspect.stack()[1][3])
            print("{a1}: result: {a2}".format(a1=functionName, a2=result))
            results.append(result)
        def getResults(job):
            # Determine the current function name.
            functionName=str(inspect.stack()[0][3])
            while True:
                try:
                    result=job.resultGetter.get(job.workFunctionTimeout)
                    break
                except multiprocessing.TimeoutError:
                    print("{a1}: subprocess timeout for job".format(a1=functionName, a2=job.description()))
            #job.result=result
            return result
        # Create a process pool.
        pool1 = multiprocessing.Pool(numberOfProcesses, initialise_processes)
        print("{a1}: pool {a2} of {a3} processes created".format(a1=functionName, a2=str(pool1), a3=str(numberOfProcesses)))
        # Unpack the input job object and submit it to the process pool.
        print("{a1}: unpacking and applying job object {a2} to pool...".format(a1=functionName, a2=jobObject))
        if isinstance(jobObject, Job):
            # If the input job object is a job, apply it to the pool with its associated timeout specification.
            # Return a list of results.
            job=jobObject
            print("{a1}: job submitted to pool: {a2}".format(a1=functionName, a2=job.description()))
            # Apply the job to the pool, saving the object pool.ApplyResult to the job object.
            job.resultGetter=pool1.apply_async(
                    func=job.workFunction,
                    kwds=job.workFunctionKeywordArguments
            )
            # Get results.
            # Acquire the job result with respect to the specified job timeout and apply this result to the job data attribute result.
            print("{a1}: getting results for job...".format(a1=functionName))
            job.result=getResults(job)
            print("{a1}: job completed: {a2}".format(a1=functionName, a2=job.description()))
            print("{a1}: job result: {a2}".format(a1=functionName, a2=job.result))
            # Return the job result from execute.
            return job.result
            pool1.terminate()
            pool1.join()
        elif isinstance(jobObject, JobGroup):
            # If the input job object is a job group, cycle through each job and apply it to the pool with its associated timeout specification.
            for job in jobObject.jobs:
                print("{a1}: job submitted to pool: {a2}".format(a1=functionName, a2=job.description()))
                # Apply the job to the pool, saving the object pool.ApplyResult to the job object.
                job.resultGetter=pool1.apply_async(
                        func=job.workFunction,
                        kwds=job.workFunctionKeywordArguments
                )
            # Get results.
            # Cycle through each job and and append the result for the job to a list of results.
            results=[]
            for job in jobObject.jobs:
                # Acquire the job result with respect to the specified job timeout and apply this result to the job data attribute result.
                print("{a1}: getting results for job...".format(a1=functionName))
                job.result=getResults(job)
                print("{a1}: job completed: {a2}".format(a1=functionName, a2=job.description()))
                #print("{a1}: job result: {a2}".format(a1=functionName, a2=job.result))
                # Collate the results.
                results.append(job.result)
            # Apply the list of results to the job group data attribute results.
            jobObject.results=results
            print("{a1}: job group results: {a2}".format(a1=functionName, a2=jobObject.results))
            # Return the job result list from execute.
            return jobObject.results
            pool1.terminate()
            pool1.join()
        else:
            # invalid input object
            print("{a1}: invalid job object {a2}".format(a1=functionName, a2=jobObject))
    
    def main():
        print('-'*80)
        print("MULTIPROCESSING SYSTEM DEMONSTRATION\n")
    
        # Create a job.
        print("# creating a job...\n")
        job1=Job(
                workFunction=workFunctionTest,
                workFunctionKeywordArguments={'testString': "hello world"},
                workFunctionTimeout=4
        )
        print("- printout of new job object:")
        job1.printout()
        print("\n- printout of new job object in logging format:")
        print job1.description()
    
        # Create another job.
        print("\n# creating another job...\n")
        job2=Job(
                workFunction=multiply,
                workFunctionKeywordArguments={'multiplicand1': 2, 'multiplicand2': 3},
                workFunctionTimeout=6
        )
        print("- printout of new job object:")
        job2.printout()
        print("\n- printout of new job object in logging format:")
        print job2.description()
    
        # Create a JobGroup object.
        print("\n# creating a job group (of jobs 1 and 2)...\n")
        jobGroup1=JobGroup(
                jobs=[job1, job2],
        )
        print("- printout of new job group object:")
        jobGroup1.printout()
        print("\n- printout of new job group object in logging format:")
        print jobGroup1.description()
    
        # Submit the job group.
        print("\nready to submit job group")
        response=raw_input("\nPress Enter to continue...\n")
        execute(jobGroup1)
    
        response=raw_input("\nNote the results printed above. Press Enter to continue the demonstration.\n")
    
        # Demonstrate timeout.
        print("\n # creating a new job in order to demonstrate timeout functionality...\n")
        job3=Job(
                workFunction=workFunctionTest,
                workFunctionKeywordArguments={'testString': "hello world"},
                workFunctionTimeout=1
        )
        print("- printout of new job object:")
        job3.printout()
        print("\n- printout of new job object in logging format:")
        print job3.description()
        print("\nNote the timeout specification of only 1 second.")
    
        # Submit the job.
        print("\nready to submit job")
        response=raw_input("\nPress Enter to continue...\n")
        execute(job3)
    
        response=raw_input("\nNote the recognition of timeouts printed above. This concludes the demonstration.")
        print('-'*80)
    
    if __name__ == '__main__':
        main()
    

    EDIT: This question has been placed [on hold] for the following stated reason:

    "Questions asking for code must demonstrate a minimal understanding of the problem being solved. Include attempted solutions, why they didn't work, and the expected results. See also: Stack Overflow question checklist"

    This question is not requesting code; it is requesting thoughts, general guidance. A minimal understanding of the problem under consideration is demonstrated (note the correct use of the terms "multiprocessing", "pool" and "asynchronously" and note the reference to prior code). Regarding attempted solutions, I acknowledge that attempted efforts at solutions would have been beneficial. I have added such code now. I hope that I have addressed the concerns raised that lead to the [on hold] status.

  • d3pd
    d3pd over 10 years
    Thank you very much for your clear suggestions and guidance. As you suggested, I am experimenting with the function wrapping and your trick of using the function as a key in a dictionary of results. concurrent.futures looks promising and I will examine it soon too. Thanks again.
  • chrishiestand
    chrishiestand about 10 years
    Thanks for such a good answer. I have a very similar problem that isn't addressed. What if you need to be able to set a timeout on each task, and want to know which inputs resulted in a timeout?
  • MariangeMarcano
    MariangeMarcano over 2 years
    I am wondering if def in_parallel(funcs): doing the assignment results[func] = result is process/thread safe?