Python, process a large text file in parallel

Solution 1

The simplest approach would probably be to run your existing code on all 30 files at once -- it would still take all day, but you'd have all the files done together. (i.e. "9 babies in 9 months" is easy, "1 baby in 1 month" is hard.)
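A minimal sketch of that idea, assuming your per-file loop can be wrapped in a hypothetical process_file() that returns its three counters (the filenames below are placeholders):

import multiprocessing

def process_file(fname):
    # placeholder for your existing single-file loop
    counter_a = counter_b = counter_c = 0
    with open(fname) as textfile:
        for line in textfile:
            value = line.split()[3]
            # ... call your search_function(value) here and update the counters ...
    return counter_a, counter_b, counter_c

if __name__ == '__main__':
    files = ['sam1.txt', 'sam2.txt', 'sam3.txt']   # your ~30 input files
    with multiprocessing.Pool() as pool:           # one worker per CPU by default
        for fname, counts in zip(files, pool.map(process_file, files)):
            print(fname, counts)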

If you really want to get a single file done faster, it will depend on how your counters actually update. If almost all the work is in analysing the value, you can offload that using the multiprocessing module:

import time
import multiprocessing

def slowfunc(value):
    time.sleep(0.01)
    return value**2 + 0.3*value + 1

counter_a = counter_b = counter_c = 0
def add_to_counter(res):
    global counter_a, counter_b, counter_c
    counter_a += res
    counter_b -= (res - 10)**2
    counter_c += (int(res) % 2)

pool = multiprocessing.Pool(50)
results = []

for value in range(100000):
    r = pool.apply_async(slowfunc, [value])
    results.append(r)

    # don't let the queue grow too long
    if len(results) == 1000:
        results[0].wait()

    while results and results[0].ready():
        r = results.pop(0)
        add_to_counter(r.get())

for r in results:
    r.wait()
    add_to_counter(r.get())

print(counter_a, counter_b, counter_c)

That will allow 50 slowfuncs to run in parallel, so instead of taking 1000 s (= 100k * 0.01 s) it takes about 20 s ((100k / 50) * 0.01 s) to complete. If you can restructure your function into a "slowfunc" and an "add_to_counter" like the above, you should be able to get roughly a factor-of-24 speedup on your 24-processor machine.
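Note that the example's slowfunc mostly sleeps, which is why 50 workers help; if your real work is CPU-bound, a pool much larger than the number of processors won't buy much. A small sketch (not part of the original answer) that sizes the pool from the CPU count instead of hard-coding 50:

import multiprocessing

# on your server this creates 24 workers, one per processor
pool = multiprocessing.Pool(multiprocessing.cpu_count())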

Solution 2

Read one file at a time, use all CPUs to run search_function():

#!/usr/bin/env python
from multiprocessing import Array, Pool

def init(counters_):  # called once in each child process
    global counters
    counters = counters_

def search_function(value):  # assume it is a CPU-intensive task
    result, error = None, None
    # your condition checking goes here; the branch below is only a placeholder
    with counters.get_lock():  # += on a shared Array is not atomic without the lock
        if int(value) % 2:
            counters[0] += 1  # counter 'a'
        else:
            counters[1] += 1  # counter 'b'
    return value, result, error

if __name__ == '__main__':
    counters = Array('i', [0] * 3)
    pool = Pool(initializer=init, initargs=[counters])
    with open('data.sam') as textfile:  # placeholder filename -- use your own
        values = (line.split()[3] for line in textfile)
        for value, result, error in pool.imap_unordered(search_function, values,
                                                        chunksize=1000):
            if error is not None:
                print('value: {value}, error: {error}'.format(**vars()))
    pool.close()
    pool.join()
    print(list(counters))

Make sure (for example, by writing wrappers) that exceptions do not escape from next(values) or search_function().
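One way to do that for search_function (a sketch, not from the original answer) is a wrapper that reports any exception through the error slot the function already returns, so a bad line never kills a worker:

def safe_search_function(value):
    # any exception is returned in the `error` slot instead of propagating
    try:
        return search_function(value)
    except Exception as exc:
        return value, None, exc

Pass safe_search_function to pool.imap_unordered() in place of search_function; a similar try/except around line.split() protects next(values) from malformed lines.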

Solution 3

This solution works on a set of files.

For each file, it divides it into a specified number of line-aligned chunks, solves each chunk in parallel, then combines the results.

It streams each chunk from disk; this is somewhat slower, but does not consume nearly so much memory. We depend on disk cache and buffered reads to prevent head thrashing.

Usage is like

python script.py -n 16 sam1.txt sam2.txt sam3.txt

and script.py is

import argparse
from io import SEEK_END 
import multiprocessing as mp

#
# Worker process
#
def summarize(fname, start, stop):
    """
    Process file[start:stop]

    start and stop both point to first char of a line (or EOF)
    """
    a = 0
    b = 0
    c = 0

    with open(fname, newline='') as inf:
        # jump to start position
        pos = start
        inf.seek(pos)

        for line in inf:
            value = int(line.split(None, 4)[3])   # 4th whitespace-separated field

            # *** START EDIT HERE ***
            #

            # update a, b, c based on value

            #
            # *** END EDIT HERE ***

            pos += len(line)
            if pos >= stop:
                break

    return a, b, c

def main(num_workers, sam_files):
    print("{} workers".format(num_workers))
    pool = mp.Pool(processes=num_workers)

    # for each input file
    for fname in sam_files:
        print("Dividing {}".format(fname))
        # decide how to divide up the file
        with open(fname) as inf:
            # get file length
            inf.seek(0, SEEK_END)
            f_len = inf.tell()
            # find break-points
            starts = [0]
            for n in range(1, num_workers):
                # jump to approximate break-point
                inf.seek(n * f_len // num_workers)
                # find start of next full line
                inf.readline()
                # store offset
                starts.append(inf.tell())

        # do it!
        stops = starts[1:] + [f_len]
        start_stops = zip(starts, stops)
        print("Solving {}".format(fname))
        # use apply_async so the chunks actually run in parallel (pool.apply would block)
        async_results = [pool.apply_async(summarize, args=(fname, start, stop))
                         for start, stop in start_stops]

        # collect results (sum the a, b, c counters across chunks)
        results = [sum(col) for col in zip(*(r.get() for r in async_results))]
        print(results)

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Parallel text processor')
    parser.add_argument('--num_workers', '-n', default=8, type=int)
    parser.add_argument('sam_files', nargs='+')
    args = parser.parse_args()
    main(args.num_workers, args.sam_files)

Comments

  • Xiangwu, almost 2 years ago:

    Sample records in the data file (SAM file):

    M01383  0  chr4  66439384  255  31M  *  0  0  AAGAGGA GFAFHGD  MD:Z:31 NM:i:0
    M01382  0  chr1  241995435  255 31M  *  0  0  ATCCAAG AFHTTAG  MD:Z:31 NM:i:0
    ......
    
    • The data files are line-based
    • The size of the data files varies from 1 GB to 5 GB

    I need to go through the records in the data file line by line, get a particular value (e.g. the 4th value, 66439384) from each line, and pass this value to another function for processing. Then some result counters are updated.

    The basic workflow is like this:

    # global variables; the counters are updated in search_function according to the value passed
    counter_a = 0
    counter_b = 0
    counter_c = 0

    def search_function(value):
        # some condition checking:
        #     update counter_a, counter_b, or counter_c
        ...

    with open(textfile) as f:
        for line in f:
            value = line.split()[3]
            search_function(value)    # this function takes quite a long time to process
    

    With single-process code and a ~1.5 GB data file, it took about 20 hours to run through all the records in one file. I need much faster code because there are more than 30 data files of this kind.

    I was thinking of processing the data file in N chunks in parallel, with each chunk performing the above workflow and updating the global counters (counter_a, counter_b, counter_c) simultaneously. But I don't know how to achieve this in code, or whether it will work.

    I have access to a server machine with 24 processors and around 40 GB of RAM.

    Could anyone help with this? Thanks very much.