Python: Writing to a single file with queue while using multiprocessing Pool

Solution 1

Multiprocessing pools implement a queue for you. Just use a pool method that returns the worker's return value to the caller. imap works well:

import multiprocessing 
import re

def mp_worker(filename):
    with open(filename) as f:
        text = f.read()
    m = re.findall("x+", text)
    # Longest run of consecutive "x" characters (0 if the file has none)
    count = len(max(m, key=len)) if m else 0
    return filename, count

def mp_handler():
    p = multiprocessing.Pool(32)
    with open('infilenamess.txt') as f:
        filenames = [line for line in (l.strip() for l in f) if line]
    with open('results.txt', 'w') as f:
        for result in p.imap(mp_worker, filenames):
            # (filename, count) tuples from worker
            f.write('%s: %d\n' % result)

if __name__=='__main__':
    mp_handler()
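
If the order of the results doesn't matter (see also the imap_unordered comment below), imap_unordered hands each result to the parent as soon as any worker finishes instead of preserving submission order, so slow files don't hold up the writes. A minimal sketch of that variation, under the same assumptions as the code above (an infilenamess.txt listing the input files, output to results.txt, and Python 3 so the Pool can be used as a context manager):

import multiprocessing
import re

def mp_worker(filename):
    with open(filename) as f:
        text = f.read()
    m = re.findall("x+", text)
    return filename, len(max(m, key=len)) if m else 0

def mp_handler():
    with open('infilenamess.txt') as f:
        filenames = [line for line in (l.strip() for l in f) if line]
    with multiprocessing.Pool(32) as p, open('results.txt', 'w') as out:
        # Results arrive in completion order, not submission order.
        for filename, count in p.imap_unordered(mp_worker, filenames):
            out.write('%s: %d\n' % (filename, count))

if __name__ == '__main__':
    mp_handler()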

Solution 2

I took the accepted answer and simplified it for my own understanding of how this works. I am posting it here in case it helps someone else.

import multiprocessing

def mp_worker(number):
    number += 1
    return number

def mp_handler():
    p = multiprocessing.Pool(32)
    numbers = list(range(1000))
    with open('results.txt', 'w') as f:
        for result in p.imap(mp_worker, numbers):
            f.write('%d\n' % result)

if __name__=='__main__':
    mp_handler()
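
One optional tweak that isn't part of the original answer: when each task is this small, most of the time goes into shipping tasks between processes. imap accepts a chunksize argument that batches the inputs, which usually helps; here is a sketch of the same example with batching (again assuming Python 3, where the Pool works as a context manager):

import multiprocessing

def mp_worker(number):
    return number + 1

def mp_handler():
    numbers = list(range(1000))
    with multiprocessing.Pool(32) as p, open('results.txt', 'w') as f:
        # chunksize=100 sends the numbers to the workers in batches of 100,
        # cutting down on per-task inter-process overhead.
        for result in p.imap(mp_worker, numbers, chunksize=100):
            f.write('%d\n' % result)

if __name__ == '__main__':
    mp_handler()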

Solution 3

Here's my approach using a multiprocessing Manager object. The nice thing about this approach is that when processing drops out of the with Manager() block in run_multi(), the file-writer queue is closed automatically, which keeps the code easy to read and saves you the hassle of trying to stop listening on the queue.

from functools import partial
from multiprocessing import Manager, Pool, Queue
from random import randint
import time

def run_multi():
    input = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    with Manager() as manager:
        pool = Pool()  # Sized to the CPU count by default; needs at least two processes, since one runs the listener
        message_queue = manager.Queue()  # Queue for sending messages to the file-writer listener
        pool.apply_async(file_writer, (message_queue,))  # Start the file-writer listener before submitting the work
        pool.map(partial(worker, message_queue=message_queue), input)  # Partial function allows us to use map to divide workload

def worker(input: int, message_queue: Queue):
    message_queue.put(input * 10)
    time.sleep(randint(1, 5))  # Simulate hard work

def file_writer(message_queue: Queue):
    with open("demo.txt", "a") as report:
        while True:  # keeps listening until the manager (and its queue) shuts down
            report.write(f"Value is: {message_queue.get()}\n")

if __name__ == "__main__":
    run_multi()
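
Not from the original answer, but if you would rather shut the listener down explicitly instead of relying on the manager tearing the queue down, a common variation is to push a sentinel value once all the work is done and have file_writer return when it sees it. A sketch of that variant using the same names as above (SENTINEL is just an illustrative marker):

from functools import partial
from multiprocessing import Manager, Pool

SENTINEL = None  # any value the workers never produce

def worker(value, message_queue):
    message_queue.put(value * 10)

def file_writer(message_queue):
    with open("demo.txt", "a") as report:
        # iter() with a sentinel stops the loop once SENTINEL is received
        for message in iter(message_queue.get, SENTINEL):
            report.write(f"Value is: {message}\n")

def run_multi():
    with Manager() as manager, Pool(processes=4) as pool:  # at least two processes: one runs the listener
        message_queue = manager.Queue()
        writer = pool.apply_async(file_writer, (message_queue,))
        pool.map(partial(worker, message_queue=message_queue), range(1, 11))
        message_queue.put(SENTINEL)  # tell the listener to finish
        writer.get()                 # wait for it to flush and close the file

if __name__ == "__main__":
    run_multi()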

Comments

  • risraelsen
    risraelsen almost 4 years

    I have hundreds of thousands of text files that I want to parse in various ways. I want to save the output to a single file without synchronization problems. I have been using multiprocessing pool to do this to save time, but I can't figure out how to combine Pool and Queue.

    The following code will save the infile name as well as the maximum number of consecutive "x"s in the file. However, I want all processes to save results to the same file, and not to different files as in my example. Any help on this would be greatly appreciated.

    import multiprocessing
    import re
    
    with open('infilenamess.txt') as f:
        filenames = f.read().splitlines()
    
    def mp_worker(filename):
        with open(filename, 'r') as f:
            text = f.read()
            m = re.findall("x+", text)
            count = len(max(m, key=len))
            outfile = open(filename + '_results.txt', 'a')
            outfile.write(str(filename) + '|' + str(count) + '\n')
            outfile.close()
    
    def mp_handler():
        p = multiprocessing.Pool(32)
        p.map(mp_worker, filenames)
    
    if __name__ == '__main__':
        mp_handler()
    
  • risraelsen
    risraelsen over 9 years
    So, I loop through the results one at a time and write them to the file as they come in? Does that mean that the new worker won't start until each "result" has been written, or will 32 run at a time, but will wait to write? Also, can you explain why you replaced my f.read().splitlines() with [line for line in (l.strip() for l in f) if line]?
  • tdelaney
    tdelaney over 9 years
    The 32 processes run in the background and get more filenames in "chunks" as they pass results back to the parent process. Results are passed back immediately, so the parent is doing its work in parallel. It's a bit more efficient to read the file line by line than to read the whole thing and split it later... that's what the list comprehension is for.
  • Menezes Sousa
    Menezes Sousa about 9 years
    Excellent! There are tons of answers about this online but none so simple. Kudos to you!
  • Reihan_amn
    Reihan_amn almost 6 years
    Thanks a lot! It's not completely clear to me when and where the pool is created and when/where it gets drained to the file. My whole code is this: Parallel(n_jobs=num_cores)(delayed(my_function)(entry) for entry in contents). It takes one line and returns 30 lines. Do I have to store the results of all the processes in one list and then write them to the file? If so, it is going to be slow, because the list keeps growing and the process slows down.
  • zyxue
    zyxue almost 6 years
    If the order of the returned results doesn't matter, you could also use imap_unordered.
  • Jason Goal
    Jason Goal about 5 years
    This is a great answer. Saved lots of time, thanks. I encountered a problem while applying this to my data: I have a 5 GB csv and use 42 cores to process it and write the result to a single csv file, but the output always gets mangled at row 147. I can replicate this with different sub data sets, and the same error always happens at that position. Real output: cad1d1eaf41c40f89a0198c3be80379f,2018-07-6195d4a2c0914f4381442f08797f658f,2018-06-15 01:47:34,1; desired output: cad1d1eaf41c40f89a0198c3be80379f,2018-07-30 01:47:34(\n)16195d4a2c0914f4381442f08797f658f,2018-06-15 01:47:34,1