Python multiprocessing with generator

Solution 1

As @pvg said in a comment, a (bounded) queue is the natural way to mediate between a producer and consumers with different speeds, keeping them all as busy as possible without letting the producer get way ahead.

Here's a self-contained, executable example. The queue is restricted to a maximum size equal to the number of worker processes. If the consumers run much faster than the producer, it could make good sense to let the queue get bigger than that.

In your specific case, it would probably make sense to pass lines to the consumers and let them do the document = json.loads(line) part in parallel.

import multiprocessing as mp

NCORE = 4

def process(q, iolock):
    from time import sleep
    while True:
        stuff = q.get()
        if stuff is None:
            break
        with iolock:
            print("processing", stuff)
        sleep(stuff)

if __name__ == '__main__':
    q = mp.Queue(maxsize=NCORE)
    iolock = mp.Lock()
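    # Note: process() is passed as the Pool initializer below, so every worker
    # runs that consume loop as soon as it starts, until it sees a None sentinel.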
    pool = mp.Pool(NCORE, initializer=process, initargs=(q, iolock))
    for stuff in range(20):
        q.put(stuff)  # blocks until q below its max size
        with iolock:
            print("queued", stuff)
    for _ in range(NCORE):  # tell workers we're done
        q.put(None)
    pool.close()
    pool.join()
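
Building on the suggestion above, the workers could receive raw lines and do the json.loads themselves. Here is a minimal sketch of that variant; the input path ('input.jl') and the per-document work are placeholders, not from the original question:

import json
import multiprocessing as mp

NCORE = 4

def worker(q, iolock):
    # Each worker pulls raw lines from the queue and parses them itself.
    while True:
        line = q.get()
        if line is None:          # sentinel: no more work
            break
        document = json.loads(line)
        with iolock:
            print("processed a document with", len(document), "keys")

if __name__ == '__main__':
    q = mp.Queue(maxsize=NCORE)
    iolock = mp.Lock()
    pool = mp.Pool(NCORE, initializer=worker, initargs=(q, iolock))
    with open('input.jl', 'r', encoding='utf-8') as f:   # hypothetical path
        for line in f:
            q.put(line)           # blocks while the queue is full
    for _ in range(NCORE):        # one sentinel per worker
        q.put(None)
    pool.close()
    pool.join()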

Solution 2

So I ended up running this successfully, by splitting my file into chunks of lines and processing the chunks in parallel. Posting it here so it can be useful to somebody in the future.

# Note: these are methods of a class that stores self.input_file; the module
# needs `import multiprocessing as mp`, `import os` and `import json`.
def run_parallel(self, processes=4):
    processes = int(processes)
    try:
        pool = mp.Pool(processes)
        jobs = []
        # submit one job per chunk of the file
        for chunkStart, chunkSize in self.chunkify(self.input_file):
            jobs.append(pool.apply_async(self.process_wrapper, (chunkStart, chunkSize)))
        # wait for every chunk to finish (get() also surfaces worker exceptions)
        for job in jobs:
            job.get()
        pool.close()
        pool.join()
    except Exception as e:
        print(e)

def process_wrapper(self, chunkStart, chunkSize):
    # binary mode, so the byte offsets produced by chunkify are valid seek targets
    with open(self.input_file, 'rb') as f:
        f.seek(chunkStart)
        lines = f.read(chunkSize).splitlines()
        for line in lines:
            document = json.loads(line)
            self.process_file(document)

# Splitting data into chunks for parallel processing.
# Each chunk starts just after a newline and is extended to end on a newline,
# so no JSON line is ever split across two chunks.
def chunkify(self, filename, size=1024*1024):
    fileEnd = os.path.getsize(filename)
    # binary mode: Python 3 does not allow relative seeks on text-mode files
    with open(filename, 'rb') as f:
        chunkEnd = f.tell()
        while chunkEnd < fileEnd:
            chunkStart = chunkEnd
            f.seek(size, 1)   # jump ahead by the nominal chunk size...
            f.readline()      # ...then advance to the end of the current line
            chunkEnd = f.tell()
            yield chunkStart, chunkEnd - chunkStart
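
As a quick, self-contained way to convince yourself that the chunking trick is safe, the sketch below (all names are illustrative, not from the original post) writes a small JSON-lines file, mirrors the chunkify logic above, and checks that every line lands in exactly one chunk:

import json
import os
import tempfile

def chunk_offsets(path, size):
    # Same idea as chunkify above: jump ahead ~size bytes, then extend
    # to the next newline so every chunk holds only whole lines.
    file_end = os.path.getsize(path)
    with open(path, 'rb') as f:
        chunk_end = f.tell()
        while chunk_end < file_end:
            chunk_start = chunk_end
            f.seek(size, 1)
            f.readline()
            chunk_end = f.tell()
            yield chunk_start, chunk_end - chunk_start

if __name__ == '__main__':
    # Write a small JSON-lines file, then confirm the chunks cover
    # every line exactly once.
    with tempfile.NamedTemporaryFile('w', suffix='.jl', delete=False) as tmp:
        for i in range(20):
            tmp.write(json.dumps({"id": i}) + '\n')
        path = tmp.name

    recovered = []
    with open(path, 'rb') as f:
        for start, length in chunk_offsets(path, size=64):
            f.seek(start)
            recovered.extend(json.loads(line) for line in f.read(length).splitlines())
    print(len(recovered))   # expect 20
    os.unlink(path)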

Solution 3

Tim Peters' answer is great, but my specific case was slightly different, so I had to modify his answer to fit my needs. I'm referencing it here; this answers the question asked by @CpILL in the comments.

In my case, I used a chain of generators to create a pipeline. One of the generators in that chain was doing heavy computation, slowing down the whole pipeline.

Something like this:

def fast_generator1():
    for line in file:
        yield line

def slow_generator(lines):
    for line in lines:
        yield heavy_processing(line)

def fast_generator2(lines):
    for line in lines:
        yield fast_func(line)

if __name__ == "__main__":
    lines = fast_generator1()
    lines = slow_generator(lines)
    lines = fast_generator2(lines)
    for line in lines:
        print(line)

To make it faster, we have to execute the slow generator with multiple processes.
The modified code looks like this:

import multiprocessing as mp

NCORE = 4

def fast_generator1():
    for line in file:
        yield line

def slow_generator(lines):
    def gen_to_queue(input_q, lines):
        # This function simply consumes our generator and writes it to the input queue
        for line in lines:
            input_q.put(line)
        for _ in range(NCORE):    # once the generator is consumed, send one end-signal (None) per worker
            input_q.put(None)

    def process(input_q, output_q):
        while True:
            line = input_q.get()
            if line is None:
                output_q.put(None)
                break
            output_q.put(heavy_processing(line))


    input_q = mp.Queue(maxsize=NCORE * 2)
    output_q = mp.Queue(maxsize=NCORE * 2)

    # Here we need 3 groups of workers:
    # * One that consumes the input generator and puts its items into a queue.
    #   This is `gen_pool`. One process is enough, since this is a very light task.
    # * One that does the main processing. This is `pool`.
    # * One that reads the results and yields them back, keeping the whole thing
    #   a generator. The main thread does this.
    gen_pool = mp.Pool(1, initializer=gen_to_queue, initargs=(input_q, lines))
    pool = mp.Pool(NCORE, initializer=process, initargs=(input_q, output_q))

    finished_workers = 0
    while True:
        line = output_q.get()
        if line is None:
            finished_workers += 1
            if finished_workers == NCORE:
                break
        else:
            yield line

def fast_generator2(lines):
    for line in lines:
        yield fast_func(line)

if __name__ == "__main__":
    lines = fast_generator1()
    lines = slow_generator(lines)
    lines = fast_generator2(lines)
    for line in lines:
        print(line)

With this implementation, we have a multiprocess generator: it is used exactly like any other generator (as in the first example of this answer), but all the heavy computation is done using multiprocessing, which speeds it up!
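
One practical note if you want to run the pattern above end to end: it relies on the 'fork' start method (the default on Linux), because the nested gen_to_queue/process functions and the generator passed via initargs cannot be pickled for 'spawn'. To make the example executable, hypothetical module-level stand-ins for file, heavy_processing and fast_func (placeholders of my own, not from the original answer) could look like:

import time

# Placeholder definitions, just to make the pattern above runnable:
file = ["line %d" % i for i in range(20)]   # stands in for the real input file

def heavy_processing(line):
    time.sleep(0.1)                          # simulate the slow, CPU-heavy step
    return line.upper()

def fast_func(line):
    return line + "!"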

Solution 4

Late to the party. I had a similar problem: producers and consumers, basically. As a few have pointed out, a queue is best suited to this problem.

You can create an executor pool (thread- or process-based) and use it in conjunction with a semaphore to ensure that at most n tasks are picked up at the same time. If another task is submitted by your generator, it blocks until the semaphore counter decreases.

I found a ready-made solution; check out this Gist.
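
For reference, here is a minimal sketch of that pattern (my own illustration, not the linked Gist): a ThreadPoolExecutor wrapped so that at most `bound` tasks are queued or running at once, which blocks the producing loop otherwise.

import concurrent.futures
import threading
import time

class BoundedExecutor:
    def __init__(self, max_workers=4, bound=8):
        self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
        self._semaphore = threading.BoundedSemaphore(bound)

    def submit(self, fn, *args, **kwargs):
        self._semaphore.acquire()          # blocks while `bound` tasks are in flight
        future = self._executor.submit(fn, *args, **kwargs)
        future.add_done_callback(lambda _: self._semaphore.release())
        return future

    def shutdown(self, wait=True):
        self._executor.shutdown(wait=wait)

def work(item):
    time.sleep(0.1)                        # stand-in for the real per-item task
    return item * 2

if __name__ == '__main__':
    executor = BoundedExecutor(max_workers=4, bound=8)
    futures = [executor.submit(work, i) for i in range(20)]
    print([f.result() for f in futures])
    executor.shutdown()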

Comments

  • Muthu Rg
    Muthu Rg over 2 years

    I'm trying to process a file (every line is a JSON document). The size of the file can range from hundreds of MBs to a few GBs, so I wrote a generator to fetch each document from the file line by line.

    def jl_file_iterator(file):
        with codecs.open(file, 'r', 'utf-8') as f:
            for line in f:
                document = json.loads(line)
                yield document
    

    My system has 4 cores, so I would like to process 4 lines of the file in parallel. Currently I have this code, which takes 4 lines at a time and calls the code for parallel processing:

    threads = 4
    files, i = [], 1
    for jl in jl_file_iterator(input_path):
        files.append(jl)
        if i % (threads) == 0:
            # pool.map(processFile, files)
            parallelProcess(files, o)
            files = []
        i += 1
    
    if files:
        parallelProcess(files, o)
        files = []
    

    This is my code where the actual processing happens:

    def parallelProcess(files, outfile):
        processes = []
        for i in range(len(files)):
            p = Process(target=processFile, args=(files[i],))
            processes.append(p)
            p.start()
        for i in range(len(files)):
            processes[i].join()
    
    def processFile(doc):
        extractors = {}
        ... do some processing on doc
        o.write(json.dumps(doc) + '\n')
    

    As you can see, I wait for all 4 lines to finish processing before I send the next 4 lines to be processed. But what I would like is, as soon as one process finishes processing a line, to assign the next line to the released processor. How do I do that?

    PS: The problem is that, since it's a generator, I cannot load all the lines and use something like map to run the processes.

    Thanks for your help