Providing context in TriggerDagRunOperator


Solution 1

In Airflow 2.0.x, the equivalent of @efbbrown's answer is:

from airflow.operators.trigger_dagrun import TriggerDagRunOperator

trigger_step = TriggerDagRunOperator(
    task_id="trigger_modelling",
    trigger_dag_id="Dummy_Modelling",
    conf={
        "message": "{{ dag_run.conf['message'] }}",
        "day": "{{ dag_run.conf['day'] }}",
    },
    dag=dag
)

The pull request is described here on GitHub.

See the documentation for external-triggers and for trigger_dagrun.

Here is a YouTube video on the topic that shows the correct imports.
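
As a rough sketch of the receiving side, the triggered DAG can read the forwarded values from dag_run.conf inside a PythonOperator callable. This assumes Airflow 2.x, where the context arrives as keyword arguments without provide_context. The make_task helper and the stand-in dag_run object are illustrative names and are not part of the original answer.

```python
from types import SimpleNamespace


def run_this_func(**context):
    # dag_run.conf holds whatever the triggering DAG passed as conf;
    # it can be None when the DAG is started without any conf.
    conf = context["dag_run"].conf or {}
    text = "Remotely received value of {} for message and {} for day".format(
        conf.get("message"), conf.get("day")
    )
    print(text)
    return text


def make_task(dag):
    # Hypothetical helper: Airflow is imported lazily so the callable above
    # can be exercised without an Airflow installation.
    from airflow.operators.python import PythonOperator

    return PythonOperator(task_id="run_this", python_callable=run_this_func, dag=dag)


# Local check with a stand-in for the real DagRun object.
fake_run = SimpleNamespace(conf={"message": "hello", "day": "Monday"})
received = run_this_func(dag_run=fake_run)
```

Using conf.get rather than indexing is a design choice for the sketch: it avoids a KeyError when the DAG is run manually without the expected keys.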

Solution 2

Solved:

The dag_run object is stored in the context, so the configuration variables can be accessed in the python_callable of the TriggerDagRunOperator (Airflow 1.x) with this pattern:

def trigger(context, dag_run_obj):
    dag_run_obj.payload = {
        "message": context["dag_run"].conf["message"],
        "day": context["dag_run"].conf["day"]
    }
    return dag_run_obj

trigger_step = TriggerDagRunOperator(
    task_id="trigger_modelling",
    trigger_dag_id="Dummy_Modelling",
    python_callable=trigger,
    dag=dag
)
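
The traceback in the question shows the operator calling self.python_callable(context, dro), that is, with two positional arguments and no keyword arguments. The sketch below imitates that call with plain stand-in objects (fake_context and SimpleNamespace are assumptions for illustration, not Airflow classes) to show why reading from context works while reading from kwargs raises KeyError.

```python
from types import SimpleNamespace


def trigger(context, dag_run_obj):
    # Working pattern: dag_run lives in the context dictionary.
    dag_run_obj.payload = {
        "message": context["dag_run"].conf["message"],
        "day": context["dag_run"].conf["day"],
    }
    return dag_run_obj


def trigger_with_kwargs(context, dag_run_obj, **kwargs):
    # Failing pattern from the question: kwargs stays empty because the
    # caller passes only the two positional arguments.
    dag_run_obj.payload = {"message": kwargs["dag_run"].conf["message"]}
    return dag_run_obj


# Stand-ins for the Airflow context and the DagRunOrder object.
fake_context = {"dag_run": SimpleNamespace(conf={"message": "hello", "day": "Monday"})}

result = trigger(fake_context, SimpleNamespace(payload=None))
print(result.payload)

failure = None
try:
    trigger_with_kwargs(fake_context, SimpleNamespace(payload=None))
except KeyError as err:
    failure = repr(err)
    print("Failed with", failure)
```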


Author: efbbrown

Updated on November 01, 2022

Comments

  • efbbrown
    efbbrown over 1 year

    I have a dag that has been triggered by another dag. I have passed through to this dag some configuration variables via the DagRunOrder().payload dictionary in the same way the official example has done.

    Now in this dag I have another TriggerDagRunOperator to start a second dag and would like to pass those same configuration variables through.

    I have successfully accessed the payload variables in a PythonOperator like so:

    def run_this_func(ds, **kwargs):
        print("Remotely received value of {} for message and {} for day".format(
            kwargs["dag_run"].conf["message"], kwargs["dag_run"].conf["day"])
        )
    
    run_this = PythonOperator(
        task_id='run_this',
        provide_context=True,
        python_callable=run_this_func,
        dag=dag
    )
    

    But the same pattern does not work in the TriggerDagRunOperator:

    def trigger(context, dag_run_obj, **kwargs):
        dag_run_obj.payload = {
            "message": kwargs["dag_run"].conf["message"],
            "day": kwargs["dag_run"].conf["day"]
        }
        return dag_run_obj
    
    trigger_step = TriggerDagRunOperator(
        task_id="trigger_modelling",
        trigger_dag_id="Dummy_Modelling",
        provide_context=True,
        python_callable=trigger,
        dag=dag
    )
    

    It yields a warning regarding the use of provide_context:

    INFO - Subtask: /usr/local/lib/python2.7/dist-packages/airflow/models.py:1927: PendingDeprecationWarning: Invalid arguments were passed to TriggerDagRunOperator. Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were:
    INFO - Subtask: *args: ()
    INFO - Subtask: **kwargs: {'provide_context': True}
    INFO - Subtask:   category=PendingDeprecationWarning
    

    And this error, suggesting I haven't passed the conf:

    INFO - Subtask: Traceback (most recent call last):
    INFO - Subtask:   File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1374, in run
    INFO - Subtask:     result = task_copy.execute(context=context)
    INFO - Subtask:   File "/usr/local/lib/python2.7/dist-packages/airflow/operators/dagrun_operator.py", line 64, in execute
    INFO - Subtask:     dro = self.python_callable(context, dro)
    INFO - Subtask:   File "/home/user/airflow/dags/dummy_responses.py", line 28, in trigger
    INFO - Subtask:     "message": kwargs["dag_run"].conf["message"],
    INFO - Subtask: KeyError: 'dag_run'
    

    A second pattern that I've tried which also hasn't worked is using the params argument like so:

    def trigger(context, dag_run_obj):
        dag_run_obj.payload = {
            "message": context['params']['message'],
            "day": context['params']['day']
        }
        return dag_run_obj
    
    trigger_step = TriggerDagRunOperator(
        task_id="trigger_modelling",
        trigger_dag_id="Dummy_Modelling",
        python_callable=trigger,
        params={
            "message": "{{ dag_run.conf['message'] }}",
            "day": "{{ dag_run.conf['day'] }}"
        },
        dag=dag
    )
    

    This pattern does not yield an error, but it passes the parameters through to the next dag as literal strings, i.e. it doesn't evaluate the expressions.


    How can I access the configuration variables in the TriggerDagRunOperator of the second dag?

  • CodingInCircles
    CodingInCircles almost 6 years
    How did you end up reading the passed parameters in the triggered DAG? I need to do the same, and while I can confirm the parameter I want to pass is part of the dag run object's payload, I can't seem to "read" it in the triggered DAG.
  • efbbrown
    efbbrown almost 6 years
    I could access the parameters in my triggered dag with "{{ dag_run.conf['message'] }}" and "{{ dag_run.conf['day'] }}". This relies on the fields in the operator through which you are trying to read the parameters being template_fields. If the "{{ dag_run.conf['day'] }}" pattern doesn't work for you because the fields aren't template_fields, you will be able to extend the operator class which you are using to make those fields template_fields. Let me know if this doesn't make sense and I will include it in my answer.
  • Sammy J
    Sammy J over 4 years
    Hi, welcome to Stack Overflow. Can you add a code sample to make this easier to understand?
  • Malgi
    Malgi about 3 years
    @efbbrown this solution is not working in Airflow v2.0.1; I'm getting this error: Invalid arguments were passed to TriggerDagRunOperator. Do you know how we can pass context in TriggerDagRunOperator in Airflow version 2?
  • efbbrown
    efbbrown about 3 years
    Thank you @Sawan Vaidya
  • mathee
    mathee almost 3 years
    Hi, is it not possible to create the dag_run_obj payload or the conf parameter dynamically when triggering the TriggerDagRunOperator? This used to be possible before the python_callable parameter was deprecated in Airflow 2.0.
  • taari
    taari almost 3 years
    @mathee, yes, it appears that you can't do that using TriggerDagRunOperator. I solved that by adding an extra PythonOperator right before I called the TriggerDagRunOperator, and I set up the dag_run_obj there.
  • raman
    raman over 2 years
    @taari, I tried setting the dag_run_obj in a PythonOperator, but it didn't work for me. Can you share the snippet?
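
The comment thread never shows the PythonOperator workaround, so the following is only one possible reading of it, not @taari's actual code. It assumes Airflow 2.x, where conf is a templated field of TriggerDagRunOperator (as Solution 1 relies on). An upstream task builds the dictionary and returns it, which pushes it to XCom, and the trigger task pulls the values back through templates. The names build_conf and make_tasks are hypothetical.

```python
from types import SimpleNamespace


def build_conf(**context):
    # Any Python logic can run here; the returned dict is pushed to XCom
    # as the task's return value.
    upstream = context["dag_run"].conf or {}
    return {"message": upstream.get("message"), "day": upstream.get("day")}


def make_tasks(dag):
    # Hypothetical helper: Airflow is imported lazily so build_conf can be
    # checked without an Airflow installation.
    from airflow.operators.python import PythonOperator
    from airflow.operators.trigger_dagrun import TriggerDagRunOperator

    build = PythonOperator(task_id="build_conf", python_callable=build_conf, dag=dag)
    trigger_step = TriggerDagRunOperator(
        task_id="trigger_modelling",
        trigger_dag_id="Dummy_Modelling",
        # conf is rendered at run time, so the XCom value is available here.
        conf={
            "message": "{{ ti.xcom_pull(task_ids='build_conf')['message'] }}",
            "day": "{{ ti.xcom_pull(task_ids='build_conf')['day'] }}",
        },
        dag=dag,
    )
    build >> trigger_step
    return build, trigger_step


# Local check of the callable with a stand-in DagRun object.
built = build_conf(dag_run=SimpleNamespace(conf={"message": "hello", "day": "Monday"}))
```

With default settings the templated values reach the triggered DAG as strings, so anything that is not a string would need converting on the receiving side.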