Measuring progress with Python's multiprocessing Pool and map function
To show the progress, replace `pool.map` with `pool.imap_unordered`:
```python
from tqdm import tqdm  # $ pip install tqdm

for result in tqdm(pool.imap_unordered(init_worker, csvReader, chunksize=10)):
    csvWriter.writerow(result)
```
The `tqdm` part is optional; see Text Progress Bar in the Console. Incidentally, it also fixes your "whole csv is stored in memory" and "KeyboardInterrupt is not raised" problems.
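The memory fix comes from laziness: `pool.map` consumes the whole input iterable up front and builds the full result list, while `imap_unordered` pulls items on demand and yields results as they complete. A minimal sketch of the difference (names and timings are illustrative only):

```python
import multiprocessing
import time

def slow_square(x):
    time.sleep(.05)
    return x * x

if __name__ == "__main__":
    pool = multiprocessing.Pool(2)
    try:
        start = time.time()
        # imap_unordered returns an iterator immediately; pool.map would
        # block here until all 40 items were finished
        it = pool.imap_unordered(slow_square, range(40), chunksize=5)
        first = next(it)  # available long before the whole input is done
        print("first result after %.2fs" % (time.time() - start))
        # results stream in as they finish, so only small chunks are buffered
        rest = sorted([first] + list(it))
        print(rest == [x * x for x in range(40)])
    finally:
        pool.terminate()
        pool.join()
```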
Here's a complete code example:
```python
#!/usr/bin/env python
import itertools
import logging
import multiprocessing
import time

def compute(i):
    time.sleep(.5)
    return i**2

if __name__ == "__main__":
    logging.basicConfig(format="%(asctime)-15s %(levelname)s %(message)s",
                        datefmt="%F %T", level=logging.DEBUG)
    pool = multiprocessing.Pool()
    try:
        for square in pool.imap_unordered(compute, itertools.count(), chunksize=10):
            logging.debug(square)  # report progress by printing the result
    except KeyboardInterrupt:
        logging.warning("got Ctrl+C")
    finally:
        pool.terminate()
        pool.join()
```
You should see the output in batches every `.5 * chunksize` seconds. If you press Ctrl+C, you should see `KeyboardInterrupt` raised in the child processes and in the main process. In Python 3, the main process exits immediately. In Python 2, the `KeyboardInterrupt` is delayed until the next batch should have been printed (a bug in Python).
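Since the question asks for plain text rather than a progress bar: if the total number of rows is known in advance, you can print a status line yourself instead of using tqdm. A minimal sketch, where the helper name and the reporting interval are my own choices:

```python
import multiprocessing
import sys
import time

def compute(i):
    time.sleep(.01)
    return i ** 2

def map_with_progress(func, items, total, report_every=10):
    """Map func over items in parallel, printing a plain-text counter
    every `report_every` results (and once at the end)."""
    results = []
    pool = multiprocessing.Pool()
    try:
        for done, value in enumerate(pool.imap_unordered(func, items), start=1):
            results.append(value)
            if done % report_every == 0 or done == total:
                sys.stderr.write("processed %d/%d\n" % (done, total))
    finally:
        pool.terminate()
        pool.join()
    return results

if __name__ == "__main__":
    squares = map_with_progress(compute, range(50), total=50)
    print(sorted(squares) == [i ** 2 for i in range(50)])
```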
Wakan Tanka
Updated on June 04, 2022

Comments
- Wakan Tanka (almost 2 years ago):
The following code is what I'm using for parallel CSV processing:
```python
#!/usr/bin/env python
import csv
from time import sleep
from multiprocessing import Pool
from multiprocessing import cpu_count
from multiprocessing import current_process
from pprint import pprint as pp

def init_worker(x):
    sleep(.5)
    print "(%s,%s)" % (x[0], x[1])
    x.append(int(x[0])**2)
    return x

def parallel_csv_processing(inputFile, outputFile, header=["Default", "header", "please", "change"],
                            separator=",", skipRows = 0, cpuCount = 1):
    # OPEN FH FOR READING INPUT FILE
    inputFH   = open(inputFile, "rt")
    csvReader = csv.reader(inputFH, delimiter=separator)

    # SKIP HEADERS
    for skip in xrange(skipRows):
        csvReader.next()

    # PARALLELIZE COMPUTING INTENSIVE OPERATIONS - CALL FUNCTION HERE
    try:
        p = Pool(processes = cpuCount)
        results = p.map(init_worker, csvReader, chunksize = 10)
        p.close()
        p.join()
    except KeyboardInterrupt:
        p.close()
        p.join()
        p.terminate()

    # CLOSE FH FOR READING INPUT
    inputFH.close()

    # OPEN FH FOR WRITING OUTPUT FILE
    outputFH  = open(outputFile, "wt")
    csvWriter = csv.writer(outputFH, lineterminator='\n')

    # WRITE HEADER TO OUTPUT FILE
    csvWriter.writerow(header)

    # WRITE RESULTS TO OUTPUT FILE
    [csvWriter.writerow(row) for row in results]

    # CLOSE FH FOR WRITING OUTPUT
    outputFH.close()
    print pp(results)
    # print len(results)

def main():
    inputFile = "input.csv"
    outputFile = "output.csv"
    parallel_csv_processing(inputFile, outputFile, cpuCount = cpu_count())

if __name__ == '__main__':
    main()
```
I would like to somehow measure the progress of the script (just plain text, not any fancy ASCII art). The one option that comes to my mind is to compare the lines that were successfully processed by `init_worker` to all the lines in input.csv, and print the current state, e.g. every second. Can you please point me to the right solution? I've found several articles with a similar problem, but I was not able to adapt them to my needs because none of them used the `Pool` class and the `map` method. I would also like to ask about the `p.close()`, `p.join()`, and `p.terminate()` methods. I've seen them mainly with the `Process` class, not `Pool`; are they necessary with the `Pool` class, and have I used them correctly? Using `p.terminate()` was intended to kill the process with Ctrl+C, but that is a different story, which does not have a happy end yet. Thank you.

PS: My input.csv looks like this, if it matters:
```
0,0
1,3
2,6
3,9
...
...
48,144
49,147
```
PPS: As I said, I'm a newbie with `multiprocessing`, and the code I've put together just works. The one drawback I can see is that the whole CSV is stored in memory, so if you guys have a better idea, do not hesitate to share it.

Edit
In reply to @J.F.Sebastian: here is my actual code based on your suggestions:
```python
#!/usr/bin/env python
import csv
from time import sleep
from multiprocessing import Pool
from multiprocessing import cpu_count
from multiprocessing import current_process
from pprint import pprint as pp
from tqdm import tqdm

def do_job(x):
    sleep(.5)
    # print "(%s,%s)" % (x[0], x[1])
    x.append(int(x[0])**2)
    return x

def parallel_csv_processing(inputFile, outputFile, header=["Default", "header", "please", "change"],
                            separator=",", skipRows = 0, cpuCount = 1):
    # OPEN FH FOR READING INPUT FILE
    inputFH   = open(inputFile, "rb")
    csvReader = csv.reader(inputFH, delimiter=separator)

    # SKIP HEADERS
    for skip in xrange(skipRows):
        csvReader.next()

    # OPEN FH FOR WRITING OUTPUT FILE
    outputFH  = open(outputFile, "wt")
    csvWriter = csv.writer(outputFH, lineterminator='\n')

    # WRITE HEADER TO OUTPUT FILE
    csvWriter.writerow(header)

    # PARALLELIZE COMPUTING INTENSIVE OPERATIONS - CALL FUNCTION HERE
    try:
        p = Pool(processes = cpuCount)
        # results = p.map(do_job, csvReader, chunksize = 10)
        for result in tqdm(p.imap_unordered(do_job, csvReader, chunksize=10)):
            csvWriter.writerow(result)
        p.close()
        p.join()
    except KeyboardInterrupt:
        p.close()
        p.join()

    # CLOSE FH FOR READING INPUT
    inputFH.close()

    # CLOSE FH FOR WRITING OUTPUT
    outputFH.close()
    print pp(result)
    # print len(result)

def main():
    inputFile = "input.csv"
    outputFile = "output.csv"
    parallel_csv_processing(inputFile, outputFile, cpuCount = cpu_count())

if __name__ == '__main__':
    main()
```
Here is the output of `tqdm`:

```
1 [elapsed: 00:05, 0.20 iters/sec]
```
What does this output mean? On the page you referred to, `tqdm` is used in a loop in the following way:

```python
>>> import time
>>> from tqdm import tqdm
>>> for i in tqdm(range(100)):
...     time.sleep(1)
...
|###-------| 35/100  35% [elapsed: 00:35 left: 01:05, 1.00 iters/sec]
```
That output makes sense, but what does my output mean? Also, it does not seem that the Ctrl+C problem is fixed: after hitting Ctrl+C, the script throws a traceback; if I hit Ctrl+C again, I get a new traceback, and so on. The only way to kill it is to send it to the background (Ctrl+Z) and then kill it (kill %1).
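For what it's worth, tqdm prints only the bare iteration count here because `imap_unordered` returns an iterator with no known length, so tqdm cannot compute a percentage or an ETA. Passing `total=` restores the familiar bar. A sketch, where `count_rows` is a hypothetical helper that pre-counts the input file:

```python
def count_rows(path, skip_rows=0):
    # hypothetical helper: count data rows once, so tqdm knows the total
    with open(path) as f:
        return max(sum(1 for _ in f) - skip_rows, 0)

# With the total known, tqdm can render the percentage bar, e.g.:
#   total = count_rows(inputFile, skipRows)
#   for result in tqdm(p.imap_unordered(do_job, csvReader, chunksize=10),
#                      total=total):
#       csvWriter.writerow(result)
```

The extra pass over the file is cheap compared to the per-row work being parallelized.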