Python script scheduling in Airflow


Solution 1

You should probably use the PythonOperator to call your function. If you want to define the function somewhere else, you can simply import it from a module as long as it's accessible in your PYTHONPATH.

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from my_script import my_python_function

# Minimal default_args; see the tutorial link below for a fuller set.
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2021, 1, 1),
}

dag = DAG('tutorial', default_args=default_args)

my_task = PythonOperator(dag=dag,
                         task_id='my_task_powered_by_python',
                         provide_context=False,
                         python_callable=my_python_function,
                         op_args=['arguments_passed_to_callable'],
                         op_kwargs={'keyword_argument': 'which will be passed to function'})

Suppose your function my_python_function lives in a script file /path/to/my/scripts/dir/my_script.py.
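
For illustration, here is a minimal sketch of what my_script.py could contain; the function signature matches the op_args and op_kwargs used above, and the body is a hypothetical placeholder:

# my_script.py

def my_python_function(positional_arg, keyword_argument=None):
    # Placeholder body; replace with your own extraction/join logic.
    print(positional_arg, keyword_argument)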

Then, before starting Airflow, add your scripts directory to the PYTHONPATH like so:

export PYTHONPATH=/path/to/my/scripts/dir/:$PYTHONPATH
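
If you prefer not to modify the environment, you can instead extend sys.path at the top of the DAG file before the import; a minimal sketch, assuming the same scripts directory as above:

import sys

# Make the scripts directory importable from inside the DAG file
# (an alternative to exporting PYTHONPATH before starting Airflow).
sys.path.insert(0, '/path/to/my/scripts/dir')

from my_script import my_python_function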

More information here: https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html

Default args and other considerations are covered in the tutorial: https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html

Solution 2

You can also use the BashOperator to execute Python scripts in Airflow. You can put your scripts in a folder inside the DAG folder. If your scripts live somewhere else, just give the full path to them.

    from airflow import DAG
    from airflow.operators import BashOperator
    from datetime import datetime, timedelta

    seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
                                      datetime.min.time())

    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': seven_days_ago,
        'email': ['[email protected]'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    }

    dag = DAG('simple', default_args=default_args)

    t1 = BashOperator(
        task_id='testairflow',
        bash_command='python /home/airflow/airflow/dags/scripts/file1.py',
        dag=dag)
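
If your script needs arguments, you can append them to bash_command; Airflow renders the command through Jinja, so macros like {{ ds }} (the execution date) are filled in at run time. A sketch, assuming file1.py reads them from sys.argv:

    t2 = BashOperator(
        task_id='testairflow_with_args',
        # {{ ds }} is a Jinja macro that Airflow renders to the execution
        # date before the shell command runs.
        bash_command='python /home/airflow/airflow/dags/scripts/file1.py {{ ds }}',
        dag=dag)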
Author: Abhishek Pansotra (updated on July 07, 2021)

Comments

  • Abhishek Pansotra, almost 3 years ago

    Hi everyone,

    I need to schedule my Python files (which contain data extraction from SQL and some joins) using Airflow. I have successfully installed Airflow on my Linux server, and the Airflow webserver is available to me. But even after going through the documentation, I am not clear where exactly I need to write the script for scheduling, and how that script will be picked up by the Airflow webserver so I can see its status.

    As far as the configuration is concerned, I know where the dag folder is located in my home directory, and also where the example dags are located.

    Note: Please don't mark this as a duplicate of How to run bash script file in Airflow, as I need to run Python files lying in a different location.

    Please find the configuration in the Airflow webserver: [screenshot]

    Below is a screenshot of the dag folder in the AIRFLOW_HOME dir: [screenshot]

    Also see the screenshots of the DAG creation and of the Missing DAG error: [screenshots]

    After I select the simple DAG, the following missing-DAG error is shown: [screenshot]