Airflow - Proper way to handle DAGs callbacks


Solution 1

The messages are sent because you are calling the functions while defining default_args, so they run every time the DAG file is parsed. You need to pass the function itself, without calling it.

Since the function has an argument, it'll get a little trickier. You can either define two partial functions or define two wrapper functions.

So you can either do:

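from functools import partial

# partial() binds the message now but defers the call until the callback fires.
# Airflow passes the context as one positional argument, so slack_message
# must accept it after the message.
success_msg = partial(slack.slack_message, happy_message)
failure_msg = partial(slack.slack_message, sad_message)

default_args = {
    [...]
    'on_failure_callback': failure_msg,
    'on_success_callback': success_msg,
    [...]
}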

or

# Airflow calls each callback with the context, so the wrappers must accept it.
def success_msg(context):
    slack.slack_message(happy_message)

def failure_msg(context):
    slack.slack_message(sad_message)

default_args = {
    [...]
    'on_failure_callback': failure_msg,
    'on_success_callback': success_msg,
    [...]
}

In both versions, note that only the function objects failure_msg and success_msg are passed, not the result of calling them.
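
The difference between calling a function and passing it can be checked without Airflow. The sketch below is a minimal stand-in, assuming a hypothetical slack_message that only records what it was asked to send and a hypothetical run_callback helper that mimics how Airflow invokes a callback with a single context argument. Neither is part of Airflow or of the asker's slack module.

```python
from functools import partial

sent = []  # records every message "posted"; stands in for Slack


def slack_message(text, context=None):
    """Hypothetical stand-in for slack.slack_message; accepts an optional context."""
    sent.append(text)


def run_callback(callback, context):
    """Hypothetical helper: invoke a callback with one context argument."""
    callback(context)


# Wrong: the call runs immediately, while the dict is being built.
eager_args = {"on_success_callback": slack_message("happy (eager)")}
sent_at_definition = list(sent)  # already contains one message

# Right: partial() stores the function and its argument without calling it.
deferred_args = {"on_success_callback": partial(slack_message, "happy (deferred)")}
sent_after_deferred_definition = list(sent)  # unchanged: nothing new was sent

# The message is only sent when the callback is actually invoked.
run_callback(deferred_args["on_success_callback"], {"task_instance": None})
```

In the eager version the dictionary value ends up being the return value of the call (None here), so a message goes out at definition time and no usable callback is registered.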

Solution 2

default_args is applied to every task, so a callback set there becomes a per-task callback.

To get one callback per DAG run, set on_success_callback and on_failure_callback on the DAG itself, outside of "default_args".
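
A sketch of this second approach, assuming Airflow 1.10 or later, where the DAG constructor accepts on_success_callback and on_failure_callback. The callback names, the dag_id, and the owner value are illustrative, and the print calls stand in for the asker's slack.slack_message. The keyword arguments are collected in a plain dict so the layout can be read and checked without an Airflow installation.

```python
def notify_success(context):
    """Hypothetical DAG-level callback; replace print with the Slack call."""
    print("DAG run succeeded")


def notify_failure(context):
    """Hypothetical DAG-level callback; replace print with the Slack call."""
    print("DAG run failed")


# Task-level defaults: no callbacks here, so nothing fires per task.
default_args = {
    "owner": "airflow",  # illustrative value
}

# DAG-level settings: callbacks placed here are tied to the DAG run.
dag_kwargs = {
    "dag_id": "example_dag",  # illustrative value
    "default_args": default_args,
    "on_success_callback": notify_success,
    "on_failure_callback": notify_failure,
    "catchup": False,
}

# With Airflow installed, the DAG would then be created as:
#   from airflow import DAG
#   dag = DAG(**dag_kwargs)
```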

Author: Julinho da Adelaide

Updated on May 30, 2022

Comments

  • Julinho da Adelaide
    Julinho da Adelaide almost 2 years

    I have a DAG, and whenever it succeeds or fails, I want it to trigger a method which posts to Slack.

    My DAG args look like this:

    default_args = {
        [...]
        'on_failure_callback': slack.slack_message(sad_message),
        'on_success_callback': slack.slack_message(happy_message),
        [...]
    }
    

    And the DAG definition itself:

    dag = DAG(
        dag_id = dag_name_id,
        default_args=default_args,
        description='load data from mysql to S3',
        schedule_interval='*/10 * * * *',
        catchup=False
          )
    

    But when I check Slack, there are more than 100 messages each minute, as if the callbacks were being evaluated at every scheduler heartbeat. For every log entry, both the success and the failure method ran, as if the same task instance had both succeeded and failed, which is not right.

    How should I properly use on_failure_callback and on_success_callback to handle DAG statuses and call a custom method?

    • Zack
      Zack almost 6 years
      Duplicate of this stackoverflow.com/questions/44586356/…? Rather than using on_failure_callback and on_success_callback, why not just make the slack message a task in your DAG as you are requesting a message whether the task is a success/failure.
    • cwurtz
      cwurtz almost 6 years
      Not a duplicate, this question is specifically about usage of success/failure callbacks
  • Julinho da Adelaide
    Julinho da Adelaide almost 6 years
    It worked, but it seems to send a message for each task rather than for each DAG run. My code is currently written to run once per DAG run. I'm thinking of adapting it to run per task, but then I need to retrieve the task_id; is that possible? The other option is to keep it once per DAG run so I don't need to change the code. Is that possible as well? Many thanks.
  • tobi6
    tobi6 almost 6 years
    @JulinhodaAdelaide I think 1.9.0 can only define this on per task level, 1.10.0 will have a DAG based definition available.
  • Julinho da Adelaide
    Julinho da Adelaide almost 6 years
    tobi6, do you know how I can retrieve the task_id?
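
On retrieving the task_id: Airflow passes a context dictionary to each callback, and its task_instance entry carries task_id and dag_id attributes. The sketch below assumes that layout. build_message is a hypothetical helper, and the SimpleNamespace object is a fake task instance used only to exercise the callback outside Airflow.

```python
from types import SimpleNamespace


def build_message(context, status):
    """Hypothetical helper: format a message from the callback context."""
    ti = context["task_instance"]  # a TaskInstance in a real Airflow callback
    return f"{status}: task {ti.task_id} in DAG {ti.dag_id}"


def failure_msg(context):
    """Task-level failure callback; swap print for the Slack call."""
    print(build_message(context, "FAILED"))


# Fake context for trying the callback outside Airflow (illustrative ids).
fake_context = {
    "task_instance": SimpleNamespace(task_id="load_to_s3", dag_id="example_dag")
}
message = build_message(fake_context, "FAILED")
failure_msg(fake_context)
```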