How to pass parameters to Airflow on_success_callback and on_failure_callback

Solution 1

You can define a wrapper function inside your DAG file that calls the function from your package and passes the email as an argument. Because the wrapper lives at the DAG level, you can also use it to extract only the information the email needs from the context.

from datetime import datetime

from package import outer_task_success_callback

email = '[email protected]'

def task_success_alert(context):
    # Pull only what the email needs from the context, then delegate to the package.
    dag_id = context['dag'].dag_id
    task_id = context['task_instance'].task_id
    outer_task_success_callback(dag_id, task_id, email)

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 6, 13),
    'on_success_callback': task_success_alert,
    'on_failure_callback': task_failure_alert  # defined the same way as task_success_alert
}

This lets you customize the arguments before you call the function in your package.
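If several DAGs need different addresses, the same wrapper idea can be written as a small factory. The following is only a sketch that runs without Airflow: `make_success_alert`, the stand-in `outer_task_success_callback`, the fake context, and the address are all hypothetical names chosen for illustration, not part of Airflow or of the asker's package.

```python
from types import SimpleNamespace

sent = []  # records calls so the example can be inspected without a mail server

def outer_task_success_callback(dag_id, task_id, email):
    """Hypothetical stand-in for the function that lives in your package."""
    sent.append((dag_id, task_id, email))

def make_success_alert(email):
    """Return a callback with the one-argument signature Airflow expects."""
    def task_success_alert(context):
        dag_id = context['dag'].dag_id
        task_id = context['task_instance'].task_id
        outer_task_success_callback(dag_id, task_id, email)
    return task_success_alert

# Fake context with only the two keys used above; Airflow supplies the real one.
fake_context = {
    'dag': SimpleNamespace(dag_id='example_dag'),
    'task_instance': SimpleNamespace(task_id='step1'),
}

callback = make_success_alert('team@example.com')  # placeholder address
callback(fake_context)
```

In a real DAG you would pass `make_success_alert(...)` as the value of `on_success_callback` and let Airflow build the context.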

As a side note, Airflow has built-in SMTP email functionality. Instead of writing your own solution, you can use that.
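As a rough illustration of the built-in route, the operator arguments `email`, `email_on_failure`, and `email_on_retry` can be set through `default_args`. This sketch assumes Airflow 1.x or 2.x with SMTP already configured in the `[smtp]` section of `airflow.cfg`; the address is a placeholder.

```python
from datetime import datetime

# The address is a placeholder; SMTP settings are assumed to be configured
# in the [smtp] section of airflow.cfg.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 6, 13),
    'email': ['team@example.com'],
    'email_on_failure': True,
    'email_on_retry': False,
}
```

As far as I know there is no matching built-in option for success emails, so a success notification still needs a callback like the ones discussed here.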

Solution 2

You can use functools.partial to bind the extra argument in advance, provided the callback accepts it as a parameter, e.g. task_success_alert(context, email):

from functools import partial
new_task_success_alert = partial(task_success_alert, email='your_email')

And then add the new function as a callback:

on_success_callback=new_task_success_alert
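To see why this works, here is a minimal sketch that can be run without Airflow. It assumes the callback is written to take `email` as a second parameter; the fake context and the returned message are invented for the demonstration, and a real callback would send the email instead of returning a string.

```python
from functools import partial
from types import SimpleNamespace

def task_success_alert(context, email):
    # A real callback would send the email; returning the text keeps this testable.
    ti = context['task_instance']
    return "To {0}: task {1} in DAG {2} succeeded".format(
        email, ti.task_id, ti.dag_id)

# partial fixes the keyword argument, leaving a callable that takes only context.
new_task_success_alert = partial(task_success_alert, email='your_email')

# Fake context for illustration; Airflow passes the real one to the callback.
fake_context = {
    'task_instance': SimpleNamespace(dag_id='example_dag', task_id='step1'),
}
message = new_task_success_alert(fake_context)
```

Airflow calls the callback with the context as its single positional argument, so the bound `email` must be passed by keyword as shown.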

Solution 3

You can create a task whose only purpose is to push configuration settings through XComs. The callback can then pull the configuration via the context, since the task_instance object is included in it.

def push_configuration(ti, params):
    # Push under the same key that the callback pulls below.
    ti.xcom_push(key='params', value=params)

def task_success_alert(context):
    ti = context.get('ti')
    params = ti.xcom_pull(key='params', task_ids='Settings')
    ...


step0 = PythonOperator(
        task_id='Settings',
        python_callable=push_configuration,
        op_kwargs={'params': params})

step1 = BashOperator(
        task_id='step1',
        bash_command='pwd',
        on_success_callback=task_success_alert)

Author: Blessy

Updated on June 18, 2022

Comments

  • Blessy
    Blessy almost 2 years

    I have implemented email alerts on success and failure using on_success_callback and on_failure_callback.

    According to Airflow documentation,

    a context dictionary is passed as a single parameter to this function.

    How can I pass another parameter to these callback methods?

    Here is my code

    from datetime import datetime
    from airflow.utils.email import send_email_smtp
    
    def task_success_alert(context):
        subject = "[Airflow] DAG {0} - Task {1}: Success".format(
            context['task_instance_key_str'].split('__')[0], 
            context['task_instance_key_str'].split('__')[1]
            )
        html_content = """
        DAG: {0}<br>
        Task: {1}<br>
        Succeeded on: {2}
        """.format(
            context['task_instance_key_str'].split('__')[0], 
            context['task_instance_key_str'].split('__')[1], 
            datetime.now()
            )
        send_email_smtp(dag_vars["dev_mailing_list"], subject, html_content)
    
    def task_failure_alert(context):
        subject = "[Airflow] DAG {0} - Task {1}: Failed".format(
            context['task_instance_key_str'].split('__')[0], 
            context['task_instance_key_str'].split('__')[1]
            )
        html_content = """
        DAG: {0}<br>
        Task: {1}<br>
        Failed on: {2}
        """.format(
            context['task_instance_key_str'].split('__')[0], 
            context['task_instance_key_str'].split('__')[1], 
            datetime.now()
            )
        send_email_smtp(dag_vars["dev_mailing_list"], subject, html_content)
    
    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2019, 6, 13),
        'on_success_callback': task_success_alert,
        'on_failure_callback': task_failure_alert
    }
    

    I intend to move the callbacks to another package and pass the email address as a parameter.

    • Akshay Lande
      Akshay Lande almost 4 years
      When I use the context, my tasks finish but the DAG keeps running and never ends. Any suggestions? I am using def task_alert(context): with dag_id = context['dag'].dag_id and task_id = context['task_instance'].task_id, and I am setting on_failure_callback=task_alert.