Python: Writing to a single file with queue while using multiprocessing Pool
Solution 1
Multiprocessing pools implement a queue for you. Just use a pool method that returns the worker's return value to the caller; imap works well:
import multiprocessing
import re

def mp_worker(filename):
    with open(filename) as f:
        text = f.read()
    m = re.findall("x+", text)
    count = len(max(m, key=len))
    return filename, count

def mp_handler():
    p = multiprocessing.Pool(32)
    with open('infilenamess.txt') as f:
        filenames = [line for line in (l.strip() for l in f) if line]
    with open('results.txt', 'w') as f:
        for result in p.imap(mp_worker, filenames):
            # (filename, count) tuples from worker
            f.write('%s: %d\n' % result)

if __name__ == '__main__':
    mp_handler()
Solution 2
I took the accepted answer and simplified it for my own understanding of how this works. I am posting it here in case it helps someone else.
import multiprocessing

def mp_worker(number):
    number += 1
    return number

def mp_handler():
    p = multiprocessing.Pool(32)
    numbers = list(range(1000))
    with open('results.txt', 'w') as f:
        for result in p.imap(mp_worker, numbers):
            f.write('%d\n' % result)

if __name__ == '__main__':
    mp_handler()
Solution 3
Here's my approach using a multiprocessing Manager object. The nice thing about this approach is that when execution drops out of the "with Manager()" block in the run_multi() function, the manager and its queue are shut down automatically, which keeps the code easy to read and spares you the hassle of explicitly stopping the listener on the queue.
from functools import partial
from multiprocessing import Manager, Pool, Queue
from random import randint
import time

def run_multi():
    input = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    with Manager() as manager:
        pool = Pool()  # By default the pool sizes itself to the cores available
        message_queue = manager.Queue()  # Queue for sending messages to the file-writer listener
        pool.apply_async(file_writer, (message_queue,))  # Start the file listener ahead of doing the work
        pool.map(partial(worker, message_queue=message_queue), input)  # partial() lets us use map to divide the workload

def worker(input: int, message_queue: Queue):
    message_queue.put(input * 10)
    time.sleep(randint(1, 5))  # Simulate hard work

def file_writer(message_queue: Queue):
    with open("demo.txt", "a") as report:
        while True:
            report.write(f"Value is: {message_queue.get()}\n")

if __name__ == "__main__":
    run_multi()
risraelsen
Updated on May 19, 2020

Comments
-
risraelsen almost 4 years
I have hundreds of thousands of text files that I want to parse in various ways. I want to save the output to a single file without synchronization problems. I have been using a multiprocessing Pool to do this to save time, but I can't figure out how to combine Pool and Queue.
The following code will save the infile name as well as the maximum number of consecutive "x"s in the file. However, I want all processes to save results to the same file, and not to different files as in my example. Any help on this would be greatly appreciated.
import multiprocessing
import re

with open('infilenamess.txt') as f:
    filenames = f.read().splitlines()

def mp_worker(filename):
    with open(filename, 'r') as f:
        text = f.read()
    m = re.findall("x+", text)
    count = len(max(m, key=len))
    outfile = open(filename + '_results.txt', 'a')
    outfile.write(str(filename) + '|' + str(count) + '\n')
    outfile.close()

def mp_handler():
    p = multiprocessing.Pool(32)
    p.map(mp_worker, filenames)

if __name__ == '__main__':
    mp_handler()
-
risraelsen over 9 yearsSo, I loop through the results one at a time and write them to the file as they come in? Does that mean that the new worker won't start until each "result" has been written, or will 32 run at a time, but will wait to write? Also, can you explain why you replaced my f.read().splitlines() with [line for line in (l.strip() for l in f) if line]?
-
tdelaney over 9 years The 32 processes run in the background and get more filenames in "chunks" as they pass results back to the parent process. Results are passed back immediately, so the parent does its work in parallel. It's a bit more efficient to read the file line by line than to read the whole thing and split it later... that's what the list comprehension is for.
-
Menezes Sousa about 9 yearsExcellent! There are tons of answers about this online but none so simple. Kudos to you!
-
Reihan_amn almost 6 years Thanks a lot! It is not completely clear to me when and where the pool is created and when/where it gets drained to the file. My whole code is this: Parallel(n_jobs=num_cores)(delayed(my_function)(entry) for entry in contents); it takes one line and returns 30 lines. Do I have to store the results of all processes in one list and then write it to the file? If yes, it is going to be slow, because the list keeps growing as we store results and the process slows down.
-
zyxue almost 6 years If the order of the returned results doesn't matter, you could also use imap_unordered.
-
Jason Goal about 5 years This is a great answer; saved lots of time, thanks. I encountered a problem while applying this to my data. I have a 5GB csv, I use 42 cores to process it and write the result to a single csv file, but there is always a mangled line at row 147. I can replicate this using different sub data sets, and the same error always happens at that position. Real output: cad1d1eaf41c40f89a0198c3be80379f,2018-07-6195d4a2c0914f4381442f08797f658f,2018-06-15 01:47:34,1; desired output: cad1d1eaf41c40f89a0198c3be80379f,2018-07-30 01:47:34(\n)16195d4a2c0914f4381442f08797f658f,2018-06-15 01:47:34,1