Airflow : Passing a dynamic value to Sub DAG operator
I've done it with Option 3. The key is to return a valid DAG with no tasks if the file does not exist. load_config then generates a file containing your number of tasks, or more information if needed. Your subdag factory would look something like:
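import os
from datetime import timedelta

from airflow import DAG
# Airflow 1.x import path; newer releases moved this operator.
from airflow.operators.dummy_operator import DummyOperator

def subdag(...):
    sdag = DAG('%s.%s' % (parent, child), default_args=args, schedule_interval=timedelta(hours=1))
    file_path = "/path/to/generated/file"
    # The file only exists once load_config has run; until then the DAG stays empty.
    if os.path.exists(file_path):
        with open(file_path) as data_file:
            list_tasks = data_file.readlines()
        for task in list_tasks:
            DummyOperator(
                # strip() removes the trailing newline kept by readlines()
                task_id='task_' + task.strip(),
                default_args=args,
                dag=sdag,
            )
    return sdag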
At DAG generation you will see a subdag with no tasks. At DAG execution, after load_config is done, you can see your dynamically generated subdag.
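The file handshake described above can be tried out without Airflow. The following is only a minimal sketch: write_task_file and read_task_ids are hypothetical helper names, and the one-task-name-per-line file format is an assumption chosen to match the readlines() loop in the factory.

```python
import os
import tempfile


def write_task_file(file_path, number_of_runs):
    """What a load_config callable might do: write one task name per line."""
    with open(file_path, "w") as handle:
        for i in range(number_of_runs):
            handle.write("%d\n" % (i + 1))


def read_task_ids(file_path):
    """What the subdag factory does at parse time: no file means no tasks."""
    if not os.path.exists(file_path):
        return []
    with open(file_path) as handle:
        # strip() drops the trailing newline; blank lines are skipped
        return ["task_" + line.strip() for line in handle if line.strip()]


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "tasks.txt")
        print(read_task_ids(path))  # before load_config has run: []
        write_task_file(path, 3)
        print(read_task_ids(path))  # after: ['task_1', 'task_2', 'task_3']
```

In a real DAG file, each returned id would become the task_id of one operator in the subdag.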
Maneesh Sharma
Updated on June 05, 2022
Comments
-
Maneesh Sharma almost 2 years
I am new to Airflow.
I have come across a scenario where a parent DAG needs to pass a dynamic number (let's say n) to a Sub DAG.
The SubDAG will use this number to dynamically create n parallel tasks.
The Airflow documentation doesn't cover a way to achieve this, so I have explored a couple of ways:
Option - 1 (Using xcom pull)
I have tried to pass as a xcom value, but for some reason SubDAG is not resolving to the passed value.
Parent Dag File
def load_dag(**kwargs):
    number_of_runs = json.dumps(kwargs['dag_run'].conf['number_of_runs'])
    dag_data = json.dumps({
        "number_of_runs": number_of_runs
    })
    return dag_data

# ------------------ Tasks ------------------------------
load_config = PythonOperator(
    task_id='load_config',
    provide_context=True,
    python_callable=load_dag,
    dag=dag)

t1 = SubDagOperator(
    task_id=CHILD_DAG_NAME,
    subdag=sub_dag(PARENT_DAG_NAME, CHILD_DAG_NAME, default_args,
                   "'{{ ti.xcom_pull(task_ids='load_config') }}'"),
    default_args=default_args,
    dag=dag,
)
Sub Dag File
def sub_dag(parent_dag_name, child_dag_name, args, num_of_runs):
    dag_subdag = DAG(
        dag_id='%s.%s' % (parent_dag_name, child_dag_name),
        default_args=args,
        schedule_interval=None)

    variabe_names = {}

    for i in range(num_of_runs):
        variabe_names['task' + str(i + 1)] = DummyOperator(
            task_id='dummy_task',
            dag=dag_subdag,
        )

    return dag_subdag
Option - 2
I have also tried to pass number_of_runs as a global variable, which did not work.
Option - 3
We also tried to write this value to a data file, but the sub DAG throws a "File doesn't exist" error. This might be because we are generating this file dynamically.
Can someone help me with this?
-
ybendana over 5 years
This doesn't work for me because the Jinja template isn't evaluated when calling sub_dag().
-
tom10271 almost 2 years
The Jinja2 template string was not rendered because Airflow only renders fields that are registered as template fields in the Operator. airflow.apache.org/docs/apache-airflow/stable/concepts/…
You can also use Jinja templating with nested fields, as long as these nested fields are marked as templated in the structure they belong to: fields registered in template_fields property will be submitted to template substitution.
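The point made in these comments can be reproduced without Airflow. This is a minimal sketch, assuming the factory loops like sub_dag() in the question: it is an ordinary function call made when the DAG file is parsed, so it receives the literal template string and not the XCom value. count_tasks and try_build are hypothetical helpers used only for illustration.

```python
# The exact argument the question passes to sub_dag(); nothing has rendered it.
TEMPLATE = "'{{ ti.xcom_pull(task_ids='load_config') }}'"


def count_tasks(num_of_runs):
    """Mimics the loop in sub_dag(): range() requires an integer."""
    return len(range(num_of_runs))


def try_build(value):
    """Return the task count, or the error raised while building."""
    try:
        return count_tasks(value)
    except TypeError as exc:
        return "TypeError: %s" % exc


if __name__ == "__main__":
    print(try_build(3))         # an int works and yields 3 tasks
    print(try_build(TEMPLATE))  # the unrendered string raises a TypeError
```

Rendering only happens later, at task run time, and only for operator fields listed in template_fields, which is why a value needed at parse time has to come from somewhere else, such as the generated file in the answer.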