Measuring progress with Python's multiprocessing Pool and map function


To show the progress, replace pool.map with pool.imap_unordered:

from tqdm import tqdm # $ pip install tqdm

for result in tqdm(pool.imap_unordered(init_worker, csvReader, chunksize=10)):
    csvWriter.writerow(result)

The tqdm part is optional; see Text Progress Bar in the Console

Incidentally, it also fixes your "whole csv is stored in memory" and "KeyboardInterrupt is not raised" problems.
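If you'd rather have plain-text progress reports than a progress bar, a minimal sketch is to count the completed rows yourself. It reuses the csvReader/csvWriter objects and the init_worker function from your code, and the 100-row reporting interval is an arbitrary choice:

# plain-text progress: count finished rows yourself instead of using tqdm
for count, result in enumerate(pool.imap_unordered(init_worker, csvReader, chunksize=10), 1):
    csvWriter.writerow(result)
    if count % 100 == 0:  # print a status line every 100 processed rows
        print("processed %d rows" % count)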

Here's a complete code example:

#!/usr/bin/env python
import itertools
import logging
import multiprocessing
import time

def compute(i):
    time.sleep(.5)
    return i**2

if __name__ == "__main__":
    logging.basicConfig(format="%(asctime)-15s %(levelname)s %(message)s",
                        datefmt="%F %T", level=logging.DEBUG)
    pool = multiprocessing.Pool()
    try:
        for square in pool.imap_unordered(compute, itertools.count(), chunksize=10):
            logging.debug(square) # report progress by printing the result
    except KeyboardInterrupt:
        logging.warning("got Ctrl+C")
    finally:
        pool.terminate()
        pool.join()

You should see the output in batches every 0.5 * chunksize seconds. If you press Ctrl+C, you should see KeyboardInterrupt raised in the child processes and in the main process. In Python 3, the main process exits immediately. In Python 2, the KeyboardInterrupt is delayed until the next batch would have been printed (a bug in Python).
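Note that tqdm can only show a percentage and an ETA if it knows the total number of items; with an iterator such as csvReader it doesn't, so it falls back to a plain counter. If you know (or are willing to count) the number of rows up front, you can pass it via tqdm's total parameter. A sketch, reusing the names from your code, at the cost of an extra pass over the input file:

# count the data rows first so tqdm can show a bar, percentage and ETA
with open(inputFile) as f:
    row_count = sum(1 for _ in f) - skipRows

for result in tqdm(pool.imap_unordered(init_worker, csvReader, chunksize=10),
                   total=row_count):
    csvWriter.writerow(result)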


Comments

  • Wakan Tanka almost 2 years

    The following code is what I'm using for parallel CSV processing:

    #!/usr/bin/env python
    
    import csv
    from time import sleep
    from multiprocessing import Pool
    from multiprocessing import cpu_count
    from multiprocessing import current_process
    from pprint import pprint as pp
    
    def init_worker(x):
      sleep(.5)
      print "(%s,%s)" % (x[0],x[1])
      x.append(int(x[0])**2)
      return x
    
    def parallel_csv_processing(inputFile, outputFile, header=["Default", "header", "please", "change"], separator=",", skipRows = 0, cpuCount = 1):
      # OPEN FH FOR READING INPUT FILE
      inputFH   = open(inputFile,  "rt")
      csvReader = csv.reader(inputFH, delimiter=separator)
    
      # SKIP HEADERS
      for skip in xrange(skipRows):
        csvReader.next()
    
      # PARALLELIZE COMPUTING INTENSIVE OPERATIONS - CALL FUNCTION HERE
      try:
        p = Pool(processes = cpuCount)
        results = p.map(init_worker, csvReader, chunksize = 10)
        p.close()
        p.join()
      except KeyboardInterrupt:
        p.close()
        p.join()
        p.terminate()
    
      # CLOSE FH FOR READING INPUT
      inputFH.close()
    
      # OPEN FH FOR WRITING OUTPUT FILE
      outputFH  = open(outputFile, "wt")
      csvWriter = csv.writer(outputFH, lineterminator='\n')
    
      # WRITE HEADER TO OUTPUT FILE
      csvWriter.writerow(header)
    
      # WRITE RESULTS TO OUTPUT FILE
      [csvWriter.writerow(row) for row in results]
    
      # CLOSE FH FOR WRITING OUTPUT
      outputFH.close()
    
      print pp(results)
      # print len(results)
    
    def main():
      inputFile  = "input.csv"
      outputFile = "output.csv"
      parallel_csv_processing(inputFile, outputFile, cpuCount = cpu_count())
    
    if __name__ == '__main__':
      main()
    

    I would like to somehow measure the progress of the script (just plain text, not any fancy ASCII art). The one option that comes to my mind is to compare the lines that were successfully processed by init_worker to all the lines in input.csv and print the current state, e.g. every second. Can you please point me to the right solution? I've found several articles with similar problems, but I was not able to adapt them to my needs because none of them used the Pool class and the map method. I would also like to ask about the p.close(), p.join(), and p.terminate() methods: I've seen them mainly with the Process class, not Pool. Are they necessary with the Pool class, and have I used them correctly? Using p.terminate() was intended to kill the process with Ctrl+C, but that is a different story which does not have a happy ending yet. Thank you.

    PS: My input.csv looks like this, if it matters:

    0,0
    1,3
    2,6
    3,9
    ...
    ...
    48,144
    49,147
    

    PPS: As I said, I'm a newbie at multiprocessing, and the code I've put together just works. The one drawback I can see is that the whole csv is stored in memory, so if you guys have a better idea, do not hesitate to share it.

    Edit

    In reply to @J.F.Sebastian:

    Here is my actual code based on your suggestions:

    #!/usr/bin/env python
    
    import csv
    from time import sleep
    from multiprocessing import Pool
    from multiprocessing import cpu_count
    from multiprocessing import current_process
    from pprint import pprint as pp
    from tqdm import tqdm
    
    def do_job(x):
      sleep(.5)
      # print "(%s,%s)" % (x[0],x[1])
      x.append(int(x[0])**2)
      return x
    
    def parallel_csv_processing(inputFile, outputFile, header=["Default", "header", "please", "change"], separator=",", skipRows = 0, cpuCount = 1):
    
      # OPEN FH FOR READING INPUT FILE
      inputFH   = open(inputFile,  "rb")
      csvReader = csv.reader(inputFH, delimiter=separator)
    
      # SKIP HEADERS
      for skip in xrange(skipRows):
        csvReader.next()
    
      # OPEN FH FOR WRITING OUTPUT FILE
      outputFH  = open(outputFile, "wt")
      csvWriter = csv.writer(outputFH, lineterminator='\n')
    
      # WRITE HEADER TO OUTPUT FILE
      csvWriter.writerow(header)
    
      # PARALLELIZE COMPUTING INTENSIVE OPERATIONS - CALL FUNCTION HERE
      try:
        p = Pool(processes = cpuCount)
        # results = p.map(do_job, csvReader, chunksize = 10)
        for result in tqdm(p.imap_unordered(do_job, csvReader, chunksize=10)):
          csvWriter.writerow(result)
        p.close()
        p.join()
      except KeyboardInterrupt:
        p.close()
        p.join()
    
      # CLOSE FH FOR READING INPUT
      inputFH.close()
    
      # CLOSE FH FOR WRITING OUTPUT
      outputFH.close()
    
      print pp(result)
      # print len(result)
    
    def main():
      inputFile  = "input.csv"
      outputFile = "output.csv"
      parallel_csv_processing(inputFile, outputFile, cpuCount = cpu_count())
    
    if __name__ == '__main__':
      main()
    

    Here is the output of tqdm:

    1 [elapsed: 00:05,  0.20 iters/sec]
    

    What does this output mean? On the page you referred to, tqdm is used in a loop in the following way:

    >>> import time
    >>> from tqdm import tqdm
    >>> for i in tqdm(range(100)):
    ...     time.sleep(1)
    ... 
    |###-------| 35/100  35% [elapsed: 00:35 left: 01:05,  1.00 iters/sec]
    

    This output makes sense, but what does my output mean? Also, it does not seem that the Ctrl+C problem is fixed: after hitting Ctrl+C the script throws a traceback, and if I hit Ctrl+C again I get a new traceback, and so on. The only way to kill it is to send it to the background (Ctrl+Z) and then kill it (kill %1).