Airflow dynamic tasks at runtime

Solution 1

It is not possible to modify the DAG during its execution (without a lot more work).

The dag = DAG(... definition is picked up in a loop by the scheduler, and it contains the task instance 'python_operator'. That task instance gets scheduled into a DAG run and executed by a worker or executor. Since DAG models in the Airflow DB are only updated by the scheduler, the dummy tasks added at run time will not be persisted to the DAG nor scheduled to run; they will be forgotten when the worker exits. Unless you copy all the scheduler code for persisting and updating the model… but even that will be undone the next time the scheduler visits the DAG file for parsing, which could happen once a minute, once a second, or faster, depending on how many other DAG files there are to parse.

Airflow actually wants each DAG to keep approximately the same layout between runs, and it also wants to reload/parse DAG files constantly. So although you could make a DAG file that determines its tasks dynamically on each parse from some external data (preferably cached in a file or pyc module, not network I/O like a DB lookup, or you'll slow down the whole scheduling loop for all the DAGs), it's not a good plan: your graph and tree views will get confusing, and the scheduler's parsing will be taxed by that lookup.
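If you do go down that road anyway, a minimal sketch of the parse-time approach could look like the following, assuming the task list is cached in a cheap-to-read local JSON file (the file path and task ids here are made up for the example):

import json
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

dag = DAG('dynamic_at_parse_time', schedule_interval='0 0 * * *',
          start_date=datetime(2018, 1, 1), catchup=False)

# Re-read on every scheduler parse, so keep it a cheap local read.
with open('/path/to/task_list.json') as f:   # hypothetical cached file
    names = json.load(f)                     # e.g. ["a", "b", "c"]

previous = None
for name in names:
    t = DummyOperator(task_id='dyn_%s' % name, dag=dag)
    if previous is not None:
        previous >> t
    previous = t

Note that if the file's contents change between runs, the graph and tree views will show the current shape, not the shape a past run actually had.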

You could make the callable run each task…

from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

def make_tasks(**context):
    # These operators are created and executed inline in the worker
    # process; they are never registered with the scheduler.
    du1 = DummyOperator(task_id='dummy1', dag=dag)
    du2 = DummyOperator(task_id='dummy2', dag=dag)
    du3 = DummyOperator(task_id='dummy3', dag=dag)
    du1.execute(context)
    du2.execute(context)
    du3.execute(context)

p = PythonOperator(
    task_id='python_operator',
    python_callable=make_tasks,
    provide_context=True,
    dag=dag)  # dag as defined in the question

But that's sequential; you have to work out how to make them run in parallel in Python (futures?), and if any of them raises an exception the whole task fails. It is also bound to one executor or worker, so it doesn't use Airflow's task distribution (Kubernetes, Mesos, Celery).
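If you did want to try the futures route, a minimal sketch of the idea, still confined to a single worker and with the same all-or-nothing failure mode (pool size and task ids are illustrative):

from concurrent.futures import ThreadPoolExecutor

def make_tasks(**context):
    # DummyOperator and dag as in the snippet above.
    ops = [DummyOperator(task_id='dummy%d' % i, dag=dag) for i in (1, 2, 3)]
    with ThreadPoolExecutor(max_workers=len(ops)) as pool:
        futures = [pool.submit(op.execute, context) for op in ops]
        for f in futures:
            f.result()  # re-raises any exception, failing the whole task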

The other way to work with this is to add a fixed number of tasks (the maximal number) and use the callable(s) to short-circuit the unneeded tasks, or to push arguments to each of them with XCom, changing their behavior at run time but not changing the DAG.
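As a sketch of that fixed-maximum pattern, here a planner task pushes the per-run work via XCom, and a fixed set of slot tasks each decide at run time whether they have anything to do (MAX_TASKS, the task ids, and the payload are made up for the example; dag as defined elsewhere in the file):

from airflow.operators.python_operator import PythonOperator

MAX_TASKS = 3  # the most work items we ever expect in one run

def plan(**context):
    # Decide at run time what there is to do and publish it for the slots.
    work = [{'file': 'part-%d.csv' % i} for i in range(2)]  # may be < MAX_TASKS
    context['ti'].xcom_push(key='work', value=work)

def run_slot(slot, **context):
    work = context['ti'].xcom_pull(task_ids='plan', key='work')
    if slot >= len(work):
        return  # nothing for this slot on this run
    print('processing', work[slot])

planner = PythonOperator(task_id='plan', python_callable=plan,
                         provide_context=True, dag=dag)
for i in range(MAX_TASKS):
    slot = PythonOperator(task_id='slot_%d' % i,
                          python_callable=run_slot,
                          op_kwargs={'slot': i},
                          provide_context=True, dag=dag)
    planner >> slot

Since the DAG shape never changes, the graph and tree views stay stable between runs; the unused slots simply succeed as no-ops.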

Solution 2

Regarding your code sample: you never call the function that would register your tasks in your DAG.

To get a kind of dynamic tasks, you can have a single operator which does something different depending on some state, or you can have a handful of operators that can be skipped depending on the state, with a ShortCircuitOperator.
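For example, a minimal sketch of the skip approach (the condition and task ids are made up; when the callable returns False, everything downstream is skipped for that run):

from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import ShortCircuitOperator

def work_needed(**context):
    # e.g. inspect XCom, a file, or the execution date
    return context['execution_date'].weekday() < 5  # weekdays only

check = ShortCircuitOperator(task_id='work_needed',
                             python_callable=work_needed,
                             provide_context=True,
                             dag=dag)  # dag as defined in your DAG file
optional = DummyOperator(task_id='optional_work', dag=dag)
check >> optional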

Solution 3

I appreciate all the work everybody has done here, as I have the same challenge of creating dynamically structured DAGs. I have made enough mistakes not to use software against its design. If I can't inspect the whole run in the UI, zoom in and out, and generally use Airflow's features, which are the main reason I use it anyway, then I might as well just write multiprocessing code inside a function and be done with it.

That all being said, my solution is to use a resource manager (such as Redis with locking) and have one DAG that writes to this resource manager with data about what to run and how to run it, and another DAG or DAGs that run at certain intervals, polling the resource manager, locking items before running them and removing them at finish. This way I at least use Airflow as expected, even though its design doesn't exactly meet my needs, and I break the problem down into more definable chunks.

The other solutions are creative, but they go against the design and are not tested by the developers, who specifically say to have fixed, structured workflows. I cannot put in workaround code that is untested and against the design unless I rewrite core Airflow code and test it myself. I understand my solution brings complexity with locking and all that, but at least I know its boundaries.
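To sketch the idea, assuming a local Redis and the redis Python client (the key names and payloads are made up, and a production version would need real lock/lease semantics rather than a bare pop):

import json
import redis

r = redis.Redis()

def enqueue_work(**context):
    # Callable for the producer DAG: record what to run and how.
    for item in ({'job': 'a'}, {'job': 'b'}):
        r.rpush('work_queue', json.dumps(item))

def consume_work(**context):
    # Callable for the consumer DAG, run on an interval. LPOP is atomic,
    # so a concurrent run cannot grab the same item; popping it doubles
    # as taking the lock and removing the item at finish.
    while True:
        raw = r.lpop('work_queue')
        if raw is None:
            break
        item = json.loads(raw)
        print('running', item)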


Comments

  • Kirk Broadhurst, almost 2 years

    Other questions about 'dynamic tasks' seem to address dynamic construction of a DAG at schedule or design time. I'm interested in dynamically adding tasks to a DAG during execution.

    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.operators.python_operator import PythonOperator
    from datetime import datetime
    
    dag = DAG('test_dag', description='a test',
              schedule_interval='0 0 * * *',
              start_date=datetime(2018, 1, 1),
              catchup=False)
    
    def make_tasks():
        du1 = DummyOperator(task_id='dummy1', dag=dag)
        du2 = DummyOperator(task_id='dummy2', dag=dag)
        du3 = DummyOperator(task_id='dummy3', dag=dag)
        du1 >> du2 >> du3
    
    p = PythonOperator(
        task_id='python_operator',
        dag=dag,
        python_callable=make_tasks)
    

    This naive implementation doesn't seem to work - the dummy tasks never show up in the UI.

    What's the correct way to add new operators to the DAG during execution? Is it possible?

  • Kirk Broadhurst, about 6 years
    It is listed as the python_callable on python_operator, and it will be called during the execution of the task. The point of the question is whether this execution pattern is possible. A lot of the 'dynamic' DAGs that I have seen suggested are really just decision-based execution rather than truly dynamic.
  • Antoine Augusti, about 6 years
    I would be in favour of decision-based execution, because if your DAG is truly dynamic, it's hard to inspect previous runs in the web UI: the list of tasks shown reflects the DAG's current definition, not what it was at the time of the execution.
  • Kirk Broadhurst, about 6 years
    That's part of my concern. This just seems like something outside Airflow's wheelhouse, so I want to see if anyone has done it.