How to create global lock/semaphore with multiprocessing.pool in Python?


Solution 1

Use the initializer and initargs arguments when creating a pool to define a global variable in all the child processes.

For instance:

from multiprocessing import Pool, Lock
from time import sleep

def do_job(i):
    "The greater i is, the shorter the function waits before returning."
    with lock:
        sleep(1-(i/10.))
        return i

def init_child(lock_):
    global lock
    lock = lock_

def main():
    lock = Lock()
    poolsize = 4
    with Pool(poolsize, initializer=init_child, initargs=(lock,)) as pool:
        results = pool.imap_unordered(do_job, range(poolsize))
        print(list(results))

if __name__ == "__main__":
    main()

This code will print out the numbers 0-3 in ascending order (the order in which the jobs were submitted), because it uses the lock. Comment out the with lock: line to see it print out the numbers in descending order.

This solution works on both Windows and Unix. On Unix, however, multiprocessing can create child processes by forking, so a Unix-only program can simply declare the lock at module scope: each forked child gets a copy of the parent's memory, including a lock object that still works. The initializer then isn't strictly needed, though it helps document how the code is intended to work. When multiprocessing creates processes by forking, the following also works.

from multiprocessing import Pool, Lock
from time import sleep

lock = Lock()

def do_job(i):
    "The greater i is, the shorter the function waits before returning."
    with lock:
        sleep(1-(i/10.))
        return i

def main():
    poolsize = 4
    with Pool(poolsize) as pool:
        results = pool.imap_unordered(do_job, range(poolsize))
        print(list(results))

if __name__ == "__main__":
    main()
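On Windows, and on macOS since Python 3.8, the default start method is spawn, so the module-level lock above would be re-created independently in each child instead of being inherited. On a Unix system you can request forking explicitly; a minimal sketch of that, assuming the fork start method is available:

```python
from multiprocessing import get_context, Lock
from time import sleep

lock = Lock()  # inherited by the children because they are forked

def do_job(i):
    "The greater i is, the shorter the function waits before returning."
    with lock:
        sleep(1 - (i / 10.0))
        return i

def main():
    # "fork" is Unix-only; on Windows (or under "spawn") use the
    # initializer approach from the first example instead.
    ctx = get_context("fork")
    with ctx.Pool(4) as pool:
        print(list(pool.imap_unordered(do_job, range(4))))

if __name__ == "__main__":
    main()
```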

Solution 2

Use a global semaphore and acquire it whenever you access a shared resource. For example:

import multiprocessing
from time import sleep

semaphore = multiprocessing.Semaphore(2)

def do_job(job_id):
    with semaphore:
        sleep(1)
    print("Finished job")

def main():
    pool = multiprocessing.Pool(6)
    for job_id in range(6):
        print("Starting job")
        pool.apply_async(do_job, [job_id])
    pool.close()
    pool.join()

if __name__ == "__main__":
    main()

This program finishes only two jobs every second, because the other worker processes are blocked waiting for the semaphore.
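You can verify that claim by timing the run: six one-second jobs squeezed through a two-slot semaphore should take roughly three seconds, no matter how many workers the pool has. A quick sketch (like Solution 2, it relies on fork semantics to share the module-level semaphore):

```python
import multiprocessing
from time import sleep, monotonic

semaphore = multiprocessing.Semaphore(2)  # at most 2 jobs at once

def do_job(job_id):
    with semaphore:
        sleep(1)
    return job_id

def main():
    start = monotonic()
    with multiprocessing.Pool(6) as pool:
        pool.map(do_job, range(6))
    # 6 jobs / 2 concurrent slots -> about 3 waves of 1 second each
    print(f"elapsed: {monotonic() - start:.1f}s")

if __name__ == "__main__":
    main()
```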

Author: Chameleon
Updated on July 15, 2022

Comments

  • Chameleon, almost 2 years ago

    I want to limit resource access in child processes, for example to limit HTTP downloads, disk I/O, and so on. How can I achieve this by expanding this basic code?

    Please share some basic code examples.

    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    while job_queue.is_jobs_for_processing():
        for job in job_queue.pull_jobs_for_processing():
            pool.apply_async(do_job, [job], callback=callback)
    pool.close()
    pool.join()
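One way to expand that skeleton, sketched with illustrative names: share a semaphore through the pool initializer (as in Solution 1) and acquire it around the rate-limited operation inside do_job. Here a plain range stands in for the hypothetical job_queue, and sleep stands in for the real HTTP download or disk I/O:

```python
import multiprocessing
from time import sleep

def init_child(sem_):
    # Runs once in each worker process; stores the shared semaphore globally.
    global download_sem
    download_sem = sem_

def do_job(job):
    # Only two workers may be inside this block at once, whatever the
    # pool size -- this is what throttles downloads / disk access.
    with download_sem:
        sleep(0.1)  # stand-in for the actual download or disk I/O
    return job

def callback(result):
    print("finished job", result)

def main():
    sem = multiprocessing.Semaphore(2)  # allow 2 concurrent accesses
    pool = multiprocessing.Pool(
        multiprocessing.cpu_count(),
        initializer=init_child,
        initargs=(sem,),
    )
    for job in range(8):  # stand-in for pulling jobs off job_queue
        pool.apply_async(do_job, [job], callback=callback)
    pool.close()
    pool.join()

if __name__ == "__main__":
    main()
```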