Dead simple example of using Multiprocessing Queue, Pool and Locking

Solution 1

The best solution for your problem is to use a Pool. Using Queues and having a separate "queue feeding" process is probably overkill.

Here's a slightly rearranged version of your program, this time with only 2 processes corralled in a Pool. I believe it's the easiest way to go, with minimal changes to your original code:

import multiprocessing
import time

data = (
    ['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
    ['e', '1'], ['f', '3'], ['g', '5'], ['h', '7']
)

def mp_worker(args):
    inputs, the_time = args
    print(" Process %s\tWaiting %s seconds" % (inputs, the_time))
    time.sleep(int(the_time))
    print(" Process %s\tDONE" % inputs)

def mp_handler():
    with multiprocessing.Pool(2) as p:
        p.map(mp_worker, data)

if __name__ == '__main__':
    mp_handler()

Note that the mp_worker() function now accepts a single argument (a tuple of the two previous arguments), because map() passes each element of your data sequence to the worker function as one argument.
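
If you would rather keep the two-parameter worker signature, Pool.starmap() (Python 3.3+) unpacks each element of the data sequence into separate arguments; a minimal sketch, reusing the same data:

import multiprocessing
import time

data = (
    ['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
    ['e', '1'], ['f', '3'], ['g', '5'], ['h', '7']
)

def mp_worker(inputs, the_time):
    print(" Process %s\tWaiting %s seconds" % (inputs, the_time))
    time.sleep(int(the_time))
    print(" Process %s\tDONE" % inputs)

if __name__ == '__main__':
    with multiprocessing.Pool(2) as p:
        p.starmap(mp_worker, data)

Either version produces the same output, shown below.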

Output (only two processes run at once; as soon as one finishes, the pool starts the next item):

Process a   Waiting 2 seconds
Process b   Waiting 4 seconds
Process a   DONE
Process c   Waiting 6 seconds
Process b   DONE
Process d   Waiting 8 seconds
Process c   DONE
Process e   Waiting 1 seconds
Process e   DONE
Process f   Waiting 3 seconds
Process d   DONE
Process g   Waiting 5 seconds
Process f   DONE
Process h   Waiting 7 seconds
Process g   DONE
Process h   DONE

Edit as per @Thales comment below:

If you want "a lock for each pool limit" so that your processes run in tandem pairs, like this:

A waiting, B waiting | A done, B done | C waiting, D waiting | C done, D done | ...

then change the handler function to launch pools (of 2 processes) for each pair of data:

def mp_handler():
    subdata = zip(data[0::2], data[1::2])
    for task1, task2 in subdata:
        with multiprocessing.Pool(2) as p:
            p.map(mp_worker, (task1, task2))

Now your output is:

 Process a  Waiting 2 seconds
 Process b  Waiting 4 seconds
 Process a  DONE
 Process b  DONE
 Process c  Waiting 6 seconds
 Process d  Waiting 8 seconds
 Process c  DONE
 Process d  DONE
 Process e  Waiting 1 seconds
 Process f  Waiting 3 seconds
 Process e  DONE
 Process f  DONE
 Process g  Waiting 5 seconds
 Process h  Waiting 7 seconds
 Process g  DONE
 Process h  DONE
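
To generalize beyond pairs (a follow-up question in the comments below), slice the data into groups of any size; a minimal sketch, assuming the same data and mp_worker as above, with GROUP as a hypothetical group-size constant:

GROUP = 2  # any positive integer works

def mp_handler():
    for i in range(0, len(data), GROUP):
        with multiprocessing.Pool(GROUP) as p:
            p.map(mp_worker, data[i:i + GROUP])

Each group runs to completion before the next one starts, and a final group shorter than GROUP is handled automatically by the slice.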

Solution 2

Here is my personal go-to for this topic:

Gist here (pull requests welcome!): https://gist.github.com/thorsummoner/b5b1dfcff7e7fdd334ec

import multiprocessing
import sys

PROCESSES = 3

# Used to prevent multiple worker processes from mixing their output.
# Caveat: with the spawn start method (the default on Windows), a
# module-level lock is re-created in each child and is no longer shared;
# pass the lock through a Pool initializer there instead.
GLOBALLOCK = multiprocessing.Lock()


def func_worker(args):
    """This function will be called by each worker process.
    It is pickled by reference and sent to the workers, so it cannot
    be a lambda or a function defined inside another function.
    """
    # Expand the tuple of args into named variables.
    str1, str2 = args
    del args

    # Work
    # ...

    # Serial-only portion
    GLOBALLOCK.acquire()
    print(str1)
    print(str2)
    GLOBALLOCK.release()


def main(argp=None):
    """Multiprocessing spawn example."""
    # Create a pool with the number of worker processes you want.
    pool = multiprocessing.Pool(PROCESSES)

    # Define two jobs, each with two args.
    func_args = [
        ('Hello', 'World',),
        ('Goodbye', 'World',),
    ]

    try:
        pool.map_async(func_worker, func_args).get()
    except KeyboardInterrupt:
        # Allow ^C to interrupt from any worker.
        sys.stdout.write('\033[0m')
        sys.stdout.write('User Interrupt\n')
    pool.close()

if __name__ == '__main__':
    main()
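
As discussed in the comments below, map_async() differs from map() in that it returns immediately with an AsyncResult; calling .get() on that result then blocks until every job has finished. Here is a minimal sketch of the callback alternative mentioned in the comments, with square() and collect() as invented names:

import multiprocessing

def square(x):
    return x * x

def collect(results):
    # map_async() invokes this once, with the full list of results,
    # when all jobs have finished.
    print(results)

if __name__ == '__main__':
    pool = multiprocessing.Pool(3)
    async_result = pool.map_async(square, range(10), callback=collect)
    async_result.wait()  # or .get(), which also re-raises worker errors
    pool.close()
    pool.join()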

Solution 3

This might not be 100% related to the question, but in my search for an example of using multiprocessing with a queue, this shows up first on Google.

This is a basic example class that you can instantiate, put items into its queue, and then wait until the queue is finished. That's all I needed.

from multiprocessing import JoinableQueue, Process


class Renderer:
    def __init__(self, nb_workers=2):
        self.queue = JoinableQueue()
        # NB: a bound-method target works with the fork start method (the
        # Unix default); with spawn (Windows) it cannot be pickled this way.
        self.processes = [Process(target=self.upload) for _ in range(nb_workers)]
        for p in self.processes:
            p.start()

    def render(self, item):
        self.queue.put(item)

    def upload(self):
        while True:
            item = self.queue.get()
            if item is None:  # sentinel: no more work
                break

            # process your item here

            self.queue.task_done()

    def terminate(self):
        """Wait until the queue is empty, then shut the workers down."""
        self.queue.join()
        for _ in self.processes:
            self.queue.put(None)  # one sentinel per worker
        for p in self.processes:
            p.join()


if __name__ == '__main__':
    r = Renderer()
    r.render(item1)  # item1/item2: whatever objects your workers consume
    r.render(item2)
    r.terminate()
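
As noted in the comments below, item1 and item2 are just data that the workers process in parallel. For completeness, here is a minimal sketch of the same pattern without the class wrapper, passing the queue to a plain function so it also works with the spawn start method; all names are illustrative:

from multiprocessing import JoinableQueue, Process

def worker(queue):
    while True:
        item = queue.get()
        if item is None:  # sentinel: shut this worker down
            break
        print('processing %r' % (item,))
        queue.task_done()

if __name__ == '__main__':
    queue = JoinableQueue()
    workers = [Process(target=worker, args=(queue,)) for _ in range(2)]
    for p in workers:
        p.start()
    for item in ['job1', 'job2', 'job3']:
        queue.put(item)
    queue.join()           # wait until every item is task_done()
    for _ in workers:
        queue.put(None)    # one sentinel per worker
    for p in workers:
        p.join()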

Solution 4

For everyone using editors like Komodo Edit (on Windows 10), add sys.stdout.flush() to:

def mp_worker(args):
    inputs, the_time = args
    print(" Process %s\tWaiting %s seconds" % (inputs, the_time))
    time.sleep(int(the_time))
    print(" Process %s\tDONE" % inputs)
    sys.stdout.flush()  # requires: import sys

or as the first line of:

if __name__ == '__main__':
    sys.stdout.flush()
This helps you see what is going on while the script runs, instead of having to stare at the black command-line box.
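
Alternatively, on Python 3.3+ the print() function can flush for you, so the extra sys.stdout.flush() calls become unnecessary; the same worker print would read:

print(" Process %s\tWaiting %s seconds" % (inputs, the_time), flush=True)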

Solution 5

Here is an example from my code (it uses a thread pool, but just change the class name to ProcessPoolExecutor and you'll have a process pool):

from concurrent.futures import ThreadPoolExecutor

def execute_run(rp):
    ...  # do something with the run parameters

pool = ThreadPoolExecutor(6)
for mat in TESTED_MATERIAL:
    for en in TESTED_ENERGIES:
        for ecut in TESTED_E_CUT:
            rp = RunParams(
                simulations, DEST_DIR,
                PARTICLE, mat, 960, 0.125, ecut, en
            )
            pool.submit(execute_run, rp)
pool.shutdown(wait=True)

Basically:

  • pool = ThreadPoolExecutor(6) creates a pool of 6 threads.
  • Then you have a bunch of for loops that add tasks to the pool.
  • pool.submit(execute_run, rp) adds a task to the pool; the first argument is the function to call in a thread/process, and the rest of the arguments are passed to the called function.
  • pool.shutdown(wait=True) waits until all tasks are done (Executor objects have no join() method; see the self-contained sketch below).
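
Because the snippet above depends on names from the author's own project (RunParams, TESTED_MATERIAL, and so on), here is a self-contained sketch of the same submit pattern with invented inputs, using the executor as a context manager so that shutdown happens implicitly:

from concurrent.futures import ProcessPoolExecutor
import time

def execute_run(params):
    name, seconds = params
    time.sleep(seconds)
    return '%s finished after %s seconds' % (name, seconds)

if __name__ == '__main__':
    jobs = [('a', 2), ('b', 1), ('c', 3), ('d', 1)]
    # The with-block calls shutdown(wait=True) on exit.
    with ProcessPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(execute_run, job) for job in jobs]
        for future in futures:
            print(future.result())  # blocks until that job is done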

Comments

  • thclpr
    thclpr over 1 year

    I tried to read the documentation at http://docs.python.org/dev/library/multiprocessing.html but I'm still struggling with multiprocessing's Queue, Pool, and Lock. For now, I was able to build the example below.

    Regarding Queue and Pool, I'm not sure I understood the concepts the right way, so correct me if I'm wrong. What I'm trying to achieve is to process 2 requests at a time (the data list has 8 in this example), so what should I use? Pool, to create 2 processes that can handle two different queues (2 at max), or should I just use Queue to process 2 inputs each time? The lock would be there to print the outputs correctly.

    import multiprocessing
    import time
    
    data = (['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
            ['e', '1'], ['f', '3'], ['g', '5'], ['h', '7']
    )
    
    
    def mp_handler(var1):
        for indata in var1:
            p = multiprocessing.Process(target=mp_worker, args=(indata[0], indata[1]))
            p.start()
    
    
    def mp_worker(inputs, the_time):
        print(" Process %s\tWaiting %s seconds" % (inputs, the_time))
        time.sleep(int(the_time))
        print(" Process %s\tDONE" % inputs)
    
    if __name__ == '__main__':
        mp_handler(data)
    
  • Tim Peters
    Tim Peters over 10 years
    Note that you're using concurrent.futures, but the OP is asking about multiprocessing and Python 2.7.
  • thclpr
    thclpr over 10 years
    Thanks for the simple and direct example of how to do it. But how could I apply the lock for each pool limit? I mean, if you execute the code, I would like to see something like "A waiting, B waiting | A done, B done | C waiting, D waiting | C done, D done".
  • Velimir Mlaker
    Velimir Mlaker over 10 years
    In other words, you don't want C starting until both A and B are done?
  • thclpr
    thclpr over 10 years
    Exactly. I can do it using multiprocessing.Process, but I can't figure out how to do it using a Pool.
  • thclpr
    thclpr over 10 years
    Thank you a lot, it works as intended, but in the function mp_handler you are referencing the variable data instead of var1 :)
  • Velimir Mlaker
    Velimir Mlaker over 10 years
    Okay, thanks. I removed var1 altogether, referring to the global data instead.
  • thclpr
    thclpr over 10 years
    Hi, just out of curiosity: your example works for pairs of 2, but what if, for example, I have a list with 3 or 77 items in it? Instead of limiting the input to pairs with subdata = zip(var1[0::2], var1[1::2]), could the function go beyond the pairs defined by p = multiprocessing.Pool(2)? I mean, supposing I have a list with 107 values, or 10009, etc., what if I only wanted to increase or decrease the pool size to process the list?
  • ThorSummoner
    ThorSummoner over 7 years
    I'm not exactly sure if .map_async() is better than .map() in any way.
  • Zelphir Kaltstahl
    Zelphir Kaltstahl over 7 years
    What are item1 and item2? Are they some kind of tasks or functions that will be executed in two different processes?
  • linqu
    linqu over 7 years
    Yes, they are tasks or input parameters that get processed in parallel.
  • mata
    mata about 7 years
    The argument to get() is a timeout; it has nothing to do with the number of jobs that are started.
  • ThorSummoner
    ThorSummoner about 7 years
    @mata So, is that meant to be used in a polling loop, as in .get(timeout=1)? And is it okay to just call .get() to get the completed list?
  • mata
    mata about 7 years
    Yes, .get() waits indefinitely until all results are available and returns the list of results. You can use a polling loop to check whether results are available, or you can pass a callback function in the map_async() call, which will then be invoked with the results once they become available.
  • Amir
    Amir about 6 years
    @VelimirMlaker I want to run a function in the background, but I have some resource limitations and cannot run the function as many times as I want, so I want to queue the extra executions. Do you have any idea how I should do that? I have my question here. Could you please take a look at it and see if you can give me some hints (or even better, an answer) on how to do that?