Processing single file from multiple processes


Solution 1

What you are looking for is a Producer/Consumer pattern.

Basic threading example

Here is a basic example using the threading module (instead of multiprocessing):

import threading
import queue
import sys

def do_work(in_queue, out_queue):
    while True:
        item = in_queue.get()
        # process the item (stand-in for real work)
        result = item
        out_queue.put(result)
        in_queue.task_done()

if __name__ == "__main__":
    work = queue.Queue()
    results = queue.Queue()
    total = 20

    # start the worker threads
    for i in range(4):
        t = threading.Thread(target=do_work, args=(work, results))
        t.daemon = True
        t.start()

    # produce data
    for i in range(total):
        work.put(i)

    work.join()

    # get the results
    for i in range(total):
        print(results.get())

    sys.exit()

You wouldn't share the file object with the threads. You would produce work for them by supplying the queue with lines of data. Then each thread would pick up a line, process it, and put the result on the output queue.
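For illustration, here is a minimal sketch of that idea: the same worker pattern, but with the producer feeding lines from a file instead of integers (the file name source.txt is just a placeholder):

import threading
import queue

def do_work(in_queue, out_queue):
    while True:
        line = in_queue.get()
        # stand-in for real per-line work
        out_queue.put("processed: %s" % line.strip())
        in_queue.task_done()

if __name__ == "__main__":
    work = queue.Queue()
    results = queue.Queue()

    # start the worker threads
    for i in range(4):
        t = threading.Thread(target=do_work, args=(work, results))
        t.daemon = True
        t.start()

    # produce work: feed the queue with lines of data
    count = 0
    with open("source.txt") as f:
        for line in f:
            work.put(line)
            count += 1

    work.join()

    # collect one result per line that was queued
    for i in range(count):
        print(results.get())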

There are some more advanced facilities built into the multiprocessing module to share data, like lists and special kinds of Queue (the third example below uses both, via a Manager). There are trade-offs to using multiprocessing vs. threads, and the right choice depends on whether your work is CPU bound or IO bound.

Basic multiprocessing.Pool example

Here is a really basic example of a multiprocessing Pool:

from multiprocessing import Pool

def process_line(line):
    return "FOO: %s" % line

if __name__ == "__main__":
    pool = Pool(4)
    with open('file.txt') as source_file:
        # chunk the work into batches of 4 lines at a time
        results = pool.map(process_line, source_file, 4)

    print(results)

A Pool is a convenience object that manages its own processes. Since an open file iterates over its lines, you can pass it to pool.map(), which will loop over it and deliver lines to the worker function. The map call blocks and returns the entire result when it is done. Be aware that this is an overly simplified example, and that pool.map() is going to read your entire file into memory all at once before dishing out work. If you expect to have large files, keep this in mind. There are more advanced ways to design a producer/consumer setup.
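If the memory use of that slurp is a concern, one alternative worth sketching (not something the example above uses) is Pool.imap, which hands lines to the workers as it goes and yields results as they complete, rather than building a full list of lines first. It still reads ahead of the workers to some degree, just not the whole file at once. Reusing process_line and file.txt from above:

from multiprocessing import Pool

def process_line(line):
    return "FOO: %s" % line

if __name__ == "__main__":
    pool = Pool(4)
    with open('file.txt') as source_file:
        # imap submits lines in chunks of 4 and yields results in order,
        # without first materializing every line of the file in a list
        for result in pool.imap(process_line, source_file, 4):
            print(result)
    pool.close()
    pool.join()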

Manual "pool" with limit and line re-sorting

This is a manual version of Pool.map, but instead of consuming the entire iterable in one go, it sets a maximum queue size so that data is only fed in piece by piece, as fast as the workers can process it. I also added line numbers so that you can track the lines and refer to them later if you want.

from multiprocessing import Process, Manager
import time
import itertools

def do_work(in_queue, out_list):
    while True:
        item = in_queue.get()
        line_no, line = item

        # exit signal
        if line is None:
            return

        # fake work
        time.sleep(.5)
        result = (line_no, line)

        out_list.append(result)


if __name__ == "__main__":
    num_workers = 4

    manager = Manager()
    results = manager.list()
    work = manager.Queue(num_workers)

    # start the workers
    pool = []
    for i in range(num_workers):
        p = Process(target=do_work, args=(work, results))
        p.start()
        pool.append(p)

    # produce data, followed by one None sentinel per worker
    with open("source.txt") as f:
        iters = itertools.chain(f, (None,) * num_workers)
        for num_and_line in enumerate(iters):
            work.put(num_and_line)

    for p in pool:
        p.join()

    # get the results
    # example:  [(1, "foo"), (10, "bar"), (0, "start")]
    print(sorted(results))

Solution 2

Here's a really stupid example that I cooked up:

import os.path
import multiprocessing

def newlinebefore(f, n):
    # scan backwards from byte offset n to the nearest newline (or the start of the file)
    f.seek(n)
    c = f.read(1)
    while c != b'\n' and n > 0:
        n -= 1
        f.seek(n)
        c = f.read(1)

    f.seek(n)
    return n

filename = 'gpdata.dat'  # your filename goes here.
fsize = os.path.getsize(filename)  # size of file (in bytes)

# break the file into 20 chunks for processing.
nchunks = 20
initial_chunks = range(1, fsize, fsize // nchunks)

# You could also do something like:
# initial_chunks = range(1, fsize, max_chunk_size_in_bytes)  # this should work too.


# open in binary mode so that we can seek to arbitrary byte offsets
with open(filename, 'rb') as f:
    start_byte = sorted(set([0] + [newlinebefore(f, i) for i in initial_chunks]))

# each chunk ends at the newline where the next chunk starts;
# the last chunk runs to the end of the file
end_byte = start_byte[1:] + [None]

def process_piece(filename, start, end):
    with open(filename, 'rb') as f:
        if start != 0:
            start += 1  # skip the newline that marks the start of this chunk
        f.seek(start)
        if end is None:
            text = f.read()
        else:
            nbytes = end - start + 1  # include the newline at `end`
            text = f.read(nbytes)

    # process text here, creating some object to be returned.
    # You could wrap text in an io.BytesIO object if you want to be able to
    # read from it the way you would a file.

    returnobj = text
    return returnobj

def wrapper(args):
    return process_piece(*args)

filename_repeated = [filename] * len(start_byte)
args = list(zip(filename_repeated, start_byte, end_byte))

pool = multiprocessing.Pool(4)
result = pool.map(wrapper, args)

# Now take your results and write them to the database.
print(b"".join(result).decode())  # I just print it to make sure I get my file back ...

The tricky part here is to make sure that we split the file on newline characters so that you don't miss any lines (or read only partial lines). Then, each process reads its part of the file and returns an object which can be put into the database by the main process. Of course, you may even need to do this part in chunks so that you don't have to keep all of the information in memory at once. (This is quite easily accomplished -- just split the args list into X chunks and call pool.map(wrapper, chunk) for each one; see the sketch below.)
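As a rough sketch of that batching idea, building on the script above (grouper() is a small helper defined here, and save_to_database() is a hypothetical stand-in for whatever your database write looks like):

def grouper(seq, size):
    # yield successive slices of seq, each of length size (the last may be shorter)
    for i in range(0, len(seq), size):
        yield seq[i:i + size]

args = list(zip(filename_repeated, start_byte, end_byte))

pool = multiprocessing.Pool(4)
for batch in grouper(args, 5):
    batch_results = pool.map(wrapper, batch)
    save_to_database(batch_results)  # hypothetical: write this batch out, then discard it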

Author: pranavk

Updated on November 27, 2020

Comments

  • pranavk
    pranavk over 3 years

    I have a single big text file in which I want to process each line (do some operations) and store the results in a database. Since a single simple program is taking too long, I want it to be done via multiple processes or threads. Each thread/process should read DIFFERENT data (different lines) from that single file, do some operations on its piece of data (lines), and put the results in the database so that, in the end, I have the whole of the data processed and my database is filled with the data I need.

    But I am not able to figure out how to approach this.

  • pranavk
    pranavk almost 12 years
    yep, the file is large, about 1 GB or so. I don't know how large you mean by saying larger; 1 GB is large for me though.
  • jdi
    jdi almost 12 years
    That's fine. I'm sure you can take these examples and extrapolate for your needs. The threading one is fine as it is. The multiprocessing one just needs a similar queue for you to feed.
  • user1277476
    user1277476 almost 12 years
    This is good, but what if the processing is I/O bound? In that case, parallelism may slow things down rather than speeding it up. Seeks within a single disk track are much faster than intertrack seeks, and doing I/O in parallel tends to introduce intertrack seeks in what would otherwise be a sequential I/O load. To get some benefit from parallel I/O, sometimes it helps quite a bit to use a RAID mirror.
  • jdi
    jdi almost 12 years
    @user1277476: My examples don't really deal with the speed of reading the file. It's read in exactly the same way. My examples address the work of processing each line. When I say CPU or IO bound, I am referring to what is happening inside the work. The file is not being read any faster. The concept here is that instead of processing each line as you read it from the file, you feed the line into a queue and let multiple workers handle it.
  • jdi
    jdi almost 12 years
    @pranavk: The file size just means you will need enough memory to read it all in. Otherwise you just need to feed it line by line to a queue instead of using the map
  • jdi
    jdi almost 12 years
    @pranavk: I just added a 3rd example showing how to not pre-consume the entire file, and deliver as fast as the workers can consume.
  • KobeJohn
    KobeJohn almost 12 years
    @jdi Wow nice example you made. That may be one of the better ones available on the web now :)
  • jdi
    jdi almost 12 years
    @kobejohn: Thanks! Which one? That 3rd one? I just updated it once more to sort the lines back again
  • KobeJohn
    KobeJohn almost 12 years
    @jdi all of them together. Good introduction by example. Maybe links to the docs would be nice icing on the cake.
  • mgilson
    mgilson almost 12 years
    Agreed. This answer is much better than mine.
  • Hooked
    Hooked almost 12 years
    @jdi In "Manual "pool" with limit and line re-sorting", what if you have a string of inputs from the iterator that completes really really fast. It looks like (and from testing it, it seems to happen) that the workers return and thus the Process object completes and closes before the new input has been added to the queue. In this case the program will hang because there are no more workers left!
  • jdi
    jdi almost 12 years
    @Hooked: I am not so sure that is right, and I am not sure how you were testing it. But it should not matter how little input you have. There are always sentinel values (None in this case) added to the end of the data to signal the workers to exit. They should only exit when they receive None values. Otherwise they sit and wait for data from the queue. Can you please link me to a pastebin illustrating the problem? I switched my example to use f = ['a','b','c','d','e'], and also commented out the sleep in the worker. No issues.
  • jwillis0720
    jwillis0720 over 9 years
    awesome examples. I'm wondering if you can help me a little bit with `iters = itertools.chain(f, (None,)*num_workers)`. I don't quite understand what this line is doing.
  • jdi
    jdi over 9 years
    @jwillis0720 - Sure. (None,) * num_workers creates a tuple of None values equal to the number of workers. These are going to be the sentinel values that tell each thread to quit because there is no more work. The itertools.chain function lets you put multiple sequences together into one virtual sequence without having to copy anything. So what we get is that first it loops over the lines in the file, and then the None values.
  • lycuid
    lycuid over 7 years
    That's better explained than my professor, very nice +1.
  • DRPK
    DRPK over 6 years
    this is not what the OP wants!! but just as an idea ... not bad.
  • ℕʘʘḆḽḘ
    ℕʘʘḆḽḘ almost 6 years
    nice! but I do not understand what you mean by "Be aware that in overly simple example, the map is going to consume your file all at once before dishing out work."
  • jdi
    jdi almost 6 years
    @ℕʘʘḆḽḘ, I have edited my text a bit to be more clear. It now explains that the middle example is going to slurp your entire file data into memory at once, which could be a problem if your file is larger than the amount of RAM you currently have available. Then I show in the 3rd example how to go line by line, so as not to consume the entire file at once.
  • ℕʘʘḆḽḘ
    ℕʘʘḆḽḘ almost 6 years
    thanks a lot, very helpful. Just one more thing. I really do not understand why the syntax pool.map(process_line, source_file, 4) makes pool operate on batches of 4 rows. I mean, what is happening here?
  • ℕʘʘḆḽḘ
    ℕʘʘḆḽḘ almost 6 years
    normally iterating using with open only iterates row by row. Am I missing something here? Thanks again!
  • jdi
    jdi almost 6 years
    @ℕʘʘḆḽḘ read the docs for pool.map(). It says it will split the iterable up into chunks and submit them to the workers. So it will end up consuming all the lines into memory. Yes, iterating one line at a time is memory efficient, but if you end up keeping all those lines in memory then you are back to reading the whole file.
  • R_Moose
    R_Moose over 3 years
    The manual implementation, when run on large text files, leads to an EOF error. Any solution for it?
  • jdi
    jdi over 3 years
    @R_Moose hard to be sure without a reproduction example. I am guessing the EOF happens when the main loop is iterating the lines in the file and it gets to the end? Are you using a file that is actively being written to? How big does the file need to be? Can you just catch the EOF exception and consider it done, so the code can move on to the loop that joins and waits for the workers?
  • R_Moose
    R_Moose over 3 years
    @jdi I am using a file that is 7 GB. I am not writing to the file, I am just reading from it, changing lines and then saving the output in the list. Exactly like what is being done here. Refer to this: stackoverflow.com/questions/25994201/… as to why EOF is happening. You are right, I couldn't reproduce it for smaller files, and even for larger files it doesn't happen all the time; it is pretty sporadic. I would say 4/10 runs for a file of 7 GB.
  • jdi
    jdi over 3 years
    @R_Moose well it is not exactly the same problem as that link because in my implementation you can see that I am sending a poison pill to every worker (a None value to each worker after the file EOF), and we join on every worker before exiting. So you should see every worker properly shut down.
  • R_Moose
    R_Moose over 3 years
    @jdi very well. This is the code I am using, exactly like your implementation. I am using a dictionary instead of a list to avoid sorting. stackoverflow.com/questions/64128330/… Any idea why it would fail on EOF? No concerns, as your code works well for file sizes < 5 GB on my machine.
  • R_Moose
    R_Moose over 3 years
    I have the exact error on my post here now: stackoverflow.com/questions/64128330/…
  • étale-cohomology
    étale-cohomology over 2 years
    But all processes are writing to the same file at the same time without a lock?