PicklingError when using multiprocessing

The problem here is less the "pickle" error message than a conceptual one: multiprocessing forks (or spawns) your code into different "worker" processes in order to perform its magic.

It then sends data to and from the different processes by seamlessly serializing and de-serializing the data (that is the part that uses pickle).
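A minimal sketch of that round trip, using the pickle module directly as a stand-in for the pipe between processes (multiprocessing's own machinery adds more, but each task's arguments go through the same serialize/de-serialize step):

```python
import pickle

# One task as the parent process might hand it to a worker: the arguments
# are serialized to bytes, sent over a pipe, and rebuilt on the other side.
task_args = (1, 0.0, 1.0)  # e.g. (size, loc, scale)

payload = pickle.dumps(task_args)   # parent side: serialize
received = pickle.loads(payload)    # worker side: de-serialize

print(type(payload).__name__)  # bytes
print(received == task_args)   # True: equal value, but a new object
```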

When part of the data passed back and forth is a function, multiprocessing assumes a function with the same name exists in the callee process and passes only a reference to it (its module and name). Since functions are stateless, the worker process just looks up that same function and calls it with the data it has received. (pickle does not serialize a function's code, so only the reference is passed between the master and the worker processes.)
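This by-reference behavior is easy to check with pickle alone. In the sketch below, square is a hypothetical module-level function; the pickled bytes contain its name rather than its code, and loading them returns the very same function object:

```python
import math
import pickle

def square(x):
    """A plain module-level function (hypothetical example)."""
    return x * x

# Pickling a function stores where to find it (module and name), not its code.
data = pickle.dumps(square)
print(b"square" in data)             # True: the name is in the payload
print(pickle.loads(data) is square)  # True: the lookup returns the same object

# Library functions behave the same way.
print(pickle.loads(pickle.dumps(math.sqrt)) is math.sqrt)  # True
```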

When your function is a method of an instance, it is not the same thing underneath, even though in Python code it looks much like a function with an "automatic" self argument. Instances (objects) are stateful, which means the worker process does not have a copy of the object that owns the method you want to call on the other side.
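A small illustration of why the owning object matters, assuming a hypothetical stateful Accumulator class: a worker can only ever operate on a copy of an object, and only if that object itself is pickled and sent along. Changes made to the copy do not come back to the parent:

```python
import pickle

class Accumulator:
    """Hypothetical stateful object used for illustration."""

    def __init__(self):
        self.total = 0

    def add(self, x):
        self.total += x
        return self.total

original = Accumulator()
original.add(5)

# What a worker would receive if the object were sent to it: a separate copy.
worker_copy = pickle.loads(pickle.dumps(original))
worker_copy.add(10)

print(worker_copy.total)  # 15: the copy started from the parent's state
print(original.total)     # 5: the parent's object is unaffected
```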

Workarounds that pass your method to the map_async call disguised as a plain function won't work either, because multiprocessing passes around only a reference to the function, not the function itself.
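For example, wrapping the call in a closure or a lambda does not help, since neither has a name the worker could look up. The genNorm helper in the question returns exactly such a nested function. A quick check with a hypothetical make_scaler wrapper (the exception type differs between Python versions, hence the tuple):

```python
import pickle

def make_scaler(factor):
    # Hypothetical wrapper: returns a nested function (a closure over factor).
    def scale(x):
        return x * factor
    return scale

candidates = {
    "closure": make_scaler(2),
    "lambda": lambda x: x * 2,
}

for label, func in candidates.items():
    try:
        pickle.dumps(func)
        print(label, "pickled")
    except (pickle.PicklingError, AttributeError, TypeError) as exc:
        # Neither object can be found by name at module level.
        print(label, "failed:", type(exc).__name__)
```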

So you should either:

(1) change your code so that you pass a function, not a method, to the worker processes, converting whatever state the object keeps into new parameters for the call; or

(2) create a "target" function for the map_async call that reconstructs the needed object on the worker-process side and then calls the method on it. Most straightforward classes in Python are picklable themselves, so you can pass the object that owns the method in the map_async call, and the "target" function calls the appropriate method itself on the worker side.
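A sketch of option (1) applied to the question's genNorm case, with random.gauss standing in for scipy.stats.norm.rvs so the example has no dependencies (draw_normal is a hypothetical name). The closure state (loc, scale) becomes ordinary parameters of a module-level function, and functools.partial freezes them; a partial object pickles because its function goes by reference and its frozen arguments go by value:

```python
import functools
import pickle
import random

def draw_normal(size, loc, scale):
    # Module-level stand-in for the question's subfunc: loc and scale are
    # explicit parameters instead of closure variables.
    return [random.gauss(loc, scale) for _ in range(size)]

func = functools.partial(draw_normal, loc=0.0, scale=1.0)
restored = pickle.loads(pickle.dumps(func))

print(restored.func is draw_normal)  # True
print(restored.keywords)             # {'loc': 0.0, 'scale': 1.0}
print(len(restored(3)))              # 3
```

With a pool, this would be pool.map_async(func, sizes) with sizes a list of ints, since map_async calls func with one item at a time.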

Option (2) may sound "difficult", but it is probably just something like this (unless your object's class can't be pickled):

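import types

def target(packed):
    # Runs in the worker: unpack the (object, method name, argument) tuple
    # built in the parent, then call the method on the received copy
    obj, method_name, arg = packed
    return getattr(obj, method_name)(arg)
(...)
# And add these lines prior to your map_async call:

    # Evaluate function: replace a bound method by the module-level target()
    # and pack each argument together with the method's owner and name
    if isinstance(func, types.MethodType):
        arguments = [(func.__self__, func.__name__, arg) for arg in arguments]
        func = target
    result = pool.map_async(func, arguments, chunksize = chunksize)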

*disclaimer: I haven't tested this
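Below is a self-contained Python 3 sketch of option (2) with the argument packing made explicit. Scaler and prepare are hypothetical names, and the list comprehension at the end only simulates what the pool does with each item (pickle it, send it, unpickle it, call the function); with a real pool you would pass func and packed to pool.map_async instead:

```python
import pickle
import types

class Scaler:
    """Hypothetical picklable class whose method we want to run in workers."""

    def __init__(self, factor):
        self.factor = factor

    def scale(self, x):
        return x * self.factor

def target(packed):
    # Worker side: the object arrives as a pickled copy, together with the
    # method name and one argument.
    obj, method_name, arg = packed
    return getattr(obj, method_name)(arg)

def prepare(func, arguments):
    # Parent side: swap a bound method for the module-level target().
    if isinstance(func, types.MethodType):
        return target, [(func.__self__, func.__name__, arg) for arg in arguments]
    return func, list(arguments)

func, packed = prepare(Scaler(3).scale, [1, 2, 3])

# Simulate the pool: pickle each item, unpickle it, call func on it.
results = [func(pickle.loads(pickle.dumps(item))) for item in packed]
print(results)  # [3, 6, 9]
```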


Author: matiasq

Updated on November 30, 2021

Comments

  • matiasq (over 2 years ago)

    I am having trouble when using Pool.map_async() (and also Pool.map()) in the multiprocessing module. I have implemented a parallel-for-loop function that works fine as long as the function passed to Pool.map_async is a "regular" function. When the function is, e.g., a method of a class, I get a PicklingError:

    cPickle.PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
    

    I use Python only for scientific computing, so I am not very familiar with the concept of pickling; I have just learned a bit about it today. I have looked at a couple of previous answers, like "Can't pickle <type 'instancemethod'> when using multiprocessing Pool.map()", but I cannot figure out how to make it work, even when following the link provided in the answer.

    Here is my code, where the objective is to simulate a vector of normal random variables using multiple cores. Note that this is just an example, and it may not even pay off to run it on multiple cores.

    import multiprocessing as mp
    import numpy as np
    import scipy.stats as spstat
    
    def parfor(func, args, static_arg = None, nWorkers = 8, chunksize = None):
        """
        Purpose: Evaluate function using multiple cores.
    
        Input:
            func       - Function to evaluate in parallel
            args       - Array of arguments to evaluate func(arg)
            static_arg - The "static" argument (if any), i.e. the variables that are constant in the evaluation of func.
            nWorkers   - Number of workers to process computations.
        Output:
            func(i, static_arg) for i in args.
        
        """
        # Prepare arguments for func: collect arguments with static argument (if any)
        if static_arg is not None:
            arguments = [[arg] + static_arg for arg in list(args)]
        else:
            arguments = args
        
        # Initialize workers
        pool = mp.Pool(processes = nWorkers)
    
        # Evaluate function
        result = pool.map_async(func, arguments, chunksize = chunksize)
        pool.close()
        pool.join()
    
        return np.array(result.get()).flatten()
    
    # First test-function. Freeze location and scale for the Normal random variates generator.
    # This returns a nested function (a closure) that calls the rvs method of norm_gen.
    # Pickle cannot look up a nested function by name, so this will give an error.
    def genNorm(loc, scale):
        def subfunc(a):
            return spstat.norm.rvs(loc = loc, scale = scale, size = a)
        return subfunc
    
    # Second test-function. The same as above, but defined at module level. This is a "plain"
    # function and can be pickled
    def test(fargs):
        x, a, b = fargs
        return spstat.norm.rvs(size = x, loc = a, scale = b)
    
    if __name__ == "__main__":
        # Try it out.
        N = 1000000
    
        # Set arguments to function. args1 = [1, 1, 1, ..., 1]; the purpose is just to generate a random
        # variable of size 1 for each element in the output vector.
        args1 = np.ones(N, dtype = int)
        static_arg = [0, 1] # standardized normal.
    
        # This gives the PicklingError
        func = genNorm(*static_arg)
        sim = parfor(func, args1, static_arg = None, nWorkers = 12, chunksize = None)
    
        # This is OK:
        func = test
        sim = parfor(func, args1, static_arg = static_arg, nWorkers = 12, chunksize = None)
    

    Following the link provided in the answer to "Can't pickle <type 'instancemethod'> when using multiprocessing Pool.map()", Steven Bethard (almost at the end) suggests using the copy_reg module. His code is:

    def _pickle_method(method):
        func_name = method.im_func.__name__
        obj = method.im_self
        cls = method.im_class
        return _unpickle_method, (func_name, obj, cls)
    
    def _unpickle_method(func_name, obj, cls):
        for cls in cls.mro():
            try:
                func = cls.__dict__[func_name]
            except KeyError:
                pass
            else:
                break
        return func.__get__(obj, cls)
    
    import copy_reg
    import types
    
    copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)
    

    I don't really understand how I can make use of this. The only thing I could come up with was putting it just before my code, but it did not help. A simple solution is of course to just go with the version that works and avoid getting involved with copy_reg. But I am more interested in getting copy_reg to work properly, so that I can take full advantage of multiprocessing without having to work around the problem each time.

  • matiasq (over 12 years ago)
    Thank you for your answer. I have a question and would be very thankful if you could answer it. You say: "(1) either change your code so that you do pass a function - and not a method - to the worker processes, ...". This is what I am doing in my second try, i.e. with the test() function, right? My question is: if I am NOT passing a function, how come it works? Do you mean that I can run into bugs in the future? I tried your code and it worked too, but I don't see the point of "complicating" things if my first alternative already worked.
  • matiasq (over 12 years ago)
    I would also like to point out that your alternative (2) won't work for me, because my main problem is that the class I am using is not picklable. I was trying to get around this using copy_reg, which should be possible since Steven Bethard used the second code I posted and it worked for him. Again, thank you very much for your time.
  • matiasq (over 12 years ago)
    Regarding my first comment, I was wrong. I did add your code, but it did not have any effect, since "if isinstance(func, types.MethodType):" was never true and thus the code was never executed. I apologize for not noticing this before.