Python multiprocessing and handling exceptions in workers

I changed your code slightly to make it work (see explanation below).

import multiprocessing as mp
import random

workers_count = 5
# Probability of failure, change to simulate failures
fail_init_p = 0.5
fail_job_p = 0.4


#========= Worker =========
def do_work(job_state, arg):
    if random.random() < fail_job_p:
        raise Exception("Job failed")
    return "job %d processed %d" % (job_state, arg)

def init(args):
    if random.random() < fail_init_p:
        raise Exception("Worker init failed")
    return args

def worker_function(args, jobs_queue, result_queue):
    # INIT
    # If init() fails, report it to the parent and exit this worker
    try:
        state = init(args)
    except Exception:
        print("!Worker %d init fail" % args)
        result_queue.put('init failed')
        return
    # DO WORK
    # Process jobs until the None stop token arrives
    for job in iter(jobs_queue.get, None):
        try:
            # Can throw an exception!
            result = do_work(state, job)
            result_queue.put(result)
        except Exception:
            print("!Job %d failed, skip..." % job)
            result_queue.put('job failed')


#========= Parent =========
jobs = mp.Queue()
results = mp.Queue()
for i in range(workers_count):
    mp.Process(target=worker_function, args=(i, jobs, results)).start()

# Populate jobs queue
results_to_expect = 0
for j in range(30):
    jobs.put(j)
    results_to_expect += 1

init_failures = 0
job_failures = 0
successes = 0
while job_failures + successes < results_to_expect and init_failures < workers_count:
    result = results.get()
    init_failures += int(result == 'init failed')
    job_failures += int(result == 'job failed')
    successes += int(result != 'init failed' and result != 'job failed')
    # print(init_failures, job_failures, successes)

for ii in range(workers_count):
    jobs.put(None)

My changes:

  1. Changed jobs to be just a normal Queue (instead of JoinableQueue).
  2. Workers now report back the special result strings "init failed" and "job failed".
  3. The master process keeps reading results until every job has reported either a success or a failure, or until every worker has failed to initialize.
  4. Finally, put one "stop" request (i.e. a None job) on the queue for every worker, regardless of how many started successfully. Note that not all of these may be pulled from the queue (a worker that failed to initialize never reads its stop request).
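
The monitoring loop from item 3 can also be pulled out into a helper, which makes the stop conditions easier to see and to try out without starting any processes. This is only a sketch, assuming Python 3: the name collect and the two constants are made up for illustration, and a plain queue.Queue stands in for the multiprocessing results queue.

```python
import queue

INIT_FAILED = 'init failed'
JOB_FAILED = 'job failed'


def collect(result_queue, jobs_expected, workers_count):
    """Read results until every job is accounted for or every worker
    has reported that it failed to initialize."""
    init_failures = 0
    job_failures = 0
    successes = []
    while (job_failures + len(successes) < jobs_expected
           and init_failures < workers_count):
        result = result_queue.get()
        if result == INIT_FAILED:
            init_failures += 1
        elif result == JOB_FAILED:
            job_failures += 1
        else:
            successes.append(result)
    return init_failures, job_failures, successes


if __name__ == "__main__":
    # Stand-in queue, pre-filled with what the workers might have reported.
    q = queue.Queue()
    for item in [INIT_FAILED, 'job 1 processed 0', JOB_FAILED, 'job 1 processed 2']:
        q.put(item)
    print(collect(q, jobs_expected=3, workers_count=2))
```

The loop stops as soon as three jobs are accounted for (two successes and one failure here), even though one of the two workers never started.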

By the way, your original code was nice and easy to work with. The random probabilities bit is pretty cool.

Author: Peter Popov

Updated on June 04, 2022

Comments

  • Peter Popov over 1 year

    I use the Python multiprocessing library for an algorithm in which many workers process certain data and return results to the parent process. I use one multiprocessing.Queue to pass jobs to the workers and a second one to collect results.

    It all works pretty well until a worker fails to process some chunk of data. In the simplified example below, each worker has two phases:

    • initialization - can fail, in which case the worker should be destroyed
    • data processing - processing a chunk of data can fail, in which case the worker should skip this chunk and continue with the next one.

    When either of these phases fails, I get a deadlock after script completion. This code simulates my problem:

    import multiprocessing as mp
    import random
    
    workers_count = 5
    # Probability of failure, change to simulate failures
    fail_init_p = 0.2
    fail_job_p = 0.3
    
    
    #========= Worker =========
    def do_work(job_state, arg):
        if random.random() < fail_job_p:
            raise Exception("Job failed")
        return "job %d processed %d" % (job_state, arg)
    
    def init(args):
        if random.random() < fail_init_p:
            raise Exception("Worker init failed")
        return args
    
    def worker_function(args, jobs_queue, result_queue):
        # INIT
        # What to do when init() fails?
        try:
            state = init(args)
        except:
            print("!Worker %d init fail" % args)
            return
        # DO WORK
        # Process data in the jobs queue
        for job in iter(jobs_queue.get, None):
            try:
                # Can throw an exception!
                result = do_work(state, job)
                result_queue.put(result)
            except:
                print("!Job %d failed, skip..." % job)
            finally:
                jobs_queue.task_done()
        # Telling that we are done with processing stop token
        jobs_queue.task_done()
    
    
    
    #========= Parent =========
    jobs = mp.JoinableQueue()
    results = mp.Queue()
    for i in range(workers_count):
        mp.Process(target=worker_function, args=(i, jobs, results)).start()
    
    # Populate jobs queue
    results_to_expect = 0
    for j in range(30):
        jobs.put(j)
        results_to_expect += 1
    
    # Collecting the results
    # What if some workers failed to process the job and we have
    # less results than expected
    for r in range(results_to_expect):
        result = results.get()
        print(result)
    
    #Signal all workers to finish
    for i in range(workers_count):
        jobs.put(None)
    
    #Wait for them to finish
    jobs.join()
    

    I have two questions about this code:

    1. When init() fails, how can I detect that the worker is invalid and avoid waiting for it to finish?
    2. When do_work() fails, how can I notify the parent process that fewer results should be expected in the results queue?

    Thank you for your help!

  • jfs over 10 years
    Or you could put a tuple (result, error), where error is None on success, into the result queue to avoid in-band communication for errors.
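
A rough sketch of that tuple idea, assuming Python 3. The helper handle_job, the run function and the deterministic failure rule in do_work are hypothetical and only there for illustration. Init failures are left out to keep it short. The error is sent as text so that it always pickles.

```python
import multiprocessing as mp


def do_work(state, job):
    # Hypothetical job for illustration: every third job fails.
    if job % 3 == 0:
        raise ValueError("job %d failed" % job)
    return "job %d processed %d" % (state, job)


def handle_job(state, job):
    """Return (result, error); exactly one of the two is None."""
    try:
        return do_work(state, job), None
    except Exception as exc:
        # Ship the error as text so it always pickles cleanly.
        return None, repr(exc)


def worker_function(state, jobs_queue, result_queue):
    # Every job produces exactly one tuple, failed or not.
    for job in iter(jobs_queue.get, None):
        result_queue.put(handle_job(state, job))


def run(job_count=6, workers_count=2):
    jobs, results = mp.Queue(), mp.Queue()
    procs = [mp.Process(target=worker_function, args=(i, jobs, results))
             for i in range(workers_count)]
    for p in procs:
        p.start()
    for j in range(job_count):
        jobs.put(j)
    # One tuple per job, so the number of results is known up front.
    outcomes = [results.get() for _ in range(job_count)]
    # Send one stop token per worker, then wait for them to exit.
    for _ in procs:
        jobs.put(None)
    for p in procs:
        p.join()
    return outcomes
```

To try it, call run() under an `if __name__ == "__main__":` guard and check each tuple: error is None for a success and holds the exception text otherwise. Since a failed job still produces a tuple, the parent never has to guess how many results to wait for, and no result string can be mistaken for an error marker.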