Processing a single file from multiple processes
Solution 1
What you are looking for is a Producer/Consumer pattern.
Basic threading example
Here is a basic example using the threading module (instead of multiprocessing):
import threading
import queue
import sys

def do_work(in_queue, out_queue):
    while True:
        item = in_queue.get()
        # process
        result = item
        out_queue.put(result)
        in_queue.task_done()

if __name__ == "__main__":
    work = queue.Queue()
    results = queue.Queue()
    total = 20

    # start the workers
    for i in range(4):
        t = threading.Thread(target=do_work, args=(work, results))
        t.daemon = True
        t.start()

    # produce data
    for i in range(total):
        work.put(i)

    work.join()

    # get the results
    for i in range(total):
        print(results.get())

    sys.exit()
You wouldn't share the file object with the threads. Instead, you would produce work for them by supplying the queue with lines of data. Each thread would then pick up a line, process it, and put the result on the output queue.
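As a sketch of that idea (a short in-memory list stands in for the real file object, and upper-casing stands in for the real per-line work):

```python
import threading
import queue

def do_work(in_queue, out_queue):
    while True:
        line = in_queue.get()
        # stand-in processing: upper-case the line
        out_queue.put(line.upper())
        in_queue.task_done()

work = queue.Queue()
results = queue.Queue()

for _ in range(4):
    t = threading.Thread(target=do_work, args=(work, results), daemon=True)
    t.start()

lines = ["alpha\n", "beta\n", "gamma\n"]  # stand-in for a real file object
for line in lines:
    work.put(line)
work.join()  # blocks until every queued line has been task_done()'d

out = sorted(results.get() for _ in lines)
print(out)
```

Results are sorted before printing because the worker threads finish in no particular order.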
There are some more advanced facilities built into the multiprocessing module for sharing data, like lists and a special kind of Queue. There are trade-offs to using multiprocessing vs. threads, and it depends on whether your work is CPU-bound or IO-bound.
Basic multiprocessing.Pool example
Here is a really basic example of a multiprocessing Pool:
from multiprocessing import Pool

def process_line(line):
    return "FOO: %s" % line

if __name__ == "__main__":
    pool = Pool(4)
    with open('file.txt') as source_file:
        # chunk the work into batches of 4 lines at a time
        results = pool.map(process_line, source_file, 4)

    print(results)
A Pool is a convenience object that manages its own processes. Since an open file can iterate over its lines, you can pass it to pool.map(), which will loop over it and deliver lines to the worker function. map blocks and returns the entire result when it's done. Be aware that this is an overly simplified example: pool.map() is going to read your entire file into memory all at once before dishing out work. If you expect to have large files, keep this in mind. There are more advanced ways to design a producer/consumer setup.
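One memory-friendlier variant, sketched here with a small generated sample file so it runs end to end, is pool.imap, which pulls lines from the iterable in chunks as workers become free and yields results as they complete, instead of materializing everything up front:

```python
from multiprocessing import Pool

def process_line(line):
    return "FOO: %s" % line.strip()

if __name__ == "__main__":
    # write a small sample file so the sketch is self-contained
    with open("file.txt", "w") as f:
        f.write("one\ntwo\nthree\n")

    with Pool(4) as pool, open("file.txt") as source_file:
        # imap hands out work 4 lines at a time, lazily,
        # rather than slurping the whole file like map() does
        for result in pool.imap(process_line, source_file, chunksize=4):
            print(result)
```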
Manual "pool" with limit and line re-sorting
This is a manual version of Pool.map, but instead of consuming the entire iterable in one go, you can set a queue size so that you only feed it piece by piece, as fast as it can process. I also added line numbers so that you can track them and refer to them later if you want.
from multiprocessing import Process, Manager
import time
import itertools

def do_work(in_queue, out_list):
    while True:
        item = in_queue.get()
        line_no, line = item

        # exit signal
        if line is None:
            return

        # fake work
        time.sleep(.5)
        result = (line_no, line)
        out_list.append(result)

if __name__ == "__main__":
    num_workers = 4

    manager = Manager()
    results = manager.list()
    work = manager.Queue(num_workers)

    # start the workers
    pool = []
    for i in range(num_workers):
        p = Process(target=do_work, args=(work, results))
        p.start()
        pool.append(p)

    # produce data
    with open("source.txt") as f:
        iters = itertools.chain(f, (None,) * num_workers)
        for num_and_line in enumerate(iters):
            work.put(num_and_line)

    for p in pool:
        p.join()

    # get the results
    # example: [(1, "foo"), (10, "bar"), (0, "start")]
    print(sorted(results))
Solution 2
Here's a really stupid example that I cooked up:
import os.path
import multiprocessing

def newlinebefore(f, n):
    # scan backwards from byte n until we hit a newline (or the file start)
    f.seek(n)
    c = f.read(1)
    while c != b'\n' and n > 0:
        n -= 1
        f.seek(n)
        c = f.read(1)
    f.seek(n)
    return n

def process_piece(filename, start, end):
    # each start byte (except 0) is the newline before the chunk, so skip it
    first = 0 if start == 0 else start + 1
    with open(filename, 'rb') as f:
        f.seek(first)
        if end is None:
            text = f.read()
        else:
            # end marks one byte before the next chunk's newline,
            # so this chunk runs through byte end + 1 inclusive
            text = f.read(end + 2 - first)

    # process text here, creating some object to be returned.
    # You could wrap text in a BytesIO object if you want to be able to
    # read from it the way you would a file.
    returnobj = text
    return returnobj

def wrapper(args):
    return process_piece(*args)

if __name__ == "__main__":
    filename = 'gpdata.dat'  # your filename goes here.
    fsize = os.path.getsize(filename)  # size of file (in bytes)

    # break the file into 20 chunks for processing.
    nchunks = 20
    initial_chunks = range(1, fsize, fsize // nchunks)
    # You could also do something like:
    # initial_chunks = range(1, fsize, max_chunk_size_in_bytes)  # this should work too.

    # binary mode, so seeks to arbitrary byte offsets are well defined
    with open(filename, 'rb') as f:
        start_byte = sorted(set(newlinebefore(f, i) for i in initial_chunks))
    end_byte = [i - 1 for i in start_byte][1:] + [None]

    args = list(zip([filename] * len(start_byte), start_byte, end_byte))

    pool = multiprocessing.Pool(4)
    result = pool.map(wrapper, args)

    # Now take your results and write them to the database.
    print(b"".join(result))  # I just print it to make sure I get my file back ...
The tricky part here is to make sure that we split the file on newline characters so that you don't miss any lines (or only read partial lines). Then each process reads its part of the file and returns an object which can be put into the database by the main thread. Of course, you may even need to do this part in chunks so that you don't have to keep all of the information in memory at once. (This is quite easily accomplished: just split the "args" list into X chunks and call pool.map(wrapper, chunk) -- see here.)
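That final chunking suggestion could be sketched like this (batch is a hypothetical helper; the actual pool.map and database calls are left as comments, since they depend on your setup):

```python
def batch(seq, size):
    # yield successive slices of seq, each at most `size` items long
    for start in range(0, len(seq), size):
        yield seq[start:start + size]

args = list(range(10))  # stand-in for the real (filename, start, end) tuples
chunks = list(batch(args, 4))
print(chunks)
# for chunk in chunks:
#     results = pool.map(wrapper, chunk)   # one chunk at a time
#     write_to_database(results)           # flush, then let the results go
```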
Updated on November 27, 2020
Comments
-
pranavk, over 3 years: I have a single big text file in which I want to process each line (do some operations) and store the results in a database. Since a single simple program is taking too long, I want it done via multiple processes or threads. Each thread/process should read DIFFERENT data (different lines) from that single file, do some operations on its piece of data (lines), and put the results in the database, so that in the end I have the whole of the data processed and my database is dumped with the data I need.
But I am not able to figure out how to approach this.
-
pranavk, almost 12 years: Yep, the file is larger, about 1 GB or so. I don't know how much larger you mean by saying larger; 1 GB is large for me though.
-
jdi, almost 12 years: That's fine. I'm sure you can take these examples and extrapolate for your needs. The threading one is fine as it is. The multiprocessing one just needs a similar queue for you to feed.
-
user1277476, almost 12 years: This is good, but what if the processing is I/O bound? In that case, parallelism may slow things down rather than speed them up. Seeks within a single disk track are much faster than intertrack seeks, and doing I/O in parallel tends to introduce intertrack seeks in what would otherwise be a sequential I/O load. To get some benefit from parallel I/O, sometimes it helps quite a bit to use a RAID mirror.
-
jdi, almost 12 years: @user1277476: My examples don't really deal with the speed of reading the file. It's read the exact same way. My examples address the work of processing each line. When I say CPU or IO bound, I am referring to what is happening inside the work. The file is not being read any faster. The concept here is that instead of processing each line as you read it from the file, you feed the line into a queue and let multiple workers handle it.
-
jdi, almost 12 years: @pranavk: The file size just means you will need enough memory to read it all in. Otherwise you just need to feed it line by line to a queue instead of using the map.
-
jdi, almost 12 years: @pranavk: I just added a 3rd example showing how to not pre-consume the entire file, and to deliver only as fast as the workers can consume.
-
KobeJohn, almost 12 years: @jdi Wow, nice example you made. That may be one of the better ones available on the web now :)
-
jdi, almost 12 years: @kobejohn: Thanks! Which one? That 3rd one? I just updated it once more to sort the lines back again.
-
KobeJohn, almost 12 years: @jdi All of them together. Good introduction by example. Maybe links to the docs would be nice icing on the cake.
-
mgilson, almost 12 years: Agreed. This answer is much better than mine.
-
Hooked, almost 12 years: @jdi In "Manual "pool" with limit and line re-sorting", what if you have a string of inputs from the iterator that completes really, really fast? It looks like (and from testing it, it seems to happen) that the workers return, and thus the Process object completes and closes before the new input has been added to the queue. In this case the program will hang because there are no more workers left!
-
jdi, almost 12 years: @Hooked: I am not so sure that is right, and I am not sure how you were testing it. But it should not matter how little input you have. There are always sentinel values (None in this case) added to the end of the data to signal the workers to exit. They should only exit when they receive None values. Otherwise they sit and wait for data from the queue. Can you please link me to a pastebin illustrating the problem? I switched my example to use f = ['a','b','c','d','e'], and also commented out the sleep in the worker. No issues.
-
jwillis0720, over 9 years: Awesome examples. I'm wondering if you can help me a little bit with iters = itertools.chain(f, (None,)*num_workers); I don't quite understand what this line is doing.
-
jdi, over 9 years: @jwillis0720 - Sure. (None,) * num_workers creates a tuple of None values equal to the number of workers. These are going to be the sentinel values that tell each thread to quit because there is no more work. The itertools.chain function lets you put multiple sequences together into one virtual sequence without having to copy anything. So what we get is that first it loops over the lines in the file, and then the None values.
-
lycuid, over 7 years: That's better explained than my professor, very nice. +1
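A minimal illustration of that sentinel trick (a short list stands in for the open file object):

```python
import itertools

num_workers = 2
lines = ["a\n", "b\n", "c\n"]  # stand-in for the file object

# chain yields every line first, then one None sentinel per worker,
# and enumerate attaches the line numbers the workers expect
fed = list(enumerate(itertools.chain(lines, (None,) * num_workers)))
print(fed)
```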
-
DRPK, over 6 years: This is not what OP wants! But just for an idea... not bad.
-
ℕʘʘḆḽḘ, almost 6 years: Nice! But I do not understand what you mean by "Be aware that in overly simple example, the map is going to consume your file all at once before dishing out work."
-
jdi, almost 6 years: @ℕʘʘḆḽḘ, I have edited my text a bit to be more clear. It now explains that the middle example is going to slurp your entire file data into memory at once, which could be a problem if your file is larger than the amount of RAM you currently have available. Then I show in the 3rd example how to go line by line, so as not to consume the entire file at once.
-
ℕʘʘḆḽḘ, almost 6 years: Thanks a lot, very helpful. Just one more thing. I really do not understand why the syntax pool.map(process_line, source_file, 4) makes pool operate on batches of 4 rows. I mean, what is happening here?
-
ℕʘʘḆḽḘ, almost 6 years: Normally, iterating using with open only iterates row by row. Am I missing something here? Thanks again!
-
jdi, almost 6 years: @ℕʘʘḆḽḘ Read the docs for pool.map(). It says it will split the iterable up into chunks and submit them to the workers. So it will end up consuming all the lines into memory. Yes, iterating one line at a time is memory efficient, but if you end up keeping all those lines in memory then you are back to reading the whole file.
-
R_Moose, over 3 years: The manual implementation, when run on large text files, leads to an EOF error. Any solution for it?
-
jdi, over 3 years: @R_Moose Hard to be sure without a reproduction example. I am guessing the EOF happens when the main loop is iterating the lines in the file and it gets to the end? Are you using a file that is actively being written to? How big does the file need to be? Can you just catch the EOF exception and consider it done, so the code can move on to the loop that joins and waits for the workers?
-
R_Moose, over 3 years: @jdi I am using a file that is 7 GB. I am not writing to the file, I am just reading from it, changing the line, and then saving the output in the list. Exactly like what is being done here. Refer to this: stackoverflow.com/questions/25994201/… as to why EOF is happening. You are right, I couldn't reproduce it for smaller files, and even for larger files it doesn't happen all the time; it is pretty sporadic. I would say 4/10 runs for a file of 7 GBs.
-
jdi, over 3 years: @R_Moose Well, it is not exactly the same problem as that link, because in my implementation you can see that I am sending a poison pill to every worker (a None value to each worker after the file EOF), and we join on every worker before exiting. So you should see every worker properly shut down.
-
R_Moose, over 3 years: @jdi Very well. This is the code I am using, exactly like your implementation. I am using a dictionary instead of a list to avoid sorting. stackoverflow.com/questions/64128330/… Any idea why it would fail on EOF? No concerns, as your code works well for file sizes < 5 gigs on my machine.
-
R_Moose, over 3 years: I have the exact error on my post here now: stackoverflow.com/questions/64128330/…
-
étale-cohomology, over 2 years: But all processes are writing to the same file at the same time, without a lock?