Airflow - Proper way to handle DAGs callbacks
Solution 1
The messages are being created because you are calling the functions when you define default_args. You need to pass the function itself, without calling it.
Since the function takes an argument, this gets a little trickier. You can define either two partial functions or two wrapper functions.
So you can either do:
from functools import partial

# Airflow calls each callback with a context dict, so slack_message
# must accept it as a second positional argument.
success_msg = partial(slack.slack_message, happy_message)
failure_msg = partial(slack.slack_message, sad_message)

default_args = {
    [...]
    'on_failure_callback': failure_msg,
    'on_success_callback': success_msg,
    [...]
}
or
# Airflow passes a context dict to the callback, so accept it here.
def success_msg(context):
    slack.slack_message(happy_message)

def failure_msg(context):
    slack.slack_message(sad_message)

default_args = {
    [...]
    'on_failure_callback': failure_msg,
    'on_success_callback': success_msg,
    [...]
}
In either method, note that only the function objects failure_msg and success_msg are passed, not the results they return when called.
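To see why this matters, here is a minimal sketch that runs without Airflow. Both `slack_message` and `fake_scheduler_run` are hypothetical stand-ins, not Airflow or Slack APIs: the first records messages instead of posting them, and the second imitates the way Airflow invokes a callback with a single context dictionary. The optional `context` parameter on `slack_message` is an assumption about how your own helper is written.

```python
from functools import partial

sent = []  # stands in for the Slack channel


def slack_message(text, context=None):
    """Hypothetical stand-in for slack.slack_message: records instead of posting."""
    sent.append(text)


def fake_scheduler_run(callback, context):
    """Imitates Airflow invoking a callback with one context dict."""
    if callback is not None:
        callback(context)


# Wrong: the call runs right here, while the dict is being built, and its
# return value (None) is what gets stored as the callback.
bad_args = {"on_success_callback": slack_message("happy")}
messages_at_definition = list(sent)

# Right: store a callable; nothing is sent until it is invoked.
sent.clear()
good_args = {"on_success_callback": partial(slack_message, "happy")}
messages_before_run = list(sent)

fake_scheduler_run(good_args["on_success_callback"], {"run_id": "demo"})
messages_after_run = list(sent)
```

In the wrong version a message goes out every time the file defining default_args is evaluated, whether or not anything succeeded or failed, which is consistent with the flood of messages described in the question.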
Solution 2
default_args is applied at the task level, so a callback set there becomes a per-task callback.
To get one callback per DAG run, set the attribute at the DAG level, outside of default_args.
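The difference can be sketched with a toy model. `ToyDag` and `ToyTask` below are hypothetical stand-ins, not Airflow classes; they only imitate the idea that every task inherits the entries of default_args, while a callback given to the DAG itself fires once per run. In Airflow 1.10 and later, the real DAG constructor accepts on_success_callback and on_failure_callback arguments for this purpose.

```python
class ToyTask:
    """Hypothetical stand-in for an operator."""

    def __init__(self, task_id, on_success_callback=None):
        self.task_id = task_id
        self.on_success_callback = on_success_callback


class ToyDag:
    """Hypothetical stand-in for a DAG; not Airflow's implementation."""

    def __init__(self, default_args=None, on_success_callback=None):
        self.default_args = default_args or {}
        self.on_success_callback = on_success_callback
        self.tasks = []

    def add_task(self, task_id):
        # default_args are handed to every task that is created.
        self.tasks.append(ToyTask(task_id, **self.default_args))

    def run(self):
        # Task-level callbacks fire once per task.
        for task in self.tasks:
            if task.on_success_callback:
                task.on_success_callback({"task_id": task.task_id})
        # The DAG-level callback fires once per run.
        if self.on_success_callback:
            self.on_success_callback({"task_id": None})


calls = []


def notify(context):
    calls.append(context["task_id"])


# Callback placed in default_args: one call per task.
per_task = ToyDag(default_args={"on_success_callback": notify})
for name in ("extract", "load"):
    per_task.add_task(name)
per_task.run()
per_task_calls = list(calls)

# Callback placed on the DAG itself: one call per run.
calls.clear()
per_dag = ToyDag(on_success_callback=notify)
for name in ("extract", "load"):
    per_dag.add_task(name)
per_dag.run()
per_dag_calls = list(calls)
```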
Julinho da Adelaide
Updated on May 30, 2022
Comments
-
Julinho da Adelaide almost 2 years
I have a DAG, and whenever it succeeds or fails, I want it to trigger a method which posts to Slack.
My DAG args are like below:
default_args = {
    [...]
    'on_failure_callback': slack.slack_message(sad_message),
    'on_success_callback': slack.slack_message(happy_message),
    [...]
}
And the DAG definition itself:
dag = DAG(
    dag_id=dag_name_id,
    default_args=default_args,
    description='load data from mysql to S3',
    schedule_interval='*/10 * * * *',
    catchup=False
)
But when I check Slack, there are more than 100 messages each minute, as if the callbacks were being evaluated at each scheduler heartbeat. For every log entry it ran both the success and the failure method, as if the same task instance had both worked and not worked (not fine).
How should I properly use on_failure_callback and on_success_callback to handle DAG statuses and call a custom method?
-
Zack almost 6 years
Duplicate of this stackoverflow.com/questions/44586356/…? Rather than using on_failure_callback and on_success_callback, why not just make the Slack message a task in your DAG, since you want a message whether the task succeeds or fails?
-
cwurtz almost 6 years
Not a duplicate; this question is specifically about the usage of success/failure callbacks.
-
Julinho da Adelaide almost 6 years
It worked. But it seems to send a message for each task and not for each DAG run. My code right now is written to run once per DAG run. I'm thinking of adapting it to run per task, but then I need to retrieve the task_id. Is that possible? The other way, I think, is to make it fire once per DAG run so I don't need to change the code. Is this possible as well? Many thanks.
-
tobi6 almost 6 years
@JulinhodaAdelaide I think 1.9.0 can only define this at the per-task level; 1.10.0 will have a DAG-based definition available.
-
Julinho da Adelaide almost 6 years
tobi6, do you know how I can retrieve the task_id?
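On the task_id question: Airflow passes a context dictionary to each callback, and its "task_instance" entry carries the task_id. A minimal sketch follows, assuming nothing else about the asker's code. `build_message` is a hypothetical helper, the task and DAG names are made up, and the SimpleNamespace object only imitates a task instance so the snippet runs without Airflow.

```python
from types import SimpleNamespace


def build_message(context):
    """Hypothetical helper: format a message from the callback context."""
    ti = context["task_instance"]
    return "Task {} in DAG {} failed".format(ti.task_id, ti.dag_id)


def failure_msg(context):
    text = build_message(context)
    # In the real DAG file, slack.slack_message(text) would be called here.
    return text


# Imitation of the context Airflow would pass; names are illustrative only.
fake_context = {
    "task_instance": SimpleNamespace(task_id="load_to_s3", dag_id="mysql_to_s3")
}
message = failure_msg(fake_context)
```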