TypeError: can't pickle _thread._local objects when using dask on pandas DataFrame
Your run function probably references variables from outside its own scope, which are then captured into its closure. Make sure that any file handles or database connections are created inside the function itself.
Bad:

    conn = DBConn(...)

    def run(row):
        return conn.do_stuff(row)

Good:

    def run(row):
        conn = DBConn(...)
        return conn.do_stuff(row)
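A minimal, runnable sketch of the "Good" pattern end to end. This uses the standard library's sqlite3 and ProcessPoolExecutor as stand-ins for the real database connection and for dask's 'processes' scheduler; the real `DBConn` class is whatever driver you use.

```python
import sqlite3
from concurrent.futures import ProcessPoolExecutor

def run(row_id):
    # Open the connection inside the function: DB handles (sqlite3
    # included) generally cannot be pickled, so they must not be
    # captured from the enclosing scope.
    conn = sqlite3.connect(":memory:")
    try:
        (result,) = conn.execute("SELECT ? * 2", (row_id,)).fetchone()
        return result
    finally:
        conn.close()

if __name__ == "__main__":
    # Each worker process calls run(), and each call builds its own
    # connection, so nothing unpicklable crosses the process boundary.
    with ProcessPoolExecutor(max_workers=2) as pool:
        print(list(pool.map(run, [1, 2, 3])))  # [2, 4, 6]
```

The same shape works with dask: pass a plain function (no captured connection) to `map_partitions` or `apply`.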
FrancescoLS
I am a data scientist, with a background in high energy particle physics and many years spent at CERN.
Updated on September 15, 2022

Comments
-
FrancescoLS almost 2 years
I have a huge DataFrame which I want to process using dask in order to save time. The problem is that I get stuck in this
TypeError: can't pickle _thread._local objects
error as soon as it starts running. Can someone help me?

I have written a function that processes the data stored in the DF based on its rows, and I process it with
    out = df_query.progress_apply(lambda row: run(row), axis=1)
and it runs fine.
Since this takes a lot of time, I started using dask:
    ddata = dd.from_pandas(df_query, npartitions=3)
    out = ddata.map_partitions(lambda df: df.apply((lambda row: run(row)), axis=1)).compute(scheduler='processes')
The problem is that as soon as the processing starts I get this error (after a huge Traceback, see below):
TypeError: can't pickle _thread._local objects
The run(...) function does some data manipulation, including queries to a DB.

Here is the complete Traceback:
    ---------------------------------------------------------------------------
    TypeError                                 Traceback (most recent call last)
    <ipython-input-14-aefae1f00437> in <module>
    ----> 1 out = ddata.map_partitions(lambda df: df.apply((lambda row: run(row)), axis=1)).compute(scheduler='processes')

    ~/anaconda3/envs/testenv/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs)
    --> 156         (result,) = compute(self, traverse=False, **kwargs)

    ~/anaconda3/envs/testenv/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
    --> 398     results = schedule(dsk, keys, **kwargs)

    ~/anaconda3/envs/testenv/lib/python3.7/site-packages/dask/multiprocessing.py in get(dsk, keys, num_workers, func_loads, func_dumps, optimize_graph, pool, **kwargs)
    --> 192                       raise_exception=reraise, **kwargs)

    ~/anaconda3/envs/testenv/lib/python3.7/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
    --> 449         fire_task()

    ~/anaconda3/envs/testenv/lib/python3.7/site-packages/dask/local.py in fire_task()
    --> 443                     args=(key, dumps((dsk[key], data)),

    ~/anaconda3/envs/testenv/lib/python3.7/site-packages/dask/multiprocessing.py in _dumps(x)
    ---> 26     return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)

    [... many repeated recursive frames in cloudpickle.py (save_function, save_function_tuple) and pickle.py (save, save_tuple, save_dict, save_list, save_reduce) elided ...]

    ~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    --> 524             rv = reduce(self.proto)

    TypeError: can't pickle _thread._local objects
-
FrancescoLS about 5 years It works, thanks a lot! I was indeed accessing the DB using an engine that was declared outside of the run(row) function. Out of curiosity: why does it work for df.progress_apply(...) then?
-
mdurant about 5 yearsPandas runs sequentially in a single thread, so you can pass the object around. Dask farms out tasks to workers, and so needs to be able to serialise those tasks, which includes the function you want to run and any variables it depends on.
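mdurant's point can be reproduced with the standard library alone. The `_thread._local` in the error message is a threading.local, exactly the kind of per-thread state a live DB connection holds, and pickle refuses it. A small stand-in class (hypothetical, for illustration only):

```python
import pickle
import threading

class DBConn:
    """Minimal stand-in for a DB connection object: many real drivers
    keep per-thread state in a threading.local, which is unpicklable."""
    def __init__(self):
        self._local = threading.local()  # the unpicklable part

conn = DBConn()
try:
    pickle.dumps(conn)
except TypeError as e:
    # Message varies by Python version, e.g.
    # "cannot pickle '_thread._local' object" on recent Pythons.
    print(e)
```

Sequential pandas never serialises anything, so the same object sails through `progress_apply`; dask's 'processes' scheduler must pickle the task (function plus captured variables) to ship it to a worker, which is where this blows up.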
-
AmyChodorowski over 3 yearsIs there a way to give each worker an individual database connection? Setting up a database connection on every call is expensive.
-
mdurant over 3 yearsYes, you could store the DB connection object on the worker instance (via
get_worker()
) or in a module-level global variable - but you'll have to do some work to make sure multiple threads don't race to create it (and that the connection is thread-safe) and that you shut it down when finished. -
kn0t over 3 years@mdurant Thanks!! I searched for hours until I found this.
-
BetterCallMe over 2 years@AmyChodorowski I am facing the same problem you mentioned: creating a database connection on every call is expensive. Have you found an answer to this? Please share.
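mdurant's suggestion above (cache one connection per worker) can be sketched with a module-level global guarded by a lock. This is a hedged, stdlib-only sketch: sqlite3 stands in for the real database, and in a dask worker you could instead stash the object on the instance returned by `dask.distributed.get_worker()`.

```python
import sqlite3
import threading

_conn = None
_conn_lock = threading.Lock()

def get_conn():
    """Create the connection once per process and reuse it; the lock
    prevents two threads from racing to create it."""
    global _conn
    with _conn_lock:
        if _conn is None:
            # check_same_thread=False lets several threads share the
            # handle; only do this if your connection is thread-safe.
            _conn = sqlite3.connect(":memory:", check_same_thread=False)
        return _conn

def run(row_id):
    conn = get_conn()  # reused, not rebuilt, on every call
    (result,) = conn.execute("SELECT ? * 2", (row_id,)).fetchone()
    return result
```

Because each worker process builds the global lazily on first use, nothing unpicklable is ever shipped between processes, and the setup cost is paid once per worker rather than once per row.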