Providing context in TriggerDagRunOperator
Solution 1
In Airflow 2.0.x, the equivalent of @efbbrown's answer is:
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
trigger_step = TriggerDagRunOperator(
    task_id="trigger_modelling",
    trigger_dag_id="Dummy_Modelling",
    conf={"message": "{{ dag_run.conf['message'] }}", "day": "{{ dag_run.conf['day'] }}"},
    dag=dag
)
The pull request is described here on GitHub.
See the documentation for external-triggers and for trigger_dagrun.
Here is a YouTube video on the topic that shows the correct imports.
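To see what the templated conf above resolves to, here is a minimal stand-in that needs no Airflow install: it mimics (in a toy way, not with Airflow's real Jinja engine) how the `{{ dag_run.conf[...] }}` expressions are rendered against the triggering run's conf before the target DAG run is created.

```python
# The conf of the run that triggered this DAG (example values).
upstream_conf = {"message": "hello", "day": "monday"}

# The templated conf, exactly as written in the operator above.
templated_conf = {
    "message": "{{ dag_run.conf['message'] }}",
    "day": "{{ dag_run.conf['day'] }}",
}

def render(template, conf):
    """Toy renderer: resolve {{ dag_run.conf['key'] }} against conf."""
    for key, value in conf.items():
        template = template.replace(
            "{{{{ dag_run.conf['{}'] }}}}".format(key), str(value)
        )
    return template

rendered = {k: render(v, upstream_conf) for k, v in templated_conf.items()}
print(rendered)  # {'message': 'hello', 'day': 'monday'}
```

The point is that rendering happens before the triggered DAG starts, so the downstream run receives plain values, not template strings.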
Solution 2
Solved: the dag_run object is stored in the context, so the configuration variables can be accessed in the python_callable of the TriggerDagRunOperator with this pattern:
def trigger(context, dag_run_obj):
    dag_run_obj.payload = {
        "message": context["dag_run"].conf["message"],
        "day": context["dag_run"].conf["day"]
    }
    return dag_run_obj
trigger_step = TriggerDagRunOperator(
    task_id="trigger_modelling",
    trigger_dag_id="Dummy_Modelling",
    python_callable=trigger,
    dag=dag
)
efbbrown
Updated on November 01, 2022

Comments
-
efbbrown over 1 year
I have a dag that has been triggered by another dag. I have passed through to this dag some configuration variables via the DagRunOrder().payload dictionary in the same way the official example has done. Now in this dag I have another TriggerDagRunOperator to start a second dag and would like to pass those same configuration variables through.

I have successfully accessed the payload variables in a PythonOperator like so:

def run_this_func(ds, **kwargs):
    print("Remotely received value of {} for message and {} for day".format(
        kwargs["dag_run"].conf["message"],
        kwargs["dag_run"].conf["day"])
    )

run_this = PythonOperator(
    task_id='run_this',
    provide_context=True,
    python_callable=run_this_func,
    dag=dag
)

But the same pattern does not work in the TriggerDagRunOperator:

def trigger(context, dag_run_obj, **kwargs):
    dag_run_obj.payload = {
        "message": kwargs["dag_run"].conf["message"],
        "day": kwargs["dag_run"].conf["day"]
    }
    return dag_run_obj

trigger_step = TriggerDagRunOperator(
    task_id="trigger_modelling",
    trigger_dag_id="Dummy_Modelling",
    provide_context=True,
    python_callable=trigger,
    dag=dag
)
It yields a warning regarding the use of provide_context:

INFO - Subtask: /usr/local/lib/python2.7/dist-packages/airflow/models.py:1927: PendingDeprecationWarning: Invalid arguments were passed to TriggerDagRunOperator. Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were:
INFO - Subtask: *args: ()
INFO - Subtask: **kwargs: {'provide_context': True}
INFO - Subtask: category=PendingDeprecationWarning
And this error suggesting I haven't passed the conf:

INFO - Subtask: Traceback (most recent call last):
INFO - Subtask:   File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1374, in run
INFO - Subtask:     result = task_copy.execute(context=context)
INFO - Subtask:   File "/usr/local/lib/python2.7/dist-packages/airflow/operators/dagrun_operator.py", line 64, in execute
INFO - Subtask:     dro = self.python_callable(context, dro)
INFO - Subtask:   File "/home/user/airflow/dags/dummy_responses.py", line 28, in trigger
INFO - Subtask:     "message": kwargs["dag_run"].conf["message"],
INFO - Subtask: KeyError: 'dag_run'
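The traceback itself shows why this fails: `execute()` calls `self.python_callable(context, dro)` with exactly two positional arguments and no keywords, so a `**kwargs` parameter on the callable stays empty and `kwargs["dag_run"]` raises KeyError. The conf is reachable through the first argument instead, as `context["dag_run"].conf`. A tiny stand-in demonstrates the empty kwargs:

```python
def trigger(context, dag_run_obj, **kwargs):
    # Nothing is passed by keyword, so kwargs is always {} here.
    return kwargs

# Mimic the operator's call: self.python_callable(context, dro)
received = trigger({"dag_run": "the-triggering-run"}, object())
print(received)  # {}
```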
A second pattern that I've tried, which also hasn't worked, is using the params argument like so:

def trigger(context, dag_run_obj):
    dag_run_obj.payload = {
        "message": context['params']['message'],
        "day": context['params']['day']
    }
    return dag_run_obj

trigger_step = TriggerDagRunOperator(
    task_id="trigger_modelling",
    trigger_dag_id="Dummy_Modelling",
    python_callable=trigger,
    params={
        "message": "{{ dag_run.conf['message'] }}",
        "day": "{{ dag_run.conf['day'] }}"
    },
    dag=dag
)
This pattern does not yield an error but instead passes the parameters through to the next dag as strings, i.e. it doesn't evaluate the Jinja expressions.
How can I access the configuration variables in the TriggerDagRunOperator of the second dag?
-
CodingInCircles almost 6 years
How did you end up reading the passed parameters in the triggered DAG? I need to do the same, and while I can confirm the parameter I want to pass is part of the dag run object's payload, I can't seem to "read" it in the triggered DAG.
-
efbbrown almost 6 years
I could access the parameters in my triggered dag with "{{ dag_run.conf['message'] }}" and "{{ dag_run.conf['day'] }}". This relies on the fields in the operator through which you are trying to read the parameters being template_fields. If the "{{ dag_run.conf['day'] }}" pattern doesn't work for you because the fields aren't template_fields, you can extend the operator class you are using to make those fields template_fields. Let me know if this doesn't make sense and I will include it in my answer.
-
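The subclassing idea from this comment can be sketched in plain Python. The base class below is a stand-in, not Airflow's BaseOperator: Airflow operators declare which constructor arguments get Jinja-rendered via the template_fields class attribute, so a subclass can opt an extra field in by extending that tuple.

```python
class OperatorStandIn:
    # Fields the (hypothetical) parent operator already templates.
    template_fields = ("trigger_dag_id",)

class TemplatedOperator(OperatorStandIn):
    # Opt "conf" in, so strings like "{{ dag_run.conf['day'] }}"
    # passed to it get rendered instead of arriving literally.
    template_fields = OperatorStandIn.template_fields + ("conf",)

print(TemplatedOperator.template_fields)  # ('trigger_dag_id', 'conf')
```

With real Airflow you would subclass the operator you are actually using and extend its template_fields the same way.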
Sammy J over 4 years
Hi, welcome to Stack Overflow. Can you add a code sample so that it is easier to understand?
-
Malgi about 3 years
@efbbrown this solution is not working in Airflow v2.0.1; I'm getting this error: Invalid arguments were passed to TriggerDagRunOperator. Do you know how we could pass context in TriggerDagRunOperator in Airflow version 2?
-
efbbrown about 3 years
Thank you @Sawan Vaidya
-
mathee almost 3 years
Hi, is it not possible to create the dag_run_obj payload or the conf parameter dynamically when triggering the TriggerDagRunOperator? This used to be possible before the python_callable parameter was deprecated in Airflow 2.0.
-
taari almost 3 years
@mathee, yes, it appears that you can't do that using TriggerDagRunOperator. I solved it by adding an extra PythonOperator right before calling the TriggerDagRunOperator, and I set up the dag_run_obj there.
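A no-Airflow stand-in for the workaround this comment describes: a task upstream of the trigger builds the payload and publishes it, and the trigger step then reads the prepared conf. In real Airflow the hand-off would go through XCom; the `xcom` dict below is a stand-in for that backend, and the task names are hypothetical.

```python
xcom = {}  # stand-in for Airflow's XCom storage

def prepare_conf(upstream_conf):
    """Upstream PythonOperator: build the conf dynamically and publish it."""
    conf = {
        "message": upstream_conf["message"].upper(),  # any dynamic logic
        "day": upstream_conf["day"],
    }
    xcom["prepare_conf"] = conf  # real code: returned value goes to XCom
    return conf

def trigger_step():
    """Stand-in for the trigger reading the prepared conf (real code would
    template it, e.g. conf="{{ ti.xcom_pull(task_ids='prepare_conf') }}")."""
    return xcom["prepare_conf"]

prepare_conf({"message": "hello", "day": "monday"})
print(trigger_step())  # {'message': 'HELLO', 'day': 'monday'}
```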
-
raman over 2 years
@taari, I tried setting the dag_run_obj in a PythonOperator but it didn't work for me. Can you share the snippet?