An attempt has been made to start a new process before the current process has finished its bootstrapping phase

Solution 1

Creating the Client starts up new processes, so it will have to happen within the if __name__ == '__main__': block, as described in this SO question or this GitHub issue.

This is the same as with the multiprocessing module.
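A minimal sketch of that idiom, assuming a stand-alone script (the square function is just for illustration):

from dask.distributed import Client

def square(x):
    return x ** 2

if __name__ == '__main__':
    # Client() spawns worker processes, so it must run only in the main
    # process, never when a spawned worker re-imports this module.
    client = Client()
    futures = client.map(square, range(10))
    print(client.gather(futures))
    client.close()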

Solution 2

I faced several issues in my code even after adding the if __name__ == '__main__': guard.

I am using several Python files and modules, and multiprocessing is used by only one function for a sampling operation. The only fix that worked for me was to put the guard at the very first line of the entry-point file, wrapping the whole program (even the imports). The following worked well:

if __name__ == '__main__':
    from mjrl.utils.gym_env import GymEnv
    from mjrl.policies.gaussian_mlp import MLP
    from mjrl.baselines.quadratic_baseline import QuadraticBaseline
    from mjrl.baselines.mlp_baseline import MLPBaseline
    from mjrl.algos.npg_cg import NPG
    from mjrl.algos.dapg import DAPG
    from mjrl.algos.behavior_cloning import BC
    from mjrl.utils.train_agent import train_agent
    from mjrl.samplers.core import sample_paths
    import os
    import json
    import mjrl.envs
    import mj_envs
    import time as timer
    import pickle
    import argparse

    import numpy as np 

    # ===============================================================================
    # Get command line arguments
    # ===============================================================================

    parser = argparse.ArgumentParser(description='Policy gradient algorithms with demonstration data.')
    parser.add_argument('--output', type=str, required=True, help='location to store results')
    parser.add_argument('--config', type=str, required=True, help='path to config file with exp params')
    args = parser.parse_args()
    JOB_DIR = args.output
    if not os.path.exists(JOB_DIR):
        os.mkdir(JOB_DIR)
    with open(args.config, 'r') as f:
        job_data = eval(f.read())
    assert 'algorithm' in job_data.keys()
    assert any([job_data['algorithm'] == a for a in ['NPG', 'BCRL', 'DAPG']])
    job_data['lam_0'] = 0.0 if 'lam_0' not in job_data.keys() else job_data['lam_0']
    job_data['lam_1'] = 0.0 if 'lam_1' not in job_data.keys() else job_data['lam_1']
    EXP_FILE = JOB_DIR + '/job_config.json'
    with open(EXP_FILE, 'w') as f:
        json.dump(job_data, f, indent=4)

    # ===============================================================================
    # Train Loop
    # ===============================================================================

    e = GymEnv(job_data['env'])
    policy = MLP(e.spec, hidden_sizes=job_data['policy_size'], seed=job_data['seed'])
    baseline = MLPBaseline(e.spec, reg_coef=1e-3, batch_size=job_data['vf_batch_size'],
                           epochs=job_data['vf_epochs'], learn_rate=job_data['vf_learn_rate'])
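This likely works because one of the imported modules starts processes as a side effect of being imported: with the spawn start method, every child process re-imports the main module, so anything at top level, including such imports, runs again in each child unless it sits inside the guard. If you would rather not indent the whole script, an equivalent layout (assuming none of your imports spawn processes at import time) is to keep the imports at top level and move all of the work into a function:

from dask.distributed import Client

def main():
    # Worker processes are created here, in the main process only;
    # spawned children re-import this module but never call main().
    client = Client()
    total = client.submit(sum, range(100)).result()
    print(total)
    client.close()

if __name__ == '__main__':
    main()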
Comments

  • muammar, almost 2 years ago

    I am new to dask, and I found it so nice to have a module that makes parallelization easy. I am working on a project where I was able to parallelize a loop on a single machine, as you can see here. However, I would like to move over to dask.distributed. I applied the following changes to the class above:

    diff --git a/mlchem/fingerprints/gaussian.py b/mlchem/fingerprints/gaussian.py
    index ce6a72b..89f8638 100644
    --- a/mlchem/fingerprints/gaussian.py
    +++ b/mlchem/fingerprints/gaussian.py
    @@ -6,7 +6,7 @@ from sklearn.externals import joblib
     from .cutoff import Cosine
     from collections import OrderedDict
     import dask
    -import dask.multiprocessing
    +from dask.distributed import Client
     import time
    
    
    @@ -141,13 +141,14 @@ class Gaussian(object):
             for image in images.items():
                 computations.append(self.fingerprints_per_image(image))
    
    +        client = Client()
             if self.scaler is None:
    -            feature_space = dask.compute(*computations, scheduler='processes',
    +            feature_space = dask.compute(*computations, scheduler='distributed',
                                              num_workers=self.cores)
                 feature_space = OrderedDict(feature_space)
             else:
                 stacked_features = dask.compute(*computations,
    -                                            scheduler='processes',
    +                                            scheduler='distributed',
                                                 num_workers=self.cores)
    
                 stacked_features = numpy.array(stacked_features)
    

    Doing so generates this error:

     File "/usr/local/Cellar/python/3.7.2_2/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/spawn.py", line 136, in _check_not_importing_main
        is not going to be frozen to produce an executable.''')
    RuntimeError:
            An attempt has been made to start a new process before the
            current process has finished its bootstrapping phase.
    
            This probably means that you are not using fork to start your
            child processes and you have forgotten to use the proper idiom
            in the main module:
    
                if __name__ == '__main__':
                    freeze_support()
                    ...
    

    I have tried different ways of adding if __name__ == '__main__': without any success. This can be reproduced by running this example. I would appreciate it if anyone could help me figure this out. I have no clue how I should change my code to make it work.

    Thanks.

    Edit: The example is cu_training.py.

  • muammar, over 5 years ago
    Thanks, @MRocklin. I had read the links you sent in your answer. However, I have not yet found a way to change my code to make this work.
  • muammar, over 5 years ago
    I finally understood you, @MRocklin. I fixed it here: github.com/muammar/mlchem/commit/… I will try to refactor my code, because I don't much like that I have to wrap the calculations in a function, but maybe that is just the intended way of using distributed. Not sure yet. Great tool, by the way.