Why is reading multiple files at the same time slower than reading sequentially?

Solution 1

Looks like you're I/O bound:

In computer science, I/O bound refers to a condition in which the time it takes to complete a computation is determined principally by the period spent waiting for input/output operations to be completed. This is the opposite of a task being CPU bound. This circumstance arises when the rate at which data is requested is slower than the rate it is consumed or, in other words, more time is spent requesting data than processing it.

You probably need to have your main thread do the reading and add the data to the pool when a subprocess becomes available. This is different from using map.
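
For illustration, a minimal sketch of that pattern (not the code from the question): the main process does all the reading and hands each file's lines to whichever worker is free via apply_async. parse_lines and get_parsed_files are hypothetical names, and CoresetPoint/Points are assumed to be the classes from the question:

import os
from multiprocessing import Pool
from api.ttypes import *   # CoresetPoint and Points, as in the question

def parse_lines(lines):
    # Hypothetical helper: turn a list of "x y" lines into a Points object.
    return Points([CoresetPoint(*map(int, line.split())) for line in lines])

def get_parsed_files(directory):
    results = []
    with Pool(2) as pool:
        for filename in os.listdir(directory):
            if not filename.endswith(".txt"):
                continue
            with open(os.path.join(directory, filename)) as f:
                lines = f.readlines()               # the main process does the reading
            results.append(pool.apply_async(parse_lines, (lines,)))
        return [r.get() for r in results]           # one Points object per file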

As you are processing one line at a time, and the input is split across many files, you can use fileinput to iterate over the lines of all the files, and map a function that processes lines instead of whole files:

Passing one line at a time to the workers might be too slow, so we can pass chunks of lines instead, and adjust the chunk size until we find a sweet spot. Our function parses a chunk of lines:

def _parse_coreset_points(lines):
    return Points([_parse_coreset_point(line) for line in lines])

def _parse_coreset_point(line):
    s = line.split()
    x, y = [int(v) for v in s]
    return CoresetPoint(x, y)

And our main function:

import os
import fileinput
from itertools import islice
from multiprocessing import Pool

def _chunks(lines, size):
    # Group the line iterator into lists of at most `size` lines per task.
    it = iter(lines)
    for first in it:
        yield [first] + list(islice(it, size - 1))

def getParsedFiles(directory):
    pool = Pool(2)
    txts = [os.path.join(directory, filename)
            for filename in os.listdir(directory)
            if filename.endswith(".txt")]
    # Each call to _parse_coreset_points receives a list of up to 100 lines.
    return pool.imap(_parse_coreset_points, _chunks(fileinput.input(txts), 100))
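
A hypothetical caller can then consume the results lazily as each chunk is parsed (handle_points is a placeholder for whatever uses a Points object):

for points in getParsedFiles('/home/tony/Lab/slicedFiles'):
    handle_points(points)   # placeholder: use one parsed chunk (a Points object)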

Solution 2

In general it is never a good idea to read from the same physical (spinning) hard disk from different threads simultaneously, because every switch causes an extra delay of around 10ms to position the read head of the hard disk (would be different on SSD).

As @peter-wood already said, it is better to have one thread reading in the data, and have other threads processing that data.

Also, to really test the difference, I think you should do the test with some bigger files. For example: current hard disks should be able to read around 100MB/sec. So reading the data of a 100kB file in one go would take 1ms, while positioning the read head to the beginning of that file would take 10ms.

On the other hand, looking at your numbers (assuming those are for a single loop) it is hard to believe that being I/O bound is the only problem here. The total data is 100MB, which should take about 1 second to read from disk plus some overhead, yet your program takes 130 seconds. I don't know if that number is with the files cold on disk, or an average of multiple tests where the data is already cached by the OS (with 62 GB of RAM all that data should be cached the second time) - it would be interesting to see both numbers.
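
One way to get both numbers is to time a plain read of all the files with no parsing: running it twice in a row gives the cached figure, and running it right after a reboot (or, on Linux, after dropping the page cache as root with sync; echo 3 > /proc/sys/vm/drop_caches) gives the cold figure. A minimal sketch, assuming the directory from the question:

import os
import time

def time_raw_read(directory):
    # Time how long it takes just to read every .txt file, without any parsing.
    start = time.perf_counter()
    total_bytes = 0
    for filename in os.listdir(directory):
        if filename.endswith(".txt"):
            with open(os.path.join(directory, filename), 'rb') as f:
                total_bytes += len(f.read())
    elapsed = time.perf_counter() - start
    print("read %d bytes in %.2f s (%.1f MB/s)" % (total_bytes, elapsed, total_bytes / elapsed / 1e6))

time_raw_read('/home/tony/Lab/slicedFiles')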

So there has to be something else. Let's take a closer look at your loop:

for line in f:
    s = line.split()
    x, y = [int(v) for v in s]
    obj = CoresetPoint(x, y)
    gc.disable()
    myList.append(obj)
    gc.enable()

While I don't know Python, my guess would be that the gc calls are the problem here. They are called for every line read from disk. I don't know how expensive those calls are (or whether gc.enable() triggers a garbage collection, for example), or why they would be needed around append(obj) only, but there might be other problems because this is multithreading:

Assuming the gc object is global (i.e. not thread local) you could have something like this:

thread 1 : gc.disable()
# switch to thread 2
thread 2 : gc.disable()
thread 2 : myList.append(obj)
thread 2 : gc.enable()
# gc now enabled!
# switch back to thread 1 (or one of the other threads)
thread 1 : myList.append(obj)
thread 1 : gc.enable()

And if the number of threads is <= the number of cores, there wouldn't even be any switching; they would all be calling this at the same time.

Also, if the gc object is thread safe (it would be worse if it isn't), it would have to do some locking in order to safely alter its internal state, which would force all other threads to wait.

For example, gc.disable() would look something like this:

def disable():
    lock()               # all other threads are blocked for gc calls now
    # ... alter the internal state ...
    unlock()

And because gc.disable() and gc.enable() are called in a tight loop, this will hurt performance when using multiple threads.

So it would be better to remove those calls, or place them at the beginning and end of your program if they are really needed (or only disable gc at the beginning, no need to do gc right before quitting the program).

Depending on the way Python copies or moves objects, it might also be slightly better to use myList.append(CoresetPoint(x, y)).
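
For illustration, this is what the question's _parse could look like with both of those changes applied (a sketch, not necessarily the final form): the gc calls are gone from the loop, and the CoresetPoint is constructed directly inside the append call. If disabling gc really helps, gc.disable() can be called once at program start instead.

def _parse(pathToFile):
    myList = []
    with open(pathToFile) as f:
        for line in f:
            x, y = [int(v) for v in line.split()]
            myList.append(CoresetPoint(x, y))   # construct the point right in the append
    return Points(myList)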

So it would be interesting to test the same on one 100MB file with one thread and without the gc calls.
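
A rough way to run that single-thread test, timing the gc-free _parse sketched above on one big file (the path is hypothetical):

import time

start = time.perf_counter()
points = _parse('/home/tony/Lab/one_big_file.txt')   # hypothetical single ~100MB test file
print("parsed in %.1f s" % (time.perf_counter() - start))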

If the processing takes longer than the reading (i.e. not I/O bound), use one thread to read the data in a buffer (should take 1 or 2 seconds on one 100MB file if not already cached), and multiple threads to process the data (but still without those gc calls in that tight loop).
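
For illustration, a minimal sketch of that reader/worker split with the standard threading and queue modules, reusing the hypothetical parse_lines helper from the earlier sketch (note that with CPython's GIL, worker threads only pay off if the processing releases the GIL, so a process pool may still be the better fit for CPU-heavy parsing):

import queue
import threading

def read_files(paths, q, n_workers):
    # One reader: stream each file's lines into the queue as a single batch.
    for path in paths:
        with open(path) as f:
            q.put(f.readlines())
    for _ in range(n_workers):
        q.put(None)                       # one sentinel per worker signals "no more data"

def worker(q, results, lock):
    while True:
        lines = q.get()
        if lines is None:
            break
        parsed = parse_lines(lines)       # hypothetical helper from the earlier sketch
        with lock:
            results.append(parsed)

def parse_all(paths, n_workers=4):
    q = queue.Queue(maxsize=8)            # bounded, so the reader cannot run far ahead
    results, lock = [], threading.Lock()
    threads = [threading.Thread(target=worker, args=(q, results, lock))
               for _ in range(n_workers)]
    for t in threads:
        t.start()
    read_files(paths, q, n_workers)       # the main thread is the single reader
    for t in threads:
        t.join()
    return results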

You don't have to split the data into multiple files in order to be able to use threads. Just let them process different parts of the same file (even with the 14GB file).
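
For illustration only, a sketch of how the same file could be split by byte offsets so each worker parses a disjoint range of lines (split_ranges and process_range are hypothetical helpers; the "x y" format and the CoresetPoint/Points classes are taken from the question). Each (start, end) pair can then be handed to its own thread or pool worker:

import os
from api.ttypes import *   # CoresetPoint and Points, as in the question

def split_ranges(path, n_parts):
    # Cut the file into n roughly equal byte ranges [start, end).
    size = os.path.getsize(path)
    step = size // n_parts
    offsets = [i * step for i in range(n_parts)] + [size]
    return list(zip(offsets[:-1], offsets[1:]))

def process_range(path, start, end):
    # Parse every line that *starts* inside [start, end); assumes "x y" lines.
    points = []
    with open(path, 'rb') as f:
        if start:
            f.seek(start - 1)
            if f.read(1) != b'\n':   # we landed in the middle of a line...
                f.readline()         # ...so skip ahead to the next line start
        while f.tell() < end:
            line = f.readline()
            if not line:
                break
            x, y = [int(v) for v in line.decode().split()]
            points.append(CoresetPoint(x, y))
    return Points(points)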


Comments

  • Tony Tannous

    I am trying to parse many files found in a directory; however, using multiprocessing slows my program down.

    # Calling my parsing function from Client.
    L = getParsedFiles('/home/tony/Lab/slicedFiles')  # <--- 1000 .txt files found here, combined ~100MB
    

    Following this example from the Python documentation:

    from multiprocessing import Pool
    
    def f(x):
        return x*x
    
    if __name__ == '__main__':
        p = Pool(5)
        print(p.map(f, [1, 2, 3]))
    

    I've written this piece of code:

    from multiprocessing import Pool
    from api.ttypes import *
    
    import gc
    import os
    
    def _parse(pathToFile):
        myList = []
        with open(pathToFile) as f:
            for line in f:
                s = line.split()
                x, y = [int(v) for v in s]
                obj = CoresetPoint(x, y)
                gc.disable()
                myList.append(obj)
                gc.enable()
        return Points(myList)
    
    def getParsedFiles(pathToFile):
        myList = []
        p = Pool(2)
        for filename in os.listdir(pathToFile):
            if filename.endswith(".txt"):
                myList.append(filename)
        return p.map(_parse, myList)
    

    I followed the example: I put the names of all the files that end with .txt in a list, created a Pool, and mapped my function over that list. I want to return a list of objects, where each object holds the parsed data of one file. However, to my surprise, I got the following results:

    #Pool 32  ---> ~162(s)
    #Pool 16 ---> ~150(s)
    #Pool 12 ---> ~142(s)
    #Pool 2 ---> ~130(s)
    

    Graph: (image not included)

    Machine specification:

    62.8 GiB RAM
    Intel® Core™ i7-6850K CPU @ 3.60GHz × 12   
    

    What am I missing here?
    Thanks in advance!