Python multiprocessing and handling exceptions in workers
I changed your code slightly to make it work (see explanation below).
import multiprocessing as mp
import random

workers_count = 5
# Probability of failure, change to simulate failures
fail_init_p = 0.5
fail_job_p = 0.4


#========= Worker =========
def do_work(job_state, arg):
    if random.random() < fail_job_p:
        raise Exception("Job failed")
    return "job %d processed %d" % (job_state, arg)


def init(args):
    if random.random() < fail_init_p:
        raise Exception("Worker init failed")
    return args


def worker_function(args, jobs_queue, result_queue):
    # INIT
    # If init() fails, report it to the parent and exit the worker
    try:
        state = init(args)
    except:
        print("!Worker %d init fail" % args)
        result_queue.put('init failed')
        return
    # DO WORK
    # Process data in the jobs queue until the None stop token arrives
    for job in iter(jobs_queue.get, None):
        try:
            # Can throw an exception!
            result = do_work(state, job)
            result_queue.put(result)
        except:
            print("!Job %d failed, skip..." % job)
            result_queue.put('job failed')


#========= Parent =========
jobs = mp.Queue()
results = mp.Queue()
for i in range(workers_count):
    mp.Process(target=worker_function, args=(i, jobs, results)).start()

# Populate jobs queue
results_to_expect = 0
for j in range(30):
    jobs.put(j)
    results_to_expect += 1

init_failures = 0
job_failures = 0
successes = 0
# Stop once every job is accounted for, or once every worker has failed to initialize
while job_failures + successes < results_to_expect and init_failures < workers_count:
    result = results.get()
    init_failures += int(result == 'init failed')
    job_failures += int(result == 'job failed')
    successes += int(result != 'init failed' and result != 'job failed')
    #print(init_failures, job_failures, successes)

# Signal all workers to finish
for ii in range(workers_count):
    jobs.put(None)
My changes:
- Changed jobs to be just a normal Queue (instead of JoinableQueue).
- Workers now communicate back the special result strings "init failed" and "job failed".
- The master process watches for these special results until every job has been accounted for (as a success or a failure) or every worker has failed to initialize.
- In the end, put "stop" requests (i.e. None jobs) for however many workers you have, regardless. Note that not all of these may be pulled from the queue (in case a worker failed to initialize).
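The stop condition of the parent loop can be looked at in isolation. The following is only a sketch: the function name tally_results and the two constants are made up for illustration, and a plain queue.Queue stands in for the multiprocessing queue so that it runs without spawning any processes.

```python
import queue

# Marker strings matching the ones the workers send back.
INIT_FAILED = "init failed"
JOB_FAILED = "job failed"


def tally_results(results, jobs_count, workers_count):
    """Read results until all jobs are accounted for or all workers are gone.

    `results` is any object with a blocking get(), e.g. mp.Queue or queue.Queue.
    Returns (init_failures, job_failures, successes).
    """
    init_failures = job_failures = successes = 0
    while job_failures + successes < jobs_count and init_failures < workers_count:
        result = results.get()
        if result == INIT_FAILED:
            init_failures += 1
        elif result == JOB_FAILED:
            job_failures += 1
        else:
            successes += 1
    return init_failures, job_failures, successes


# In-process demo: three jobs, two workers, one of which failed to initialize.
demo = queue.Queue()
for item in ["job 0 processed 1", JOB_FAILED, INIT_FAILED, "job 0 processed 2"]:
    demo.put(item)
print(tally_results(demo, jobs_count=3, workers_count=2))
```

The second half of the condition is what prevents the hang: if every worker reports "init failed", nobody is left to produce results, so the loop ends instead of blocking on get() forever.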
By the way, your original code was nice and easy to work with. The random probabilities bit is pretty cool.
Author: Peter Popov
Updated on June 04, 2022
Comments
-
Peter Popov over 1 year
I use the Python multiprocessing library for an algorithm in which many workers process certain data and return results to the parent process. I use one multiprocessing.Queue for passing jobs to the workers and a second one to collect the results.
It all works pretty well until a worker fails to process some chunk of data. In the simplified example below each worker has two phases:
- initialization: can fail; in this case the worker should be destroyed
- data processing: processing a chunk of data can fail; in this case the worker should skip this chunk and continue with the next one.
When either of these phases fails I get a deadlock after script completion. This code simulates my problem:
import multiprocessing as mp
import random

workers_count = 5
# Probability of failure, change to simulate failures
fail_init_p = 0.2
fail_job_p = 0.3


#========= Worker =========
def do_work(job_state, arg):
    if random.random() < fail_job_p:
        raise Exception("Job failed")
    return "job %d processed %d" % (job_state, arg)


def init(args):
    if random.random() < fail_init_p:
        raise Exception("Worker init failed")
    return args


def worker_function(args, jobs_queue, result_queue):
    # INIT
    # What to do when init() fails?
    try:
        state = init(args)
    except:
        print("!Worker %d init fail" % args)
        return
    # DO WORK
    # Process data in the jobs queue
    for job in iter(jobs_queue.get, None):
        try:
            # Can throw an exception!
            result = do_work(state, job)
            result_queue.put(result)
        except:
            print("!Job %d failed, skip..." % job)
        finally:
            jobs_queue.task_done()
    # Telling that we are done with processing stop token
    jobs_queue.task_done()


#========= Parent =========
jobs = mp.JoinableQueue()
results = mp.Queue()
for i in range(workers_count):
    mp.Process(target=worker_function, args=(i, jobs, results)).start()

# Populate jobs queue
results_to_expect = 0
for j in range(30):
    jobs.put(j)
    results_to_expect += 1

# Collecting the results
# What if some workers failed to process the job and we have
# less results than expected
for r in range(results_to_expect):
    result = results.get()
    print(result)

# Signal all workers to finish
for i in range(workers_count):
    jobs.put(None)

# Wait for them to finish
jobs.join()
I have two questions about this code:
- When init() fails, how do I detect that the worker is invalid and avoid waiting for it to finish?
- When do_work() fails, how do I notify the parent process that fewer results should be expected in the results queue?
Thank you for help!
-
jfs over 10 years
or you could put a tuple (result, error) (error is None on success) into the result queue to avoid in-band communication for errors.
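A rough sketch of that tuple idea follows, written for Python 3. Everything in it is illustrative rather than taken from the thread: the job id added as a first tuple element, the deterministic do_work that fails on multiples of 3, and the helper names worker, collect and run_in_processes are all assumptions. Worker initialization failures are left out, so the sketch assumes at least one worker stays alive to drain the jobs queue.

```python
import multiprocessing as mp


def do_work(job):
    # Hypothetical job: fails on multiples of 3 so the outcome is predictable.
    if job % 3 == 0:
        raise ValueError("job %d failed" % job)
    return job * job


def worker(jobs, results):
    # Each job produces exactly one (job, result, error) tuple,
    # so the parent always knows how many items to read.
    for job in iter(jobs.get, None):
        try:
            results.put((job, do_work(job), None))
        except Exception as exc:
            # Send the message text rather than the exception object,
            # which keeps the item trivially picklable.
            results.put((job, None, str(exc)))


def collect(results, expected):
    # Split the tuples into successes and failures, keyed by job id.
    ok, failed = {}, {}
    for _ in range(expected):
        job, result, error = results.get()
        if error is None:
            ok[job] = result
        else:
            failed[job] = error
    return ok, failed


def run_in_processes(job_list, workers_count=2):
    # Same wiring as in the answer, but with out-of-band error reporting.
    jobs, results = mp.Queue(), mp.Queue()
    procs = [mp.Process(target=worker, args=(jobs, results))
             for _ in range(workers_count)]
    for proc in procs:
        proc.start()
    for job in job_list:
        jobs.put(job)
    for _ in procs:
        jobs.put(None)  # one stop token per worker
    outcome = collect(results, len(job_list))
    for proc in procs:
        proc.join()
    return outcome
```

To try it with real processes, call run_in_processes(range(1, 7)) from under an `if __name__ == "__main__":` guard. Because the error travels in its own tuple slot, a legitimate result can never be mistaken for a failure marker, which is the in-band problem the comment refers to.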