How to skip tasks on Airflow?
Solution 1
You can incorporate the SkipMixin that the ShortCircuitOperator uses under the hood to skip downstream tasks.
from airflow.models import BaseOperator, SkipMixin
from airflow.utils.decorators import apply_defaults


class mySkippingOperator(BaseOperator, SkipMixin):
    """Skip all downstream tasks when `condition` is falsy."""

    @apply_defaults
    def __init__(self, condition, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.condition = condition

    def execute(self, context):
        if self.condition:
            self.log.info('Proceeding with downstream tasks...')
            return

        self.log.info('Skipping downstream tasks...')
        # Collect every task downstream of this one, direct or indirect.
        downstream_tasks = context['task'].get_flat_relatives(upstream=False)
        self.log.debug("Downstream task_ids %s", downstream_tasks)

        if downstream_tasks:
            self.skip(context['dag_run'], context['ti'].execution_date, downstream_tasks)

        self.log.info("Done.")
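To make it clearer which tasks the operator above ends up skipping, here is a minimal, Airflow-free sketch of what "all downstream tasks" means for the chain in the question. The `flat_downstream` helper and the `graph` dictionary are hypothetical stand-ins written for illustration; they are not part of Airflow and only approximate the idea behind `get_flat_relatives(upstream=False)`.

```python
from collections import deque


def flat_downstream(graph, task_id):
    """Return every task reachable downstream of task_id, excluding itself."""
    found = []
    queue = deque(graph.get(task_id, []))
    while queue:
        current = queue.popleft()
        if current in found:
            continue  # already visited via another path
        found.append(current)
        queue.extend(graph.get(current, []))
    return found


# Hypothetical adjacency map for the DAG in the question:
# task1 > task2 > task3 > task4
graph = {
    "task1": ["task2"],
    "task2": ["task3"],
    "task3": ["task4"],
    "task4": [],
}

# If the skipping operator sat at task2, task3 and task4 would be skipped.
to_skip = flat_downstream(graph, "task2")
```

Note that this only skips forward from the operator; it does not by itself let a run start at an arbitrary task.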
Solution 2
Yes, you can do this on an ad-hoc basis. Found it somehow!
You need to raise AirflowSkipException from the task's callable:
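from airflow.exceptions import AirflowSkipException
from airflow.operators.python import PythonOperator  # Airflow 1.10: airflow.operators.python_operator

def execute():
    # `condition` and `some_dag` below are placeholders from the original answer.
    if condition:
        raise AirflowSkipException

task = PythonOperator(task_id='task', python_callable=execute, dag=some_dag)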
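One way the `condition` above could be made ad-hoc is to decide it per run from the configuration passed when the DAG is triggered manually. The sketch below shows only the decision logic, so it runs without Airflow installed. The `should_skip` helper, the `TASK_ORDER` list and the `start_from` key are assumptions made up for this example, not Airflow features.

```python
# Hypothetical linear order of the DAG in the question.
TASK_ORDER = ["task1", "task2", "task3", "task4"]


def should_skip(task_id, task_order, start_from=None):
    """Return True when task_id comes before start_from in task_order."""
    if start_from is None:
        return False  # no ad-hoc start point requested: run everything
    if task_id not in task_order or start_from not in task_order:
        raise ValueError("unknown task id: %r or %r" % (task_id, start_from))
    return task_order.index(task_id) < task_order.index(start_from)


# Inside each task's callable one might then write (assumption, not run here):
#   conf = context["dag_run"].conf or {}
#   if should_skip(context["task"].task_id, TASK_ORDER, conf.get("start_from")):
#       raise AirflowSkipException
```

Keep in mind that with the default trigger rule a task whose upstream was skipped is, as far as I know, skipped as well, so the trigger rules of the downstream tasks would likely need adjusting for this to start the run at task3.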
Solution 3
Yes, you just click on task 3. Toggle the check boxes to the right of the run button to ignore dependencies, then click run.
Author by
Maayan
Updated on July 18, 2022
Comments
-
Maayan almost 2 years
I'm trying to understand whether Airflow supports skipping tasks in a DAG for ad-hoc executions?
Let's say my DAG graph looks like this: task1 > task2 > task3 > task4
And I would like to start my DAG manually from task3. What is the best way of doing that?
I've read about ShortCircuitOperator, but I'm looking for a more ad-hoc solution that can be applied once the execution is triggered. Thanks!
-
Maayan over 5 years
Thanks! But I was looking for something more ad-hoc: the ability to start from any task on any DAG, no matter what the upstream dependencies were and whether they were met.
-
Ben Gregory over 5 years
Not sure of a way to do that. You could set task3 to run regardless of the outcome of task1 or task2, or add a branchOperator earlier to determine which to run, but by default all tasks are going to run in an execution in the order that the graph indicates.
-
Maayan over 5 years
Thanks! Let's say that I'm only talking about manual triggering, without any scheduling.
-
Tameem over 5 years
Then why schedule in the first place?
-
Maayan over 5 years
Airflow provides good flow management, not only scheduling. We are mainly interested in that part: dependency graphs, parallelism and so on.
-
Maayan over 5 years
I'm part of a big organization, and Airflow is already there. It provides most of the functionality we need, except for what I was asking in my question.
-
Tameem over 5 years
You are contradicting your own statements. When a downstream task depends on an upstream task, you cannot start the downstream task until there is a status update in the upstream. As of now, the best thing you can use is the custom skipping operator described by @Ben Gregory. As I mentioned before, you cannot start from any task in between; that is what dependency stands for. For a task to start, there must be a status update for all upstream tasks, unless it is the first task.
-
y2k-shubham over 4 years
We now have LatestOnlyOperator, which bypasses (to some extent) this limitation: "...You cannot start task execution from any task in between..."
-
Thomas R over 4 years
I tried this, and it does not skip the tasks; it only causes task3 and task2 to run directly without waiting. You would also have to comment out the tasks.