subprocess + multiprocessing - multiple commands in sequence


Are the calls to Popen "blocking"?

No. Merely constructing a subprocess.Popen returns immediately, giving you an object you can wait on or otherwise use. If you want to block until the command finishes, that's simple:

subprocess.check_call(shlex.split(cmd1))
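For contrast, a minimal sketch of both styles (echo is just a stand-in command here):

import shlex
import subprocess

cmd1 = "echo hello"

# Popen starts the child and returns immediately; the child runs in the background.
p = subprocess.Popen(shlex.split(cmd1))
# ... do other work while the child runs ...
p.wait()   # block only when you actually need the child to be finished

# check_call starts the child and blocks until it exits,
# raising CalledProcessError on a non-zero exit status.
subprocess.check_call(shlex.split(cmd1))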

Meanwhile, I'm not sure why you're putting your args together into a string and then trying to shlex them back to a list. Why not just write the list?

cmd1 = ["echo", fn]
subprocess.check_call(cmd1)

While this works, it does not seem to be running multiple processes; it seems like it's just running in serial

What makes you think this? Given that each process just kicks off two processes into the background as fast as possible, it's going to be pretty hard to tell whether they're running in parallel.

If you want to verify that the work really is being spread across multiple processes, add some prints or logging (and throw something like os.getpid() into the messages).
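For instance, a quick sketch of that kind of instrumentation (the message format is just illustrative):

import os
import time

def process_file(fn):
    print("%s pid=%d start %s" % (time.strftime("%H:%M:%S"), os.getpid(), fn))
    # ... cmd1, the Python step, cmd2 ...
    print("%s pid=%d finish %s" % (time.strftime("%H:%M:%S"), os.getpid(), fn))

If several different PIDs show up interleaved, the pool really is spreading the files across processes.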

Meanwhile, it looks like you're trying to duplicate exactly the effect of multiprocessing.Pool.map_async with a loop around multiprocessing.Pool.apply_async, except that instead of accumulating the results you're stashing each one in a variable called RETURN and then throwing it away before you can use it. Why not just use map_async?
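Something along these lines (a sketch reusing the process_file and files from your code; the keyword arguments are set aside here, see the comment thread below for one way to pass them):

import multiprocessing

if __name__ == "__main__":
    pool = multiprocessing.Pool()
    result = pool.map_async(process_file, files)
    pool.close()
    results = result.get()   # blocks until every file is done and re-raises any worker error
    pool.join()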

Finally, you asked whether multiprocessing is the right tool for the job. Well, you clearly need something asynchronous: check_call(args(file1)) has to block other_python_function_to_do_something_to_file(file1), but at the same time not block check_call(args(file2)).
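In other words, the worker itself should block step by step on its one file, along the lines of this sketch (the echo commands and other_python_function_to_do_something_to_file are the placeholders from your code):

import subprocess

def process_file(fn):
    subprocess.check_call(["echo", fn])                 # cmd1: blocks, but only this worker
    other_python_function_to_do_something_to_file(fn)   # runs only after cmd1 has finished
    subprocess.check_call(["echo", fn])                 # cmd2: again blocks only this worker

Each pool worker runs that sequence for its own file, so the per-file ordering is preserved without serializing the files against each other.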

I would probably have used threading, but really, it doesn't make much difference. Even if you're on a platform where process startup is expensive, you're already paying that cost because the whole point is running N * M child processes, so another pool of 8 isn't going to hurt anything. And there's little risk either of accidentally creating races by sharing data between threads, or of accidentally writing code that looks like it shares data between processes but doesn't, since there's nothing to share. So, whichever one you like more, go for it.
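If you do go the threading route, one convenient option (my suggestion here, not something from your code) is multiprocessing.dummy, which gives you the same Pool API backed by threads, so the rest of the code doesn't change:

from multiprocessing.dummy import Pool as ThreadPool  # same Pool API, backed by threads

pool = ThreadPool(8)               # 8 worker threads
pool.map(process_file, files)      # each worker still blocks on its own check_call calls
pool.close()
pool.join()

Since the real work happens in the child processes started by check_call, the threads spend their time waiting and the GIL doesn't get in the way.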

The other alternative would be to write an event loop. Which I might actually start doing myself for this problem, but I'd regret it, and you shouldn't do it…


Comments

  • keflavich
    keflavich over 1 year

    I have a set of command line tools that I'd like to run in parallel on a series of files. I've written a python function to wrap them that looks something like this:

    import os
    import shlex
    import subprocess

    def process_file(fn):
        print os.getpid()
        cmd1 = "echo "+fn
        p = subprocess.Popen(shlex.split(cmd1))
    
        # after cmd1 finishes
        other_python_function_to_do_something_to_file(fn)
    
        cmd2 = "echo "+fn
        p = subprocess.Popen(shlex.split(cmd2))
        print "finish"
    
    if __name__=="__main__":
        import multiprocessing
        p = multiprocessing.Pool()
        for fn in files:
            RETURN = p.apply_async(process_file,args=(fn,),kwds={some_kwds})
    

    While this works, it does not seem to be running multiple processes; it seems like it's just running in serial (I've tried using Pool(5) with the same result). What am I missing? Are the calls to Popen "blocking"?

    EDIT: Clarified a little. I need cmd1, then some python command, then cmd2, to execute in sequence on each file.

    EDIT2: The output from the above has the pattern:

    pid
    finish
    pid
    finish
    pid
    finish
    

    whereas a similar call, using map in place of apply (but without any provision for passing kwds) looks more like

    pid
    pid
    pid
    finish
    finish
    finish
    

    However, the map call sometimes (always?) hangs after apparently succeeding.

  • keflavich
    keflavich over 11 years
    OK, so if I want my shell commands to run in series, check_call is the way to go. Does it then make sense to use multiprocessing? (re: shlex - my actual commands are more complex than the simple example I included)
  • abarnert
    abarnert over 11 years
    Now that I understand the problem, yes, multiprocessing is at least an option. You want to block on the first process for the first file before calling the second process for the first file… but not before calling the first process on the second file. I'd probably do it with either threading (or I might even write a select loop, and then beat my head against the wall when it came time to port to Windows…), but multiprocessing will work fine.
  • keflavich
    keflavich over 11 years
    Great, thanks, this makes a lot of sense. I used apply instead of map because I need to pass keyword arguments to the function. Would it make more sense to wrap or decorate my function with a kwarg parser and then use map_async?
  • keflavich
    keflavich over 11 years
    re: Why do I think they're not separate processes? Because I get my output results in serial order, i.e. my inputs are f1,f2,f3,f4,...,f30, I run with 4 processes, and every one outputs sequentially in order with the same time separation between each; if they were running in parallel the order would be random. BUT, they do have different PIDs. Hmm.
  • abarnert
    abarnert over 11 years
    Another way to verify multiple processes is to throw in random delays at various places (both in the tasks, and between the applies) to increase the variation. Often this can be enough to drive it from "every 1000 times I get one f out of order" to "on average I get 17 fs out of order"…
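    Something as small as this sketch, dropped into the task and between the applies, is usually enough to make the interleaving visible:

    import random
    import time

    time.sleep(random.random() * 0.5)   # a little jitter so parallel runs reorder visibly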
  • keflavich
    keflavich over 11 years
    If I use Pool().map, 10 processes start & finish nearly simultaneously - there's a clear difference. See EDIT 2.
  • abarnert
    abarnert over 11 years
    Sorry, I didn't answer your other question, about the kwargs. The map method only works for functions of one argument, which means yes, you do need to wrap things up in, e.g., a tuple of a list and a dict, and pass a wrapper equivalent to lambda arg: realFunc(*arg[0], **arg[1]) in place of realFunc. Which may be adding more work than you're saving in the dispatch, as I said in the comments to the question.
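    A rough sketch of that wrapping (the keyword name is a placeholder, and a module-level function stands in for the lambda because Pool has to pickle whatever callable it dispatches to the workers):

    import multiprocessing

    def call_packed(packed):
        # packed is ([positional args], {keyword args})
        args, kwargs = packed
        return process_file(*args, **kwargs)

    if __name__ == "__main__":
        # files and process_file as in the question above
        tasks = [([fn], {"some_kwd": "some_value"}) for fn in files]
        pool = multiprocessing.Pool()
        results = pool.map(call_packed, tasks)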