Python thread pool that handles exceptions

Solution 1

I personally use concurrent.futures because its interface is very simple. For the missing-traceback issue, I found a workaround that preserves the original traceback. Check out my answer to this other question:

Getting original line number for exception in concurrent.futures
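
The linked answer is not reproduced here. As a rough sketch of one common way to keep the worker's traceback, the wrapper below formats it inside the worker thread and re-raises it as text. The names `WorkerError` and `keep_traceback` are hypothetical and not part of concurrent.futures, and the code assumes Python 3.

```python
import functools
import traceback
from concurrent.futures import ThreadPoolExecutor


class WorkerError(Exception):
    """Hypothetical wrapper exception carrying the worker's formatted traceback."""


def keep_traceback(fn):
    """Hypothetical decorator: re-raise worker errors with the traceback text attached."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        try:
            return fn(*args, **kwargs)
        except Exception:
            # format_exc() is called inside the except block, in the worker
            # thread, so it still sees the original frames and line numbers.
            raise WorkerError(traceback.format_exc())
    return wrapper


@keep_traceback
def div_zero(x):
    return x / 0


with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(div_zero, 1)
    try:
        future.result()
    except WorkerError as err:
        captured = str(err)
        print(captured)
```

The trade-off is that the caller sees `WorkerError` rather than the original exception type, so code that needs to branch on the type would have to carry it along separately.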

Solution 2

If you want information about unhandled exceptions in worker threads and you use ThreadPoolExecutor, you can attach a done-callback to each future like this:

from concurrent.futures import ThreadPoolExecutor


def worker():
    a = 2 / 0


def worker_callbacks(f):
    e = f.exception()

    if e is None:
        return

    trace = []
    tb = e.__traceback__
    while tb is not None:
        trace.append({
            "filename": tb.tb_frame.f_code.co_filename,
            "name": tb.tb_frame.f_code.co_name,
            "lineno": tb.tb_lineno
        })
        tb = tb.tb_next
    print(str({
        'type': type(e).__name__,
        'message': str(e),
        'trace': trace
    }))


executor = ThreadPoolExecutor(max_workers=1)
executor.submit(worker).add_done_callback(worker_callbacks)
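
For comparison, the frame-walking loop above can be replaced by the standard-library `traceback` module. This is only a sketch, assuming Python 3 (where exceptions carry `__traceback__`). The names `log_exception` and `reports` are made up for illustration.

```python
import traceback
from concurrent.futures import ThreadPoolExecutor


def worker():
    return 2 / 0


reports = []  # hypothetical place to collect formatted tracebacks


def log_exception(future):
    exc = future.exception()
    if exc is None:
        return
    # format_exception() returns a list of strings; join them into one report.
    text = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
    reports.append(text)
    print(text)


# Leaving the with-block waits for the worker, so the callback has run by then.
with ThreadPoolExecutor(max_workers=1) as executor:
    executor.submit(worker).add_done_callback(log_exception)
```

Using the executor as a context manager also makes sure the pool is shut down once the submitted work has finished.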

Solution 3

Easy solution: use whatever alternative suits you best, and implement your own try-except block in your workers. Surround the root call if you must.

I wouldn't say these libraries handle exceptions "incorrectly". They have a default behavior, however primitive. You are expected to handle this yourself if defaults don't suit you.
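
A minimal sketch of "surrounding the root call", assuming Python 3 and concurrent.futures: every task is submitted through one guard function that logs the full traceback in the worker and then re-raises. `run_guarded` and `compute` are hypothetical names used only for this example.

```python
import logging
from concurrent.futures import ThreadPoolExecutor

logging.basicConfig(level=logging.ERROR)
log = logging.getLogger("workers")


def run_guarded(fn, *args, **kwargs):
    """Hypothetical root-call guard: log the traceback in the worker, then re-raise."""
    try:
        return fn(*args, **kwargs)
    except Exception:
        # Logger.exception() appends the current traceback to the log record.
        log.exception("worker %s failed", getattr(fn, "__name__", fn))
        raise


def compute(n):
    return 10 / n


with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(run_guarded, compute, n) for n in (5, 0)]

for f in futures:
    if f.exception() is None:
        print("result:", f.result())
    else:
        print("failed:", type(f.exception()).__name__)
```

Because the guard wraps the submission rather than each function, the worker functions themselves need no error-handling code.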

Author: xApple

Updated on December 25, 2021

Comments

  • xApple
    xApple over 2 years

    I've been looking around for a good implementation of a simple Python thread pool pattern and really can't find anything that suits my needs. I'm using Python 2.7, and all the modules I have found either don't work or don't handle exceptions in the workers properly. I was wondering if someone knew of a library that could offer the type of functionality I'm searching for. Help greatly appreciated.

    Multiprocessing

    My first attempt was with the built-in multiprocessing module, but since it uses subprocesses rather than threads, we run into the problem that bound methods cannot be pickled. No go here.

    from multiprocessing import Pool
    
    class Sample(object):
        def compute_fib(self, n):
            phi = (1 + 5**0.5) / 2
            self.fib = int(round((phi**n - (1-phi)**n) / 5**0.5))
    
    samples = [Sample() for i in range(8)]
    pool = Pool(processes=8)
    for s in samples: pool.apply_async(s.compute_fib, [20])
    pool.join()
    for s in samples: print s.fib
    
    # PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed
    

    Futures

    So I see there is a backport of some of the cool concurrency features of Python 3.2 here. This seems perfect and simple to use. The problem is that when you get an exception in one of the workers, you only get the type of the exception, such as "ZeroDivisionError", but no traceback and thus no indication of which line caused the exception. The code becomes impossible to debug. No go.

    from concurrent import futures
    
    class Sample(object):
        def compute_fib(self, n):
            phi = (1 + 5**0.5) / 2
            1/0
            self.fib = int(round((phi**n - (1-phi)**n) / 5**0.5))
    
    samples = [Sample() for i in range(8)]
    pool = futures.ThreadPoolExecutor(max_workers=8)
    threads = [pool.submit(s.compute_fib, 20) for s in samples]
    futures.wait(threads, return_when=futures.FIRST_EXCEPTION)
    for t in threads: t.result()
    for s in samples: print s.fib
    
    
    #    futures-2.1.3-py2.7.egg/concurrent/futures/_base.pyc in __get_result(self)
    #    354     def __get_result(self):
    #    355         if self._exception:
    #--> 356             raise self._exception
    #    357         else:
    #    358             return self._result
    #
    # ZeroDivisionError: integer division or modulo by zero
    

    Workerpool

    I found another implementation of this pattern here. This time, when an exception occurs it is printed, but then my IPython interactive interpreter is left hanging and needs to be killed from another shell. No go.

    import workerpool
    
    class Sample(object):
        def compute_fib(self, n):
            phi = (1 + 5**0.5) / 2
            1/0
            self.fib = int(round((phi**n - (1-phi)**n) / 5**0.5))
    
    samples = [Sample() for i in range(8)]
    pool = workerpool.WorkerPool(size=8)
    for s in samples: pool.map(s.compute_fib, [20])
    pool.wait()
    for s in samples: print s.fib
    
    # ZeroDivisionError: integer division or modulo by zero
    # ^C^C^C^C^C^C^C^C^D^D
    # $ kill 1783
    

    Threadpool

    Yet another implementation here. This time, when an exception occurs, it is printed to stderr, but the script is not interrupted and instead continues executing, which defeats the purpose of the exception and can make things unsafe. Still not usable.

    import threadpool
    
    class Sample(object):
        def compute_fib(self, n):
            phi = (1 + 5**0.5) / 2
            1/0
            self.fib = int(round((phi**n - (1-phi)**n) / 5**0.5))
    
    samples = [Sample() for i in range(8)]
    pool = threadpool.ThreadPool(8)
    requests = [threadpool.makeRequests(s.compute_fib, [20]) for s in samples]
    requests = [y for x in requests for y in x]
    for r in requests: pool.putRequest(r)
    pool.wait()
    for s in samples: print s.fib
    
    # ZeroDivisionError: integer division or modulo by zero
    # ZeroDivisionError: integer division or modulo by zero
    # ZeroDivisionError: integer division or modulo by zero
    # ZeroDivisionError: integer division or modulo by zero
    # ZeroDivisionError: integer division or modulo by zero
    # ZeroDivisionError: integer division or modulo by zero
    # ZeroDivisionError: integer division or modulo by zero
    # ZeroDivisionError: integer division or modulo by zero
    #---> 17 for s in samples: print s.fib
    #
    #AttributeError: 'Sample' object has no attribute 'fib'
    

    - Update -

    It appears that the futures library does not behave the same way on Python 3 as it does on Python 2.

    futures_exceptions.py:

    from concurrent.futures import ThreadPoolExecutor, as_completed
    
    def div_zero(x):
        return x / 0
    
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = executor.map(div_zero, range(4))
        for future in as_completed(futures): print(future)
    

    Python 2.7.6 output:

    Traceback (most recent call last):
      File "...futures_exceptions.py", line 12, in <module>
        for future in as_completed(futures):
      File "...python2.7/site-packages/concurrent/futures/_base.py", line 198, in as_completed
        with _AcquireFutures(fs):
      File "...python2.7/site-packages/concurrent/futures/_base.py", line 147, in __init__
        self.futures = sorted(futures, key=id)
      File "...python2.7/site-packages/concurrent/futures/_base.py", line 549, in map
        yield future.result()
      File "...python2.7/site-packages/concurrent/futures/_base.py", line 397, in result
        return self.__get_result()
      File "...python2.7/site-packages/concurrent/futures/_base.py", line 356, in __get_result
        raise self._exception
    ZeroDivisionError: integer division or modulo by zero
    

    Python 3.3.2 output:

    Traceback (most recent call last):
      File "...futures_exceptions.py", line 11, in <module>
        for future in as_completed(futures):
      File "...python3.3/concurrent/futures/_base.py", line 193, in as_completed
        with _AcquireFutures(fs):
      File "...python3.3/concurrent/futures/_base.py", line 142, in __init__
        self.futures = sorted(futures, key=id)
      File "...python3.3/concurrent/futures/_base.py", line 546, in result_iterator
        yield future.result()
      File "...python3.3/concurrent/futures/_base.py", line 392, in result
        return self.__get_result()
      File "...python3.3/concurrent/futures/_base.py", line 351, in __get_result
        raise self._exception
      File "...python3.3/concurrent/futures/thread.py", line 54, in run
        result = self.fn(*self.args, **self.kwargs)
      File "...futures_exceptions.py", line 7, in div_zero
        return x / 0
    ZeroDivisionError: division by zero
    
    • habanoz
      habanoz over 10 years
      It doesn't completely solve the problem, but one trick I have often used when debugging these problems is to temporarily replace the call to pool.map with a call to the builtin map.
  • xApple
    xApple about 11 years
    Adding a try-except block cannot solve any of the problems. In the case of concurrent, I still can't get to the original traceback after catching the new exception. In the case of workerpool, I never get to the except block because the interpreter hangs first. In the case of threadpool, I never get to the except block because no exceptions are raised at all.
  • slezica
    slezica about 11 years
    You're thinking of a try block in the main thread or process. I'm saying you use a try block around the function worker processes run. If you expect to raise an exception in a worker thread/process and have it sent to your main script, you need to first catch it where it occurred.
  • xApple
    xApple about 11 years
    Well, I'm not going to write error handling for every one of the functions I want to run. So what you are saying is that I should write my own global error handling. Yes, I could just choose one of the libraries and start editing the source code to add functionality, but that's what I wanted to avoid :)
  • slezica
    slezica about 11 years
    It's Python: write a high-level error handler and decorate your functions with it -- or even better, subclass the worker class and implement your try-except there. This is not editing source code, much less adding functionality. It's programming. You won't get a library that does this for you, because you are expected (and should be glad to) do it yourself. No offense intended, at all.
  • xApple
    xApple about 11 years
    No offense taken. I will probably write something myself. It's just that this seemed like something pretty basic to me, and I am surprised it has not been done by someone else in the past. It didn't strike me as a design that would require custom work. This is concurrency 101...
  • slezica
    slezica about 11 years
    I agree: it is pretty basic. However, it's also very difficult to come up with a default exception handling behavior that works all-around for everyone. Most people would end up overriding it anyway, so implementations leave it for users.
  • Lorenzo Sciuto
    Lorenzo Sciuto over 2 years
    the problem is it gives back only the message of the exception and not the stacktrace