Airflow not scheduling correctly (Python)


Solution 1

For Code 2, I guess the reason it runs every minute is:

  1. The start time is 2015-10-13 00:00.

  2. The schedule interval is 5 minutes.

  3. On every scheduler heartbeat (5 seconds by default), your DAG is checked:

    • First check: is start date (no last execution date has been found yet) + schedule interval < current time? If yes, the DAG is executed and the last execution time is recorded (e.g. is 2015-10-13 00:00 + 5 min < current time?).
    • Second check, on the next heartbeat: is last execution time + schedule interval < current time? If so, the DAG is executed again.
    • ...
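The heartbeat checks above can be sketched as a small simulation. This is a simplified model under stated assumptions, not Airflow's actual scheduler code: the function name `simulate_scheduler` and its parameters are hypothetical, and a run is treated as due once a full schedule interval has elapsed after its execution date.

```python
from datetime import datetime, timedelta


def simulate_scheduler(start_date, schedule_interval, now, heartbeats,
                       heartbeat=timedelta(seconds=5)):
    """Return (wall_clock_time, execution_date) pairs for the runs triggered.

    Hypothetical, simplified model of the checks described above.
    """
    runs = []
    last_execution = None
    for _ in range(heartbeats):
        # The first candidate is start_date; afterwards it is the last
        # execution date plus one schedule interval.
        if last_execution is None:
            candidate = start_date
        else:
            candidate = last_execution + schedule_interval
        # Assumption: a run is due once its whole interval lies in the past.
        if candidate + schedule_interval < now:
            runs.append((now, candidate))
            last_execution = candidate
        now += heartbeat
    return runs


# A start date far in the past: every heartbeat triggers another run.
runs = simulate_scheduler(datetime(2015, 10, 13), timedelta(minutes=5),
                          datetime(2015, 11, 9, 14, 50), heartbeats=4)
for wall_clock, execution_date in runs:
    print(wall_clock.time(), execution_date.isoformat())
```

In this model the execution dates advance by five minutes per run while the wall clock advances by only one heartbeat, which is the catch-up behaviour the list describes.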

The solution is to set the DAG's start_date to datetime.now() - schedule_interval.
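A minimal sketch of that suggestion, using only the standard library so it runs without Airflow installed. The helper `recent_start_date` and the `dag_kwargs` dictionary are hypothetical names for illustration; the keyword names mirror those used in the question's Code 2.

```python
from datetime import datetime, timedelta


def recent_start_date(schedule_interval, now=None):
    """Return a start date exactly one schedule interval before `now`."""
    now = datetime.now() if now is None else now
    return now - schedule_interval


schedule_interval = timedelta(minutes=5)
start_date = recent_start_date(schedule_interval)

# Keyword arguments as they might be passed to the DAG constructor,
# e.g. DAG('testing', **dag_kwargs) as in Code 2 (not executed here).
dag_kwargs = {
    'schedule_interval': schedule_interval,
    'default_args': {'owner': 'xyz', 'start_date': start_date},
}
print(dag_kwargs['default_args']['start_date'])
```

With the start date only one interval in the past, there is no long run of past intervals for the scheduler to catch up on.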

If you also want to debug:

  1. Set LOGGINGLEVEL to debug in settings.py.

  2. Modify the is_queueable() method of the airflow.models.TaskInstance class as follows:

def is_queueable(self, flag_upstream_failed=False):
    logging.debug('Checking whether task instance is queueable or not!')
    # Too early: a full schedule interval has not yet elapsed since execution_date.
    if self.execution_date > datetime.now() - self.task.schedule_interval:
        logging.debug(
            'Too early to execute: execution_date {0} + task.schedule_interval {1} '
            '> datetime.now() {2}'.format(
                self.execution_date, self.task.schedule_interval, datetime.now()))
        return False
    ...
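To experiment with that condition outside Airflow, the same comparison can be pulled into a standalone function. This is only a sketch: `too_early` is a hypothetical helper, not part of Airflow, and it takes the current time as a parameter so the result is reproducible.

```python
import logging
from datetime import datetime, timedelta

logging.basicConfig(level=logging.DEBUG)


def too_early(execution_date, schedule_interval, now):
    """Return True if a full schedule interval has not yet elapsed."""
    if execution_date > now - schedule_interval:
        logging.debug('Too early to execute: execution_date %s + interval %s > now %s',
                      execution_date, schedule_interval, now)
        return True
    return False


now = datetime(2015, 11, 9, 14, 52)
# An execution date two minutes ago is still inside its 5-minute interval.
print(too_early(datetime(2015, 11, 9, 14, 50), timedelta(minutes=5), now))
# An execution date weeks in the past is not.
print(too_early(datetime(2015, 10, 13), timedelta(minutes=5), now))
```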

Solution 2

Because the start time (2015-10-13 00:00) is earlier than the current time, it triggers an Airflow backfill. Beginning at 2015-10-13 00:00 (the Start Date), the scheduler launches another run every few seconds, each time it checks the DAG, while the Execution Dates of successive runs are 5 minutes apart (the task interval).

See the log names:

$ tree airflow/logs/testing/
testing/
|-- Orders10
|   |-- 2015-10-13T00:00:00
|   |-- 2015-10-13T00:05:00
|   `-- 2015-10-13T00:10:00
|-- Orders11
|   |-- 2015-10-13T00:00:00
|   |-- 2015-10-13T00:05:00
|   `-- 2015-10-13T00:10:00
|-- Orders12
|   |-- 2015-10-13T00:00:00
|   |-- 2015-10-13T00:05:00
|   `-- 2015-10-13T00:10:00
|-- Orders13
|   |-- 2015-10-13T00:00:00
|   |-- 2015-10-13T00:05:00
|   `-- 2015-10-13T00:10:00
|-- Orders14
|   |-- 2015-10-13T00:00:00
|   |-- 2015-10-13T00:05:00
|   `-- 2015-10-13T00:10:00
`-- Start1
    |-- 2015-10-13T00:00:00
    |-- 2015-10-13T00:05:00
    |-- 2015-10-13T00:10:00
    `-- 2015-10-13T00:15:00

See the creation times of the logs:

$ ll airflow/logs/testing/Start1
-rw-rw-r-- 1 admin admin 4192 Nov  9 14:50 2015-10-13T00:00:00
-rw-rw-r-- 1 admin admin 4192 Nov  9 14:50 2015-10-13T00:05:00
-rw-rw-r-- 1 admin admin 4192 Nov  9 14:51 2015-10-13T00:10:00
-rw-rw-r-- 1 admin admin 4192 Nov  9 14:52 2015-10-13T00:15:00
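The log names in the listings above follow directly from the start date and the schedule interval, which can be checked with a short sketch. The function names are hypothetical, and the year 2015 for the "Nov 9" file timestamps is an assumption, since the listing does not show a year.

```python
from datetime import datetime, timedelta


def execution_dates(start_date, schedule_interval, count):
    """Return the first `count` execution dates as ISO strings (the log file names)."""
    return [(start_date + i * schedule_interval).isoformat() for i in range(count)]


def elapsed_intervals(start_date, schedule_interval, now):
    """Count the whole schedule intervals between start_date and now."""
    return (now - start_date) // schedule_interval


start = datetime(2015, 10, 13)
interval = timedelta(minutes=5)

for name in execution_dates(start, interval, 4):
    print(name)

# Assumed wall-clock time of the first log file: 2015-11-09 14:50.
print(elapsed_intervals(start, interval, datetime(2015, 11, 9, 14, 50)))
```

The execution dates advance in 5-minute steps regardless of when the files were actually written, which is why the file creation times are only about a minute apart.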

You can also see the task instances in the web UI:

[Screenshot: Airflow Task Instances]


Author: The6thSense


Updated on September 15, 2022

Comments

  • The6thSense
    The6thSense over 1 year

    Code:

    Python version 2.7.x and Airflow version 1.5.1.

    My DAG script is:

    from airflow import DAG
    from airflow.operators import BashOperator
    from datetime import datetime, timedelta
    
    
    default_args = {
        'owner': 'xyz',
        'depends_on_past': False,
        'start_date': datetime(2015, 10, 13),
        'email': ['[email protected]'],
        'schedule_interval': timedelta(minutes=5),
        'email_on_failure': True,
        'email_on_retry': True,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    }
    dag = DAG('testing', default_args=default_args)
    run_this_first = BashOperator(task_id='Start1', bash_command='date', dag=dag)
    for i in range(5):
        t = BashOperator(task_id="Orders1" + str(i), bash_command='sleep 5', dag=dag)
        t.set_upstream(run_this_first)
    

    From this you can see that I am creating a DAG with 6 tasks: the first task (Start1) runs first, after which the other five tasks start.

    Currently I have set a 5-minute delay between DAG runs.

    It ran perfectly for all six tasks the first time, but after five minutes the DAG was not re-initiated.

    It has been more than 1 hour and the DAG still has not been re-initiated. I really don't know where I am going wrong.

    It would be really nice if someone could point out what is wrong. I tried clearing using airflow testing clear, but the same thing happened: it ran the first instance and then just stood there.

    The only thing the command line shows is Getting all instance for DAG testing

    When I change the position of the schedule_interval, it just runs in parallel without any schedule interval. That is, within 5 minutes 300 or more task instances have completed. There is no 5-minute schedule interval.

    Code 2:

    from airflow import DAG
    from airflow.operators import BashOperator
    from datetime import datetime, timedelta
    
    
    default_args = {
        'owner': 'xyz',
        'depends_on_past': False,
        'start_date': datetime(2015, 10, 13),
        'email': ['[email protected]'],
        'email_on_failure': True,
        'email_on_retry': True,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    }
    dag = DAG('testing', schedule_interval=timedelta(minutes=5), default_args=default_args)  # Schedule here
    run_this_first = BashOperator(task_id='Start1', bash_command='date', dag=dag)
    for i in range(5):
        t = BashOperator(task_id="Orders1" + str(i), bash_command='sleep 5', dag=dag)
        t.set_upstream(run_this_first)
    
  • The6thSense
    The6thSense over 8 years
    So you are saying that it will run every five seconds until the execution date meets the current date and time, after which it will follow the scheduled time interval?
  • Yongyiw
    Yongyiw over 8 years
    Yes, that's what I mean.
  • The6thSense
    The6thSense over 8 years
    Thanks a lot, mate, but I have two doubts: how can I schedule a task starting from this second with a schedule interval of one hour? And can I schedule a job for the future?