TypeError: can't pickle _thread._local objects when using dask on pandas DataFrame


Your run function probably references variables from outside its scope, which get captured into the closure. Make sure that any file handles or database connections are created inside the function.

Bad:

conn = DBConn(...)
def run(row):
    return conn.do_stuff(row)

Good:

def run(row):
    conn = DBConn(...)
    return conn.do_stuff(row)
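
To make the pattern concrete, here is a minimal runnable sketch that uses Python's built-in sqlite3 as a stand-in for the hypothetical DBConn: because the connection is opened inside run, the closure captures nothing that pickle would choke on, so the function can be shipped to worker processes.

```python
import sqlite3

def run(row):
    # The connection is created inside the function, so each worker
    # process opens its own handle instead of trying to serialise one
    # created in the parent process.
    conn = sqlite3.connect(":memory:")
    try:
        cur = conn.execute("SELECT ? + 1", (row,))
        return cur.fetchone()[0]
    finally:
        conn.close()

print(run(41))  # prints 42
```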


Author: FrancescoLS, a data scientist with a background in high energy particle physics and many years spent at CERN.

Updated on September 15, 2022

Comments

  • FrancescoLS
    FrancescoLS almost 2 years

    I have a huge DataFrame which I want to process using dask in order to save time. The problem is that I get stuck in this TypeError: can't pickle _thread._local objects error as soon as it starts running. Can someone help me?

    I have written a function that processes the DataFrame row by row, and I apply it with

    out = df_query.progress_apply(lambda row: run(row), axis=1)
    

    and it runs fine.

    Since this takes a lot of time, I started using dask:

    ddata = dd.from_pandas(df_query, npartitions=3)
    out = ddata.map_partitions(lambda df: df.apply((lambda row: run(row)), axis=1)).compute(scheduler='processes')
    

    The problem is that as soon as the processing starts I get this error (after a huge Traceback, see below): TypeError: can't pickle _thread._local objects

    The run(...) function does some data manipulation, including queries to a DB.

    Here is the complete Traceback:

    ---------------------------------------------------------------------------
    TypeError                                 Traceback (most recent call last)
    <ipython-input-14-aefae1f00437> in <module>
    ----> 1 out = ddata.map_partitions(lambda df: df.apply((lambda row: run(row)), axis=1)).compute(scheduler='processes')
    
    ~/anaconda3/envs/testenv/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs)
        154         dask.base.compute
        155         """
    --> 156         (result,) = compute(self, traverse=False, **kwargs)
        157         return result
        158 
    
    ~/anaconda3/envs/testenv/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
        396     keys = [x.__dask_keys__() for x in collections]
        397     postcomputes = [x.__dask_postcompute__() for x in collections]
    --> 398     results = schedule(dsk, keys, **kwargs)
        399     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
        400 
    
    ~/anaconda3/envs/testenv/lib/python3.7/site-packages/dask/multiprocessing.py in get(dsk, keys, num_workers, func_loads, func_dumps, optimize_graph, pool, **kwargs)
        190                            get_id=_process_get_id, dumps=dumps, loads=loads,
        191                            pack_exception=pack_exception,
    --> 192                            raise_exception=reraise, **kwargs)
        193     finally:
        194         if cleanup:
    
    ~/anaconda3/envs/testenv/lib/python3.7/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
        447             # Seed initial tasks into the thread pool
        448             while state['ready'] and len(state['running']) < num_workers:
    --> 449                 fire_task()
        450 
        451             # Main loop, wait on tasks to finish, insert new ones
    
    ~/anaconda3/envs/testenv/lib/python3.7/site-packages/dask/local.py in fire_task()
        441                 # Submit
        442                 apply_async(execute_task,
    --> 443                             args=(key, dumps((dsk[key], data)),
        444                                   dumps, loads, get_id, pack_exception),
        445                             callback=queue.put)
    
    ~/anaconda3/envs/testenv/lib/python3.7/site-packages/dask/multiprocessing.py in _dumps(x)
         24 
         25 def _dumps(x):
    ---> 26     return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
         27 
         28 
    
    ~/anaconda3/envs/testenv/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in dumps(obj, protocol)
        950     try:
        951         cp = CloudPickler(file, protocol=protocol)
    --> 952         cp.dump(obj)
        953         return file.getvalue()
        954     finally:
    
    ~/anaconda3/envs/testenv/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in dump(self, obj)
        265         self.inject_addons()
        266         try:
    --> 267             return Pickler.dump(self, obj)
        268         except RuntimeError as e:
        269             if 'recursion' in e.args[0]:
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in dump(self, obj)
        435         if self.proto >= 4:
        436             self.framer.start_framing()
    --> 437         self.save(obj)
        438         self.write(STOP)
        439         self.framer.end_framing()
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
        502         f = self.dispatch.get(t)
        503         if f is not None:
    --> 504             f(self, obj) # Call unbound method with explicit self
        505             return
        506 
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_tuple(self, obj)
        769         if n <= 3 and self.proto >= 2:
        770             for element in obj:
    --> 771                 save(element)
        772             # Subtle.  Same as in the big comment below.
        773             if id(obj) in memo:
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
        502         f = self.dispatch.get(t)
        503         if f is not None:
    --> 504             f(self, obj) # Call unbound method with explicit self
        505             return
        506 
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_tuple(self, obj)
        769         if n <= 3 and self.proto >= 2:
        770             for element in obj:
    --> 771                 save(element)
        772             # Subtle.  Same as in the big comment below.
        773             if id(obj) in memo:
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
        547 
        548         # Save the reduce() output and finally memoize the object
    --> 549         self.save_reduce(obj=obj, *rv)
        550 
        551     def persistent_id(self, obj):
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
        636         else:
        637             save(func)
    --> 638             save(args)
        639             write(REDUCE)
        640 
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
        502         f = self.dispatch.get(t)
        503         if f is not None:
    --> 504             f(self, obj) # Call unbound method with explicit self
        505             return
        506 
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_tuple(self, obj)
        784         write(MARK)
        785         for element in obj:
    --> 786             save(element)
        787 
        788         if id(obj) in memo:
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
        502         f = self.dispatch.get(t)
        503         if f is not None:
    --> 504             f(self, obj) # Call unbound method with explicit self
        505             return
        506 
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_dict(self, obj)
        854 
        855         self.memoize(obj)
    --> 856         self._batch_setitems(obj.items())
        857 
        858     dispatch[dict] = save_dict
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_setitems(self, items)
        885                 k, v = tmp[0]
        886                 save(k)
    --> 887                 save(v)
        888                 write(SETITEM)
        889             # else tmp is empty, and we're done
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
        502         f = self.dispatch.get(t)
        503         if f is not None:
    --> 504             f(self, obj) # Call unbound method with explicit self
        505             return
        506 
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_tuple(self, obj)
        784         write(MARK)
        785         for element in obj:
    --> 786             save(element)
        787 
        788         if id(obj) in memo:
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
        502         f = self.dispatch.get(t)
        503         if f is not None:
    --> 504             f(self, obj) # Call unbound method with explicit self
        505             return
        506 
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_tuple(self, obj)
        769         if n <= 3 and self.proto >= 2:
        770             for element in obj:
    --> 771                 save(element)
        772             # Subtle.  Same as in the big comment below.
        773             if id(obj) in memo:
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
        502         f = self.dispatch.get(t)
        503         if f is not None:
    --> 504             f(self, obj) # Call unbound method with explicit self
        505             return
        506 
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_list(self, obj)
        814 
        815         self.memoize(obj)
    --> 816         self._batch_appends(obj)
        817 
        818     dispatch[list] = save_list
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_appends(self, items)
        838                 write(MARK)
        839                 for x in tmp:
    --> 840                     save(x)
        841                 write(APPENDS)
        842             elif n:
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
        502         f = self.dispatch.get(t)
        503         if f is not None:
    --> 504             f(self, obj) # Call unbound method with explicit self
        505             return
        506 
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_list(self, obj)
        814 
        815         self.memoize(obj)
    --> 816         self._batch_appends(obj)
        817 
        818     dispatch[list] = save_list
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_appends(self, items)
        838                 write(MARK)
        839                 for x in tmp:
    --> 840                     save(x)
        841                 write(APPENDS)
        842             elif n:
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
        502         f = self.dispatch.get(t)
        503         if f is not None:
    --> 504             f(self, obj) # Call unbound method with explicit self
        505             return
        506 
    
    ~/anaconda3/envs/testenv/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in save_function(self, obj, name)
        393                 or getattr(obj.__code__, 'co_filename', None) == '<stdin>'
        394                 or themodule is None):
    --> 395             self.save_function_tuple(obj)
        396             return
        397         else:
    
    ~/anaconda3/envs/testenv/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in save_function_tuple(self, func)
        592         if hasattr(func, '__qualname__'):
        593             state['qualname'] = func.__qualname__
    --> 594         save(state)
        595         write(pickle.TUPLE)
        596         write(pickle.REDUCE)  # applies _fill_function on the tuple
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
        502         f = self.dispatch.get(t)
        503         if f is not None:
    --> 504             f(self, obj) # Call unbound method with explicit self
        505             return
        506 
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_dict(self, obj)
        854 
        855         self.memoize(obj)
    --> 856         self._batch_setitems(obj.items())
        857 
        858     dispatch[dict] = save_dict
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_setitems(self, items)
        880                 for k, v in tmp:
        881                     save(k)
    --> 882                     save(v)
        883                 write(SETITEMS)
        884             elif n:
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
        502         f = self.dispatch.get(t)
        503         if f is not None:
    --> 504             f(self, obj) # Call unbound method with explicit self
        505             return
        506 
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_dict(self, obj)
        854 
        855         self.memoize(obj)
    --> 856         self._batch_setitems(obj.items())
        857 
        858     dispatch[dict] = save_dict
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_setitems(self, items)
        885                 k, v = tmp[0]
        886                 save(k)
    --> 887                 save(v)
        888                 write(SETITEM)
        889             # else tmp is empty, and we're done
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
        502         f = self.dispatch.get(t)
        503         if f is not None:
    --> 504             f(self, obj) # Call unbound method with explicit self
        505             return
        506 
    
    ~/anaconda3/envs/testenv/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in save_function(self, obj, name)
        393                 or getattr(obj.__code__, 'co_filename', None) == '<stdin>'
        394                 or themodule is None):
    --> 395             self.save_function_tuple(obj)
        396             return
        397         else:
    
    ~/anaconda3/envs/testenv/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in save_function_tuple(self, func)
        592         if hasattr(func, '__qualname__'):
        593             state['qualname'] = func.__qualname__
    --> 594         save(state)
        595         write(pickle.TUPLE)
        596         write(pickle.REDUCE)  # applies _fill_function on the tuple
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
        502         f = self.dispatch.get(t)
        503         if f is not None:
    --> 504             f(self, obj) # Call unbound method with explicit self
        505             return
        506 
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_dict(self, obj)
        854 
        855         self.memoize(obj)
    --> 856         self._batch_setitems(obj.items())
        857 
        858     dispatch[dict] = save_dict
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_setitems(self, items)
        880                 for k, v in tmp:
        881                     save(k)
    --> 882                     save(v)
        883                 write(SETITEMS)
        884             elif n:
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
        502         f = self.dispatch.get(t)
        503         if f is not None:
    --> 504             f(self, obj) # Call unbound method with explicit self
        505             return
        506 
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_dict(self, obj)
        854 
        855         self.memoize(obj)
    --> 856         self._batch_setitems(obj.items())
        857 
        858     dispatch[dict] = save_dict
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_setitems(self, items)
        880                 for k, v in tmp:
        881                     save(k)
    --> 882                     save(v)
        883                 write(SETITEMS)
        884             elif n:
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
        502         f = self.dispatch.get(t)
        503         if f is not None:
    --> 504             f(self, obj) # Call unbound method with explicit self
        505             return
        506 
    
    ~/anaconda3/envs/testenv/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in save_function(self, obj, name)
        393                 or getattr(obj.__code__, 'co_filename', None) == '<stdin>'
        394                 or themodule is None):
    --> 395             self.save_function_tuple(obj)
        396             return
        397         else:
    
    ~/anaconda3/envs/testenv/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in save_function_tuple(self, func)
        592         if hasattr(func, '__qualname__'):
        593             state['qualname'] = func.__qualname__
    --> 594         save(state)
        595         write(pickle.TUPLE)
        596         write(pickle.REDUCE)  # applies _fill_function on the tuple
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
        502         f = self.dispatch.get(t)
        503         if f is not None:
    --> 504             f(self, obj) # Call unbound method with explicit self
        505             return
        506 
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_dict(self, obj)
        854 
        855         self.memoize(obj)
    --> 856         self._batch_setitems(obj.items())
        857 
        858     dispatch[dict] = save_dict
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_setitems(self, items)
        880                 for k, v in tmp:
        881                     save(k)
    --> 882                     save(v)
        883                 write(SETITEMS)
        884             elif n:
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
        502         f = self.dispatch.get(t)
        503         if f is not None:
    --> 504             f(self, obj) # Call unbound method with explicit self
        505             return
        506 
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_dict(self, obj)
        854 
        855         self.memoize(obj)
    --> 856         self._batch_setitems(obj.items())
        857 
        858     dispatch[dict] = save_dict
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_setitems(self, items)
        880                 for k, v in tmp:
        881                     save(k)
    --> 882                     save(v)
        883                 write(SETITEMS)
        884             elif n:
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
        547 
        548         # Save the reduce() output and finally memoize the object
    --> 549         self.save_reduce(obj=obj, *rv)
        550 
        551     def persistent_id(self, obj):
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
        660 
        661         if state is not None:
    --> 662             save(state)
        663             write(BUILD)
        664 
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
        502         f = self.dispatch.get(t)
        503         if f is not None:
    --> 504             f(self, obj) # Call unbound method with explicit self
        505             return
        506 
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_dict(self, obj)
        854 
        855         self.memoize(obj)
    --> 856         self._batch_setitems(obj.items())
        857 
        858     dispatch[dict] = save_dict
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_setitems(self, items)
        880                 for k, v in tmp:
        881                     save(k)
    --> 882                     save(v)
        883                 write(SETITEMS)
        884             elif n:
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
        547 
        548         # Save the reduce() output and finally memoize the object
    --> 549         self.save_reduce(obj=obj, *rv)
        550 
        551     def persistent_id(self, obj):
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
        660 
        661         if state is not None:
    --> 662             save(state)
        663             write(BUILD)
        664 
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
        502         f = self.dispatch.get(t)
        503         if f is not None:
    --> 504             f(self, obj) # Call unbound method with explicit self
        505             return
        506 
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_dict(self, obj)
        854 
        855         self.memoize(obj)
    --> 856         self._batch_setitems(obj.items())
        857 
        858     dispatch[dict] = save_dict
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_setitems(self, items)
        880                 for k, v in tmp:
        881                     save(k)
    --> 882                     save(v)
        883                 write(SETITEMS)
        884             elif n:
    
    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
        522             reduce = getattr(obj, "__reduce_ex__", None)
        523             if reduce is not None:
    --> 524                 rv = reduce(self.proto)
        525             else:
        526                 reduce = getattr(obj, "__reduce__", None)
    
    TypeError: can't pickle _thread._local objects
    
  • FrancescoLS
    FrancescoLS about 5 years
    It works, thanks a lot! I was indeed accessing the DB using an engine that was declared outside of the run(row) function. Out of curiosity: why does it work for df.progress_apply(...) then?
  • mdurant
    mdurant about 5 years
    Pandas runs sequentially in a single thread, so you can pass the object around. Dask farms out tasks to workers, and so needs to be able to serialise those tasks, which includes the function you want to run and any variables it depends on.
  • AmyChodorowski
    AmyChodorowski over 3 years
    Is there a way to give each worker an individual database connection? Setting up the database connection each time is time expensive.
  • mdurant
    mdurant over 3 years
    Yes, you could store the DB connection object on the worker instance (get_worker()) or a module global variable - but you'll have to do some work to make sure multiple threads don't race to create it (if the connection is thread-safe) and that you shut it down when finished.
  • kn0t
    kn0t over 3 years
    @mdurant Thanks!! I searched for hours until I found this.
  • BetterCallMe
    BetterCallMe over 2 years
    @AmyChodorowski I am facing the same problem you mentioned: creating a database connection for every task is expensive. Have you found an answer? Please share.
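
The per-worker connection that mdurant suggests can be sketched with a module-level cache guarded by a lock. This is an illustrative pattern only (again using sqlite3 as a stand-in for the real DB driver), not dask's own API; with dask.distributed you would instead stash the connection on the object returned by get_worker().

```python
import sqlite3
import threading

_conn = None
_lock = threading.Lock()

def get_conn():
    """Create the connection once per process and reuse it afterwards."""
    global _conn
    with _lock:  # avoid two threads racing to create the connection
        if _conn is None:
            _conn = sqlite3.connect(":memory:", check_same_thread=False)
        return _conn

def run(row):
    # Workers call get_conn() instead of opening a fresh connection
    # per row, so setup cost is paid once per worker, not once per task.
    cur = get_conn().execute("SELECT ? * 2", (row,))
    return cur.fetchone()[0]
```

Remember to close the cached connection when the workers shut down; the connection object still never crosses a process boundary, so nothing unpicklable ends up in the task graph.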