Easiest way to read csv files with multiprocessing in Pandas


Solution 1

Using Pool:

import os
import pandas as pd 
from multiprocessing import Pool

# wrap your csv importer in a function that can be mapped
def read_csv(filename):
    'converts a filename to a pandas dataframe'
    return pd.read_csv(filename)


def main():

    # get a list of file names
    files = os.listdir('.')
    file_list = [filename for filename in files if filename.endswith('.csv')]

    # set up your pool
    with Pool(processes=8) as pool: # or whatever your hardware can support

        # have your pool map the file names to dataframes
        df_list = pool.map(read_csv, file_list)

        # reduce the list of dataframes to a single dataframe
        combined_df = pd.concat(df_list, ignore_index=True)

    return combined_df

if __name__ == '__main__':
    main()

Solution 2

The dask library is designed to address exactly this kind of problem (and more): it can read many CSV files in parallel and expose them as a single dataframe-like object.
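For example, a minimal sketch, assuming dask is installed and the CSV files sit under ./task_1/ as in the question:

import dask.dataframe as dd

# dask builds a lazy, partitioned dataframe over all matching files
ddf = dd.read_csv('./task_1/*.csv')

# compute() reads the files in parallel and returns a single pandas DataFrame
df = ddf.compute()
print(df.shape)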

Solution 3

I could not get map/map_async to work, but managed to get it working with apply_async.

Two possible ways (I have no idea which one is better):

  • A) Concat at the end
  • B) Concat during

I find glob an easy way to list and filter files in a directory:

from glob import glob
import pandas as pd
from multiprocessing import Pool

folder = "./task_1/" # note the "/" at the end
file_list = glob(folder + '*.csv')

def my_read(filename):
    'read one csv and reshape its VALUE column into a 75x90 DataFrame'
    f = pd.read_csv(filename)
    # return a DataFrame so pd.concat in the callback can handle it
    return pd.DataFrame(f.VALUE.to_numpy().reshape(75, 90))

#DF_LIST = [] # A) end
DF = pd.DataFrame() # B) during

def DF_LIST_append(result):
    #DF_LIST.append(result) # A) end
    global DF # B) during
    DF = pd.concat([DF,result], ignore_index=True) # B) during

pool = Pool(processes=8)

for file in file_list:
    pool.apply_async(my_read, args = (file,), callback = DF_LIST_append)

pool.close()
pool.join()

#DF = pd.concat(DF_LIST, ignore_index=True) # A) end

print(DF.shape)
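If you go with approach A), here is a minimal standalone sketch (assuming the same my_read helper and ./task_1/ folder as above): results are collected in a list as they arrive and concatenated once at the end, which avoids rebuilding the growing DataFrame inside the callback for every file.

from glob import glob
from multiprocessing import Pool
import pandas as pd

def my_read(filename):
    'read one csv and reshape its VALUE column into a 75x90 DataFrame'
    f = pd.read_csv(filename)
    return pd.DataFrame(f.VALUE.to_numpy().reshape(75, 90))

if __name__ == '__main__':
    file_list = glob('./task_1/*.csv')
    df_list = []  # A) collect each worker's result here

    pool = Pool(processes=8)
    for file in file_list:
        # the callback runs in the main process, so appending to the list is safe
        pool.apply_async(my_read, args=(file,), callback=df_list.append)
    pool.close()
    pool.join()

    # A) concatenate once at the end
    DF = pd.concat(df_list, ignore_index=True)
    print(DF.shape)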

Solution 4

If you aren't against using another library, you could use GraphLab's SFrame. It is a dataframe-like object that reads data very quickly, which helps when performance is a big issue.
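A minimal sketch, assuming GraphLab Create is installed and using a hypothetical file name; SFrame.read_csv loads the file, and to_dataframe() converts the result to pandas if the rest of the pipeline expects one:

import graphlab

# SFrame is backed by a fast, out-of-core C++ engine
sf = graphlab.SFrame.read_csv('./task_1/example.csv')  # hypothetical file name

# convert to a pandas DataFrame for the rest of the pipeline
df = sf.to_dataframe()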

Comments

  • Han Zhengzu
    Han Zhengzu almost 2 years

    Here is my question.
    With a bunch of .csv files (or other files), Pandas makes it easy to read them and store them in DataFrame format. But when the number of files is huge, I want to read the files with multiprocessing to save some time.

    My early attempt

    I manually divided the files into different paths and processed each directory separately:

    import os
    import pandas as pd

    os.chdir("./task_1")
    files = os.listdir('.')
    files.sort()
    for file in files:
        filename, extname = os.path.splitext(file)
        if extname == '.csv':
            f = pd.read_csv(file)
            df = (f.VALUE.to_numpy()).reshape(75, 90)
    

    And then combine them.

    How can I run this with a pool to solve my problem?
    Any advice would be appreciated!

  • seralouk
    seralouk almost 5 years
    what does df_list contain?
  • zemekeneng
    zemekeneng over 4 years
    @serafeim df_list is a list of the pd.DataFrames that are produced by the pool of processes.
  • Ipa
    Ipa almost 4 years
    Is there any case in which this code won't work? I tried reading Excel files instead and it sometimes gets stuck.