How to set up multiple DAG directories in Airflow

Solution 1

You can use Airflow's packaged DAG feature to keep a separate DAG folder for each project. You only need to place a zip of each project in your parent DAG folder.

This way each project's DAGs are bundled with their dependencies, and the parent DAG folder stays clean because it contains only one zip per project.

You can create a zip that looks like this:

my_dag1.py
my_dag2.py
package1/__init__.py
package1/functions.py

And your parent dag folder can look something like this:

project1.zip
project2.zip
my_dag3.py
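
The packaging step above can be scripted. The following is a minimal sketch using only the standard library; the helper name build_packaged_dag and the temporary directories are hypothetical, and the file names simply mirror the listing above. It assumes the DAG files must sit at the root of the archive, next to the packages they import.

```python
import tempfile
import zipfile
from pathlib import Path


def build_packaged_dag(project_dir: Path, zip_path: Path) -> list[str]:
    """Zip a project so its DAG files sit at the archive root next to their packages."""
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for path in sorted(project_dir.rglob("*.py")):
            # Store paths relative to the project root, e.g. package1/functions.py.
            archive.write(path, path.relative_to(project_dir).as_posix())
        return archive.namelist()


# Hypothetical project tree mirroring the listing above.
workdir = Path(tempfile.mkdtemp())
project = workdir / "project1"
(project / "package1").mkdir(parents=True)
for name in ("my_dag1.py", "my_dag2.py", "package1/__init__.py", "package1/functions.py"):
    (project / name).write_text("# placeholder\n")

# Stand-in for the parent DAG folder; in a real setup this would be DAGS_FOLDER.
dags_folder = workdir / "dags"
dags_folder.mkdir()
names = build_packaged_dag(project, dags_folder / "project1.zip")
print(names)
```

Running one such call per project would leave the parent DAG folder with one zip per project, as in the second listing.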

Solution 2

Same problem here.

Imports work in the Airflow context because the DAG_FOLDER is added to the PYTHONPATH. Adding an __init__.py to project1/ doesn't change anything.

A good solution could be to use relative imports, such as

from .mycalculator import *

But relative imports do not work right now because of how Airflow imports DAGs (as an Airflow developer explained to me).

So for me, the simplest solution was to keep the DAG files at the root, prefixing them with 'project1_' or 'project2_', and to put libraries such as mycalculator in subfolders.
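
As a rough illustration of that flat layout, the sketch below builds it in a temporary directory and loads the DAG file by path. The names project1_main.py, project1_lib, and add are hypothetical, and importlib is used here only as a stand-in for Airflow's own loader; the one assumption carried over from above is that the DAG folder is on sys.path.

```python
import importlib.util
import sys
import tempfile
from pathlib import Path

# Hypothetical layout: prefixed DAG files at the root, helper code in a subfolder.
dags_folder = Path(tempfile.mkdtemp())
lib_dir = dags_folder / "project1_lib"
lib_dir.mkdir()
(lib_dir / "__init__.py").write_text("")
(lib_dir / "mycalculator.py").write_text("def add(a, b):\n    return a + b\n")
(dags_folder / "project1_main.py").write_text(
    "from project1_lib.mycalculator import add\n"
    "RESULT = add(2, 3)\n"
)

# Emulate the DAG folder being on the import path.
sys.path.insert(0, str(dags_folder))

# Load the DAG file by its path as a top-level module (stand-in for Airflow's loader).
spec = importlib.util.spec_from_file_location(
    "project1_main", dags_folder / "project1_main.py"
)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
print(module.RESULT)
```

Because the import is absolute and starts at the DAG folder, it does not depend on which directory the DAG file itself lives in.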

Solution 3

The folder /vol/dags/project1/ is missing an __init__.py file.

This file can be empty.

Add this file and then in project2.py you should be able to do:

from project1.mycalculator import *

See here for more info on packages: https://docs.python.org/2/tutorial/modules.html#packages
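
The difference between the bare import and the package-qualified import can be reproduced outside Airflow. This is a sketch under stated assumptions: load_by_path is a hypothetical helper that loads a file as a top-level module (similar in spirit to the imp.load_source call in the question's traceback), and only the parent DAG folder is placed on sys.path. The files bare_import.py and package_import.py and the function add are made up for the demonstration.

```python
import importlib.util
import sys
import tempfile
from pathlib import Path


def load_by_path(name: str, path: Path):
    """Load a file as a top-level module, without adding its directory to sys.path."""
    spec = importlib.util.spec_from_file_location(name, path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


dags_folder = Path(tempfile.mkdtemp())
project1 = dags_folder / "project1"
project1.mkdir()
(project1 / "__init__.py").write_text("")  # may be empty
(project1 / "mycalculator.py").write_text("def add(a, b):\n    return a + b\n")
(project1 / "bare_import.py").write_text("from mycalculator import *\n")
(project1 / "package_import.py").write_text(
    "from project1.mycalculator import *\nTOTAL = add(1, 2)\n"
)

# Only the parent DAG folder is importable, not project1/ itself.
sys.path.insert(0, str(dags_folder))

try:
    load_by_path("bare_import", project1 / "bare_import.py")
    bare_import_failed = False
except ImportError:
    # mycalculator is not a top-level module on sys.path.
    bare_import_failed = True

fixed = load_by_path("package_import", project1 / "package_import.py")
print(bare_import_failed, fixed.TOTAL)
```

Note that on Python 3 a directory without __init__.py can still be imported as a namespace package, so the file matters most on Python 2.7, which is what the question's traceback shows.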

Author by

DevEx

Updated on June 26, 2022

Comments

  • DevEx, almost 2 years ago

    I have different Airflow DAGs set up for different Python projects, i.e. one parent DAG folder /vol/dags with a subfolder per project: /vol/dags/project1/project1.py and /vol/dags/project2/project2.py, where DAGS_FOLDER = /vol/dags.

    project1.py, for example, imports a function from another Python file in the same directory, i.e. /vol/dags/project1/mycalculator.py. But when I start the Airflow webserver, I get an ImportError:

    /vol/dags/project1/$ airflow webserver -p 8080
    
    INFO - Filling up the DagBag from /vol/dags/
    ERROR - Failed to import: /vol/dags/project1/project1.py
    Traceback (most recent call last):
      File "/Users/xxx/anaconda/lib/python2.7/site-packages/airflow/models.py", line 247, in process_file
        m = imp.load_source(mod_name, filepath)
      File "/vol/dags/project1/project1.py", line 10, in <module>
        from mycalculator import *
    ImportError: No module named mycalculator
    

    I tried to import mycalculator.py into project1.py like this:

    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from airflow.operators import PythonOperator
    from datetime import datetime, timedelta
    from mycalculator import *
    
    dag = DAG(
        dag_id='project1', default_args=args,
        schedule_interval="@once")