Python: multiple subprocesses with a pool/queue, recovering output as soon as one finishes and launching the next job in the queue

ThreadPool could be a good fit for your problem: you set the number of worker threads and add jobs, and the threads will work their way through all the tasks.

from multiprocessing.pool import ThreadPool
import subprocess


def work(sample):
    my_tool_subprocess = subprocess.Popen(
        'mytool {}'.format(sample), shell=True, stdout=subprocess.PIPE)
    # readline() returns an empty string at EOF, which ends the loop
    line = my_tool_subprocess.stdout.readline()
    while line:
        # here I parse stdout..
        line = my_tool_subprocess.stdout.readline()
    my_tool_subprocess.wait()


num = None  # set to the number of workers you want (it defaults to the cpu count of your machine)
tp = ThreadPool(num)
for sample in all_samples:
    tp.apply_async(work, (sample,))

tp.close()
tp.join()
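
The question also asks how to tell which finished subprocess belongs to which sample. One way to do that (a minimal sketch, not part of the original answer; mytool, the sample names, and the parsing are placeholders) is to have work() return the sample together with its parsed output, and keep the AsyncResult objects that apply_async returns:

from multiprocessing.pool import ThreadPool
import subprocess

all_samples = ['sample1', 'sample2', 'sample3']  # placeholder inputs


def work(sample):
    proc = subprocess.Popen(
        'mytool {}'.format(sample), shell=True, stdout=subprocess.PIPE)
    parsed = []
    line = proc.stdout.readline()
    while line:
        parsed.append(line.decode().strip())  # real parsing would go here
        line = proc.stdout.readline()
    proc.wait()
    return sample, parsed  # pair each output with its input sample


tp = ThreadPool(4)  # at most 4 subprocesses run at the same time
results = [tp.apply_async(work, (s,)) for s in all_samples]
tp.close()
tp.join()

for res in results:
    sample, parsed = res.get()  # each result comes back tied to its sample
    print(sample, parsed)

Because the results list is built in the same order as all_samples, res.get() also hands the outputs back in input order, even though the jobs themselves finish in any order.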
Comments

  • gmarco almost 2 years

    I'm currently launching a subprocess and parsing its stdout on the go, without waiting for it to finish.

    for sample in all_samples:
        my_tool_subprocess = subprocess.Popen(
            'mytool {}'.format(sample), shell=True, stdout=subprocess.PIPE)
        # readline() returns an empty string at EOF, which ends the loop
        line = my_tool_subprocess.stdout.readline()
        while line:
            #here I parse stdout..
            line = my_tool_subprocess.stdout.readline()
    

    In my script I perform this action multiple times, depending on the number of input samples.

    The main problem is that each subprocess is a program/tool that uses one CPU at 100% while it's running, and each takes a while, maybe 20-40 minutes per input.

    What I would like to achieve is to set up a pool or queue (I'm not sure of the exact terminology here) of at most N subprocess jobs running at the same time, so I could maximize performance instead of proceeding sequentially.

    So the execution flow for, say, a pool with a maximum of 4 jobs should be:

    • Launch 4 subprocesses.
    • When one of the jobs finishes, parse its stdout and launch the next.
    • Do this until all the jobs in the queue are finished.

    If I can achieve this, I really don't know how I could identify which sample's subprocess is the one that has finished. At the moment I don't need to identify them, since each subprocess runs sequentially and I parse stdout as the subprocess prints it.

    This is really important, since I need to identify the output of each subprocess and assign it to its corresponding input/sample.

  • gmarco over 9 years
    Not really. I mean, each process input produces an output; I don't need the previous process's output to compute the next one. That's why I wish I could run them in parallel. The problem is that at the moment I run them one by one, sequentially.
  • GP89 over 9 years
    There will be a benefit, as the subprocesses are separate processes (the Python threads are just waiting for output).
  • gmarco over 9 years
    Wow! This seems to be exactly what I needed. If in the future the tool I use in work() can benefit from multiple cores, so that each spawned thread uses 2 CPUs (cores), is there any way to control this so it doesn't exceed the machine's number of cores? (i.e. I spawn 4 threads, each using 4 cores, when my machine has only 8 cores.)
  • GP89 over 9 years
    @gmarco This can't tell anything about the type of work, or how many cores it uses/needs. If you don't specify the number of threads to ThreadPool (the first argument), it uses multiprocessing.cpu_count() to get the CPU count and spins up that many threads. So if you know how many cores each job needs, you can work this out yourself (ThreadPool(multiprocessing.cpu_count() // 2), for example).
  • Íhor Mé about 4 years
    Thanks! I had some serious issues with subprocess: Popen just wasn't launching and generally wasn't working for me for some reason, so I switched to ThreadPool + os.system() and it's even better than Popen was supposed to be!
  • GP89 about 4 years
    @ÍhorMé You're better off using subprocess, but you can deadlock it if there's too much output and you're not handling stdout/stderr properly (see the sketch after these comments).
  • Íhor Mé about 4 years
    subprocess didn't spawn threads for me, for some reason, and I found no resolution to that problem. It's probably some bug. (Not on my side.)
  • GP89 about 4 years
    @ÍhorMé subprocess doesn't spawn threads, it's for starting new external processes
  • Rob over 3 years
    FWIW, with the code as originally posted (line = True, never updated), you also have to break out of the while loop as soon as you finish parsing; otherwise the worker is never released to let the next job start.
  • GP89 over 3 years
    @Rob it should probably be line = my_tool_subprocess.stdout.readline() here in the loop; then the while loop would automatically break and the job would return once the process has finished producing output.
  • G M over 2 years
    How can this break the while loop???
  • GP89 over 2 years
    @GM you're right, it won't; I just copied the code from the question.
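
On the deadlock point raised in the comments: if a child process writes more to a pipe than the OS buffer holds and nothing reads it, the child blocks forever. A minimal sketch of one safe pattern (again with mytool as a placeholder command), letting communicate() drain both streams to EOF:

import subprocess

# communicate() reads stdout and stderr to EOF, so the child can never
# block on a full pipe buffer ('mytool sample1' is a placeholder command).
proc = subprocess.Popen('mytool sample1', shell=True,
                        stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = proc.communicate()
for line in out.splitlines():
    pass  # parse each line of the finished tool's output here

The trade-off is that communicate() buffers the whole output in memory and only returns once the process exits, so it suits tools with modest output; the line-by-line readline() loop in the answer streams output as it is produced.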