Airflow: Passing a dynamic value to a SubDAG operator


I've done it with Option 3. The key is to return a valid DAG with no tasks if the file does not exist. load_config then generates a file containing your number of tasks (or more information, if needed). Your SubDAG factory would look something like:

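import os
from datetime import timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator


def subdag(parent, child, args):
    sdag = DAG('%s.%s' % (parent, child), default_args=args, schedule_interval=timedelta(hours=1))
    file_path = "/path/to/generated/file"
    # At parse time the file may not exist yet; the DAG is then returned with no tasks
    if os.path.exists(file_path):
        with open(file_path) as data_file:
            # Strip trailing newlines (and skip blank lines) so each task_id is valid
            list_tasks = [line.strip() for line in data_file if line.strip()]
        for task in list_tasks:
            DummyOperator(
                task_id='task_' + task,
                default_args=args,
                dag=sdag,
            )
    return sdag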

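At DAG generation (parse) time you will see a SubDAG with no tasks. At DAG execution, after load_config has written the file, you will see your dynamically generated SubDAG, because the DAG file is parsed again when the SubDagOperator task runs.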
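The handoff between load_config and the SubDAG factory can be sketched without Airflow. This assumes the generated file holds one task suffix per line (the answer does not fix a format); `write_task_file` stands in for what load_config would do at run time and `read_task_suffixes` for the parse-time read inside the factory. Both names are hypothetical.

```python
import os
import tempfile
from typing import List


def write_task_file(path: str, number_of_runs: int) -> None:
    """Hypothetical load_config body: write one task suffix per line."""
    with open(path, "w") as f:
        for i in range(1, number_of_runs + 1):
            f.write(f"{i}\n")


def read_task_suffixes(path: str) -> List[str]:
    """What the SubDAG factory does at parse time.

    A missing file yields an empty list, i.e. a valid SubDAG with no tasks.
    """
    if not os.path.exists(path):
        return []
    with open(path) as f:
        return [line.strip() for line in f if line.strip()]


path = os.path.join(tempfile.mkdtemp(), "generated_tasks.txt")
# First parse, before load_config has run: no tasks.
print([f"task_{s}" for s in read_task_suffixes(path)])
# load_config runs and writes the file...
write_task_file(path, 3)
# ...so the next parse sees three tasks.
print([f"task_{s}" for s in read_task_suffixes(path)])
```

If your workers run on separate machines, the generated file has to live on a filesystem that every process parsing the DAG file can read.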

Author: Maneesh Sharma

Updated on June 05, 2022

Comments

  • Maneesh Sharma, almost 2 years ago

    I am new to Airflow.
    I have come across a scenario where the parent DAG needs to pass a dynamic number (let's say n) to a SubDAG.
    The SubDAG will use this number to dynamically create n parallel tasks.

    The Airflow documentation doesn't cover a way to achieve this, so I have explored a couple of approaches:

    Option 1 (using xcom_pull)

    I tried to pass the number as an XCom value, but for some reason the SubDAG does not resolve the passed value.

    Parent DAG file

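    import json

    from airflow.operators.python_operator import PythonOperator
    from airflow.operators.subdag_operator import SubDagOperator

    # dag, default_args, PARENT_DAG_NAME, CHILD_DAG_NAME and sub_dag
    # are defined elsewhere (not shown).

    def load_dag(**kwargs):
        # Encode once; json.dumps() on the bare number would turn it into a string
        number_of_runs = kwargs['dag_run'].conf['number_of_runs']
        dag_data = json.dumps({
            "number_of_runs": number_of_runs
        })
        return dag_data

    # ------------------ Tasks ------------------------------
    load_config = PythonOperator(
        task_id='load_config',
        provide_context=True,
        python_callable=load_dag,
        dag=dag)


    t1 = SubDagOperator(
        task_id=CHILD_DAG_NAME,
        # sub_dag() is called while this file is parsed, so it receives the
        # template text itself; the template is never rendered here.
        subdag=sub_dag(PARENT_DAG_NAME, CHILD_DAG_NAME, default_args, "{{ ti.xcom_pull(task_ids='load_config') }}"),
        default_args=default_args,
        dag=dag,
    )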
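The value returned by load_dag is pushed to XCom as a JSON string, so anything that reads it at run time has to decode it before using it as a count. A stdlib-only sketch of both sides, with a plain dict standing in for dag_run.conf; the helper names `encode_config` and `decode_number_of_runs` are hypothetical.

```python
import json


def encode_config(conf: dict) -> str:
    # Encode once: calling json.dumps() on the number first would make it
    # the string "3", and the consumer would receive "3" instead of 3.
    return json.dumps({"number_of_runs": conf["number_of_runs"]})


def decode_number_of_runs(xcom_value: str) -> int:
    # Would run at task execution time, e.g. inside a PythonOperator callable.
    return int(json.loads(xcom_value)["number_of_runs"])


payload = encode_config({"number_of_runs": 3})
print(payload)                         # {"number_of_runs": 3}
print(decode_number_of_runs(payload))  # 3
```

This only helps code that runs at execution time; it does not change what sub_dag() receives when the parent DAG file is parsed.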
    

    SubDAG file

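    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator

    def sub_dag(parent_dag_name, child_dag_name, args, num_of_runs):
        dag_subdag = DAG(
            dag_id='%s.%s' % (parent_dag_name, child_dag_name),
            default_args=args,
            schedule_interval=None)

        variable_names = {}

        # range() needs num_of_runs to already be an int when this runs (at parse time)
        for i in range(num_of_runs):
            # Each task needs a unique task_id within the DAG
            variable_names['task' + str(i + 1)] = DummyOperator(
                task_id='task' + str(i + 1),
                dag=dag_subdag,
            )

        return dag_subdag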
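Why Option 1 fails can be reproduced without Airflow: sub_dag() is called while the parent DAG file is parsed, so num_of_runs receives the template text itself rather than the pulled value. A minimal sketch; `try_build` is a hypothetical stand-in for the loop in sub_dag().

```python
TEMPLATE = "{{ ti.xcom_pull(task_ids='load_config') }}"


def try_build(num_of_runs):
    """Return the task ids sub_dag() would create, or None if range() rejects the input."""
    try:
        return ["task" + str(i + 1) for i in range(num_of_runs)]
    except TypeError:
        # range() raises TypeError for a str argument
        return None


print(try_build(TEMPLATE))  # None: at parse time the template is just a string
print(try_build(3))         # ['task1', 'task2', 'task3']
```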
    

    Option 2

    I also tried to pass number_of_runs as a global variable, but that did not work either.

    Option 3

    We also tried writing this value to a data file, but the SubDAG throws a "file doesn't exist" error. This might be because the file is generated dynamically.

    Can someone help me with this?

  • ybendana, over 5 years ago
    This doesn't work for me because the Jinja template isn't evaluated when calling sub_dag().
  • tom10271, almost 2 years ago
    The Jinja2 template string was not rendered because Airflow only renders fields that are registered as template fields on the operator (airflow.apache.org/docs/apache-airflow/stable/concepts/…). From the docs: "You can also use Jinja templating with nested fields, as long as these nested fields are marked as templated in the structure they belong to: fields registered in template_fields property will be submitted to template substitution."
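To illustrate the rule quoted above, here is a toy model, not Airflow's code: only attributes named in template_fields are rendered, and only when render() is called (standing in for task execution). Plain str.replace substitutes for Jinja, and `ToyOperator`, `render()` and the context key `n` are hypothetical.

```python
class ToyOperator:
    # Only these attributes are rendered, mimicking an operator's template_fields.
    template_fields = ("bash_command",)

    def __init__(self, bash_command: str, note: str):
        self.bash_command = bash_command
        self.note = note  # not listed in template_fields, so never rendered

    def render(self, context: dict) -> None:
        # Stand-in for execution-time rendering; Airflow uses Jinja here.
        for field in self.template_fields:
            value = getattr(self, field)
            for key, replacement in context.items():
                value = value.replace("{{ " + key + " }}", str(replacement))
            setattr(self, field, value)


op = ToyOperator(bash_command="echo {{ n }}", note="n = {{ n }}")
op.render({"n": 3})
print(op.bash_command)  # echo 3
print(op.note)          # n = {{ n }}
```

An argument passed to a plain Python function such as sub_dag() falls outside both cases: it is evaluated when the file is parsed, before any rendering happens.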