Multiprocessing writing to pandas dataframe


Updating DataFrames like this from multiprocessing isn't going to work:

dataf = dataf.append(new_row,ignore_index=True)

For one thing, this is very inefficient: each append is O(n), so doing it once per row is O(n²) in total. The preferred way is to collect the pieces and concat them together in one pass.
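For example (a minimal sketch, with some throwaway frames standing in for whatever pieces you build up):

import pandas as pd

# hypothetical pieces built up elsewhere
frames = [pd.DataFrame({'e': [i]}) for i in range(3)]

# one O(n) pass over all the pieces, instead of O(n) work per append
combined = pd.concat(frames, ignore_index=True)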

For another, and more importantly, access to dataf isn't locked for each update, so there's no guarantee that two operations won't conflict (I suspect this is what's crashing Python).

Finally, append doesn't act in place: it returns a new DataFrame, so the rebound local variable dataf is discarded once the callback finishes, and no changes ever reach the parent dataf.
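A quick illustration of the rebinding problem (DataFrame.append itself was deprecated in pandas 1.4 and removed in 2.0, but the scoping point stands):

import pandas as pd

df = pd.DataFrame({'e': [1]})

def callback(dataf):
    # append returns a new frame; this rebinds the *local* name only
    dataf = dataf.append({'e': 2}, ignore_index=True)

callback(df)
print(len(df))  # still 1 -- the appended frame was discarded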


We could use a multiprocessing Manager list or dict instead: a list if you don't care about order, or a dict if you do (keyed by enumerate, say), since the values do not come back from the async calls in any well-defined order.
(Or we could create an object which implements the locking ourselves; see Eli Bendersky.)
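In outline, that do-it-yourself locking might look like the sketch below (safe_append is a hypothetical helper, and every writer is assumed to go through the same shared lock; the Manager list route below avoids needing it):

import multiprocessing

# guard with if __name__ == '__main__': in a real script
manager = multiprocessing.Manager()
lock = manager.Lock()        # a lock usable across processes
shared_rows = manager.list()

def safe_append(row):
    with lock:               # one writer at a time
        shared_rows.append(row)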
So the following changes are made:

df = pd.DataFrame(existing_data,columns=cols)
# becomes
df = pd.DataFrame(existing_data,columns=cols)
manager = multiprocessing.Manager()
d = manager.list([df])

dataf = dataf.append(new_row,ignore_index=True)
# becomes
d.append(new_row)

Now, once the async calls have finished, you have a Manager list of DataFrames. You can concat these (with ignore_index=True) to get the desired result:

pd.concat(d, ignore_index=True)

Should do the trick.
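Putting it all together, here's a hedged end-to-end sketch (names follow the question's code; checker takes the DataFrame explicitly, and new rows are collected in a Manager list):

import multiprocessing
import pandas as pd
from functools import partial

cols = ['a', 'b', 'c', 'd', 'e']

def checker(dfr, a, b, c, d, e):
    # dfr is an explicit argument rather than a global
    match = dfr[(dfr['a'] == a) & (dfr['b'] == b) & (dfr['c'] == c)
                & (dfr['d'] == d) & (dfr['e'] == e)]
    index_of_match = match.index.tolist()
    if index_of_match:            # existing row: return its (first) index
        return index_of_match[:1]
    return [a, b, c, d, e]        # no match: return the new row itself

def log_result(result, dataf, new_rows):
    if len(result) == 1:
        dataf.loc[result[0], 'e'] += 1   # single .loc write, no chained indexing
    else:
        new_rows.append(result)          # the Manager list handles the locking

if __name__ == '__main__':
    df = pd.DataFrame(existing_data, columns=cols)   # existing_data as in the question

    manager = multiprocessing.Manager()
    new_rows = manager.list()

    pool = multiprocessing.Pool()
    for row in rows_to_parse:                        # rows_to_parse as in the question
        # df is pickled for every task; in practice dispatch large chunks (see the Note below)
        pool.apply_async(checker, args=(df, *row),
                         callback=partial(log_result, dataf=df, new_rows=new_rows))
    pool.close()
    pool.join()

    result = pd.concat([df, pd.DataFrame(list(new_rows), columns=cols)],
                       ignore_index=True)

Note that the callbacks all run on a single thread in the parent process, which is why the increment case can safely update df directly.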


Note: creating the new_row DataFrame at each stage is also less efficient than letting pandas parse the list of lists directly into a DataFrame in one go. Hopefully this is a toy example: you really want your chunks to be quite large to get wins with multiprocessing (I've heard 50kb as a rule of thumb...), and a row at a time is never going to be a win here.
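For illustration, chunked dispatch might look like this (chunks and check_chunk are hypothetical helpers, the chunk size is a tuning knob, and checker is as in the sketch above):

def chunks(seq, size):
    # large slices mean the pickling overhead is paid once per chunk
    for i in range(0, len(seq), size):
        yield seq[i:i + size]

def check_chunk(dfr, rows):
    # one task processes a whole chunk and returns all its results together
    return [checker(dfr, *row) for row in rows]

# e.g. pool.apply_async(check_chunk, args=(df, chunk), callback=...)
#      for chunk in chunks(rows_to_parse, 10000)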


Aside: you should avoid using globals (like df) in your code; it's much cleaner to pass them around as arguments to your functions (in this case, as an argument to checker).
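Concretely, inside the question's apply_async_with_callback (and assuming checker is rewritten to take the frame as its first argument, as in the sketch above):

from functools import partial

# dfr travels with each task explicitly instead of being read as a global
pool.apply_async(partial(checker, dfr),
                 args=(var_a, var_b, var_c, var_d, var_e),
                 callback=partial(log_result, dataf=dfr))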


Comments

  • user3374113 almost 2 years

    So what I am trying to do with the following code is to read a list of lists, put each one through a function called checker, and then have log_result deal with the result of the checker function. I am trying to do this using multiprocessing, because the variable rows_to_parse in reality has millions of rows, so using multiple cores should speed up this process by a considerable amount.

    The code at the present moment doesn't work and crashes Python.

    Concerns and Issues I have:

    1. I want the existing DataFrame, held in the variable df, to maintain its index throughout the process, because otherwise log_result will get confused as to which row needs updating.
    2. I am quite certain that apply_async is not the appropriate multiprocessing function to perform this duty, because I believe the order in which the computer reads and writes the df could possibly corrupt it.
    3. I think that a queue may need to be set up to write and read the df, but I am unsure how I would go about doing that.

    Thank you for any assistance.

    import pandas as pd
    import multiprocessing
    from functools import partial
    
    def checker(a,b,c,d,e):
        match = df[(df['a'] == a) & (df['b'] == b) & (df['c'] == c) & (df['d'] == d) & (df['e'] == e)]
        index_of_match = match.index.tolist()
        if len(index_of_match) == 1: #one match in df
            return index_of_match
        elif len(index_of_match) > 1: #not likely because duplicates will be removed before the if __name__ == '__main__': block
            return [index_of_match[0]]
        else: #no match, returns a result which then gets processed by the else statement in log_result. this means that [a,b,c,d,e] get written to the df
            return [a,b,c,d,e]
    
    
    
    def log_result(result, dataf):
        if len(result) == 1: #one match found in df
            dataf.loc[result[0]]['e'] += 1 
        else: #append new row to existing df
            new_row = pd.DataFrame([result],columns=cols)
            dataf = dataf.append(new_row,ignore_index=True)
    
    
    def apply_async_with_callback(parsing_material, dfr):
        pool = multiprocessing.Pool()
        for var_a, var_b, var_c, var_d, var_e in parsing_material:
            pool.apply_async(checker, args = (var_a, var_b, var_c, var_d, var_e), callback = partial(log_result,dataf=dfr))
        pool.close()
        pool.join()
    
    
    
    if __name__ == '__main__':
        #setting up main dataframe
        cols = ['a','b','c','d','e']
        existing_data = [["YES","A","16052011","13031999",3],
                        ["NO","Q","11022003","15081999",3],
                        ["YES","A","22082010","03012001",9]]
    
        #main dataframe
        df = pd.DataFrame(existing_data,columns=cols)
    
        #new data
        rows_to_parse = [['NO', 'A', '09061997', '06122003', 5],
                        ['YES', 'W', '17061992', '26032012', 6],
                        ['YES', 'G', '01122006', '07082014', 2],
                        ['YES', 'N', '06081992', '21052008', 9],
                        ['YES', 'Y', '18051995', '24011996', 6],
                        ['NO', 'Q', '11022003', '15081999', 3],
                        ['NO', 'O', '20112004', '28062008', 0],
                        ['YES', 'R', '10071994', '03091996', 8],
                        ['NO', 'C', '09091998', '22051992', 1],
                        ['YES', 'Q', '01051995', '02012000', 3],
                        ['YES', 'Q', '26022015', '26092007', 5],
                        ['NO', 'F', '15072002', '17062001', 8],
                        ['YES', 'I', '24092006', '03112003', 2],
                        ['YES', 'A', '22082010', '03012001', 9],
                        ['YES', 'I', '15072016', '30092005', 7],
                        ['YES', 'Y', '08111999', '02022006', 3],
                        ['NO', 'V', '04012016', '10061996', 1],
                        ['NO', 'I', '21012003', '11022001', 6],
                        ['NO', 'P', '06041992', '30111993', 6],
                        ['NO', 'W', '30081992', '02012016', 6]]
    
    
        apply_async_with_callback(rows_to_parse, df)
    
  • BallpointBen almost 6 years
    If you don't reassign the DataFrame and never write to the same location twice, is it safe to write to it concurrently from multiple threads?
  • Andy Hayden almost 6 years
    That should be fine with the Manager list/dict: it handles the locking, so it will be safe.