Dask: How would I parallelize my code with dask delayed?

Solution 1

You need to call dask.compute to eventually compute the result. See the dask.delayed documentation.

Sequential code

import pandas as pd
from sklearn.metrics import mean_squared_error as mse
filenames = [...]

results = []
for count, name in enumerate(filenames):
    file1 = pd.read_csv(name)
    df = pd.DataFrame(file1)  # isn't this already a dataframe?
    prediction = df['Close'][:-1]
    observed = df['Close'][1:]
    mean_squared_error = mse(observed, prediction)  
    results.append(mean_squared_error)

Parallel code

import dask
import pandas as pd
from sklearn.metrics import mean_squared_error as mse
filenames = [...]

delayed_results = []
for count, name in enumerate(filenames):
    df = dask.delayed(pd.read_csv)(name)
    prediction = df['Close'][:-1]
    observed = df['Close'][1:]
    mean_squared_error = dask.delayed(mse)(observed, prediction)
    delayed_results.append(mean_squared_error)

results = dask.compute(*delayed_results)
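
If you want to sanity-check what Dask will run before computing, the Delayed objects can be rendered as a task graph, and you can also pick a scheduler explicitly. This is a small optional sketch on top of the answer above, and the visualize call assumes the optional graphviz dependency is installed:

# Optional: render the task graph for the first file (needs graphviz installed)
delayed_results[0].visualize()

# Optional: choose a scheduler explicitly instead of the default
results = dask.compute(*delayed_results, scheduler="threads")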

Solution 2

In my opinion, a much clearer solution than the accepted answer is this snippet:

from dask import compute, delayed
import pandas as pd
from sklearn.metrics import mean_squared_error as mse
filenames = [...]

def compute_mse(file_name):
    df = pd.read_csv(file_name)
    prediction = df['Close'][:-1]
    observed = df['Close'][1:]
    return mse(observed, prediction)

delayed_results = [delayed(compute_mse)(file_name) for file_name in filenames]
mean_squared_errors = compute(*delayed_results, scheduler="processes")
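
The results come back in the same order as filenames, so if you want to know which error belongs to which file you can pair them up afterwards. A small add-on sketch, not part of the original answer:

# Map each filename to its mean squared error
mse_by_file = dict(zip(filenames, mean_squared_errors))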

Comments

  • Monty
    Monty about 2 years

    This is my first venture into parallel processing and I have been looking into Dask, but I am having trouble actually coding it.

    I have had a look at their examples and documentation, and I think dask.delayed will work best. I attempted to wrap my functions with delayed(function_name), or to add an @delayed decorator, but I can't seem to get it working properly. I preferred Dask over other methods since it is written in Python and for its (supposed) simplicity. I know Dask doesn't parallelize the for loop itself, but they say it can work inside a loop.

    My code passes files through a function that contains inputs to other functions and looks like this:

    from dask import delayed
    filenames = ['1.csv', '2.csv', '3.csv', etc. etc. ]
    for count, name in enumerate(filenames)"
        name = name.split('.')[0]
        ....
    

    Then I do some pre-processing, for example:

        preprocess1, preprocess2 = delayed(read_files_and_do_some_stuff)(name)
    

    Then I call a constructor and pass the pre_results into the function calls:

        fc = FunctionCalls()
        Daily = delayed(fc.function_runs)(filename=name, stringinput='Daily',
                                 input_data=pre_result1, model1=pre_result2)
    

    What I do here is pass each file into the for loop, do some pre-processing, and then pass the file into two models.

    Thoughts or tips on how to parallelize this? I began getting odd errors and had no idea how to fix the code. The code does work as is. I use a bunch of pandas DataFrames, Series, and NumPy arrays, and I would prefer not to go back and change everything to work with dask.dataframes etc.

    The code in my comment may be difficult to read. Here it is in a more formatted way.

    In the code below, when I type print(mean_squared_error) I just get: Delayed('mean_squared_error-3009ec00-7ff5-4865-8338-1fec3f9ed138')

    from dask import delayed
    import pandas as pd
    from sklearn.metrics import mean_squared_error as mse
    filenames = ['file1.csv']
    
    for count, name in enumerate(filenames):
        file1 = pd.read_csv(name)
        df = pd.DataFrame(file1)
        prediction = df['Close'][:-1]
        observed = df['Close'][1:]
        mean_squared_error = delayed(mse)(observed, prediction)
    
    • MRocklin
      MRocklin over 7 years
      You might get a better response if you're able to produce an MCVE
    • Monty
      Monty over 7 years
      Thanks. I have removed some of the code to highlight the issue more. If anything is not clear, please let me know.
    • MRocklin
      MRocklin over 7 years
      Ideally you should show a minimal failing example that someone else could reproduce. Your current question says "I'm trying something like this and things don't work". A better question might say "I do exactly these few steps, which are complex enough to show the problem, but also simple enough that you can easily copy paste and that you can understand quickly without reading a lot of code, and I get exactly the following error."
    • Monty
      Monty over 7 years
      It's really not useful to boil my code down further. I've already edited it to make it simpler to understand. I've tried to boil it down further and it doesn't seem to get to the crux of my problem. I can refer you to the GitHub if you'd like. At the moment, I get a TypeError in the line that reads in the files: TypeError: Delayed objects of unspecified length are not iterable
    • Monty
      Monty over 7 years
      I suppose here is an example code (though not really a helpful example for answering my question....) from dask import delayed import pandas as pd from sklearn.metrics import mean_squared_error as mse filenames = ['file1.csv'] for count, name in enumerate(filenames): file1 = pd.read_csv(name) df = pd.DataFrame(file1) prediction = df['Close'][:-1] # second vec is the true values to compare observed = df['Close'][1:] mean_squared_error = delayed(mse)(observed, prediction)
  • B_Miner
    B_Miner almost 6 years
    Hi @MRocklin, what does *delayed_results in the call to compute do?
  • Asif Ali
    Asif Ali almost 6 years
    @B_Miner The * is Python argument unpacking: it passes each Delayed object in the list as a separate argument, so compute evaluates all of them in one call.
  • BND
    BND about 5 years
    @B_Miner Dask builds up a list of the tasks to do. When you call compute, it then runs them, in parallel.
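
For anyone unsure about the * above: it is plain Python argument unpacking, nothing Dask-specific. A minimal sketch (the inc task here is made up purely for illustration):

from dask import compute, delayed

inc = delayed(lambda x: x + 1)        # a trivial delayed task
tasks = [inc(i) for i in range(3)]    # a list of Delayed objects

compute(*tasks)   # each Delayed passed as its own argument -> (1, 2, 3)
compute(tasks)    # the list passed as one argument is traversed -> ([1, 2, 3],)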