Airflow not scheduling correctly (Python)
Solution 1
For Code 2, I guess the reason it runs every minute is:
- The start date is 2015-10-13 00:00.
- The schedule interval is 5 minutes.
- On every scheduler heartbeat (5 seconds by default), your DAG is checked:
  - First check: is start date (no last execution date found yet) + schedule interval < current time? If yes, the DAG is executed and the last execution time is recorded (e.g. 2015-10-13 00:00 + 5 min < current time?).
  - Second check, on the next heartbeat: is last execution time + schedule interval < current time? If so, the DAG is executed again.
  - ...
The solution is to set the DAG's start_date to datetime.now() - schedule_interval.
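The heartbeat checks above can be sketched as a small simulation in plain Python. This is only a toy model, not Airflow's actual scheduler code: the function name simulate_heartbeats and its parameters are made up for illustration, and it assumes that a run for a given execution date is released once that date plus the schedule interval lies in the past.

```python
from datetime import datetime, timedelta


def simulate_heartbeats(start_date, schedule_interval, now, heartbeat, n_heartbeats):
    """Toy model of the check described above (hypothetical, not Airflow code).

    Returns a list of (wall_clock_time, execution_date) pairs, one per run
    that the model would start.
    """
    last_execution = None
    runs = []
    for i in range(n_heartbeats):
        wall_clock = now + heartbeat * i
        # With no previous run, start from start_date; otherwise move on
        # to the execution date that follows the last one.
        if last_execution is None:
            candidate = start_date
        else:
            candidate = last_execution + schedule_interval
        # Assumption: a run is released once its interval has fully passed.
        if candidate + schedule_interval < wall_clock:
            runs.append((wall_clock, candidate))
            last_execution = candidate
    return runs


interval = timedelta(minutes=5)
now = datetime(2015, 11, 9, 14, 50)  # assumed "current time" for the example

# Start date far in the past: every heartbeat releases another run.
old_start = simulate_heartbeats(datetime(2015, 10, 13), interval, now,
                                timedelta(seconds=5), 4)

# Start date set to now - schedule_interval: only one run is released.
recent_start = simulate_heartbeats(now - interval, interval, now,
                                   timedelta(seconds=5), 4)

print(len(old_start), len(recent_start))
```

With the old start date the model starts one run per heartbeat, because every candidate execution date is still far behind the clock. With start_date set to now - schedule_interval there is nothing to catch up on, so the runs fall back to the 5-minute spacing.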
Also, if you want to debug:
- Set LOGGINGLEVEL to debug in settings.py.
- Modify the method is_queueable() of airflow.models.TaskInstance to:
    def is_queueable(self, flag_upstream_failed=False):
        logging.debug('Checking whether task instance is queueable or not!')
        if self.execution_date > datetime.now() - self.task.schedule_interval:
            logging.debug('Too early to execute: execution_date {0} + task.schedule_interval {1} > datetime.now() {2}'.format(self.execution_date, self.task.schedule_interval, datetime.now()))
            return False
        ...
Solution 2
Because the start date (2015-10-13 00:00) is earlier than the current time, it triggers an Airflow backfill. Every few seconds, each time the scheduler checks the DAG, it starts another run, working forward from 2015-10-13 00:00 (the start date), while the execution dates of successive runs are 5 minutes apart (the task interval).
See the log names:
$ tree airflow/logs/testing/
testing/
|-- Orders10
|   |-- 2015-10-13T00:00:00
|   |-- 2015-10-13T00:05:00
|   `-- 2015-10-13T00:10:00
|-- Orders11
|   |-- 2015-10-13T00:00:00
|   |-- 2015-10-13T00:05:00
|   `-- 2015-10-13T00:10:00
|-- Orders12
|   |-- 2015-10-13T00:00:00
|   |-- 2015-10-13T00:05:00
|   `-- 2015-10-13T00:10:00
|-- Orders13
|   |-- 2015-10-13T00:00:00
|   |-- 2015-10-13T00:05:00
|   `-- 2015-10-13T00:10:00
|-- Orders14
|   |-- 2015-10-13T00:00:00
|   |-- 2015-10-13T00:05:00
|   `-- 2015-10-13T00:10:00
`-- Start1
    |-- 2015-10-13T00:00:00
    |-- 2015-10-13T00:05:00
    |-- 2015-10-13T00:10:00
    `-- 2015-10-13T00:15:00
See the creation times of the logs:
$ ll airflow/logs/testing/Start1
-rw-rw-r-- 1 admin admin 4192 Nov 9 14:50 2015-10-13T00:00:00
-rw-rw-r-- 1 admin admin 4192 Nov 9 14:50 2015-10-13T00:05:00
-rw-rw-r-- 1 admin admin 4192 Nov 9 14:51 2015-10-13T00:10:00
-rw-rw-r-- 1 admin admin 4192 Nov 9 14:52 2015-10-13T00:15:00
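To make the link between the log names and the backfill concrete, here is a rough sketch in plain Python (no Airflow needed). The helper names execution_dates and backlog_size are invented for this example, and the year 2015 for the "Nov 9 14:50" timestamp is an assumption, since the listing does not show a year.

```python
from datetime import datetime, timedelta


def execution_dates(start_date, schedule_interval, count):
    """Return the first `count` execution dates, one schedule interval apart."""
    return [start_date + schedule_interval * i for i in range(count)]


def backlog_size(start_date, schedule_interval, now):
    """Count the complete schedule intervals between start_date and now."""
    elapsed = (now - start_date).total_seconds()
    return int(elapsed // schedule_interval.total_seconds())


start = datetime(2015, 10, 13)
interval = timedelta(minutes=5)

# The log files are named after execution dates, not after the time of the run.
log_names = [d.isoformat() for d in execution_dates(start, interval, 4)]
for name in log_names:
    print(name)

# Assumed wall-clock time of the first log file (year 2015 is a guess).
first_log_written = datetime(2015, 11, 9, 14, 50)
pending = backlog_size(start, interval, first_log_written)
print(pending)
```

The names produced match the files under Start1, while the number of pending intervals shows why the scheduler keeps starting runs back to back: it has weeks of 5-minute slots to work through before the execution date reaches the current time.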
Also, you can see the task instances in the web UI.
The6thSense
Updated on September 15, 2022
Comments
-
The6thSense over 1 year
Code:
I am using Python 2.7.x and Airflow 1.5.1.
My DAG script is this:
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'xyz',
    'depends_on_past': False,
    'start_date': datetime(2015,10,13),
    'email': ['[email protected]'],
    'schedule_interval':timedelta(minutes=5),
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('testing', default_args=default_args)

run_this_first = BashOperator(task_id='Start1',bash_command='date', dag=dag)

for i in range(5):
    t = BashOperator(task_id="Orders1"+str(i), bash_command='sleep 5',dag=dag)
    t.set_upstream(run_this_first)
As you can see, I am creating a DAG with 6 tasks: the first task (Start1) runs first, after which the other five tasks start.
I have set a 5-minute interval between DAG runs.
It ran perfectly for all six tasks the first time, but after five minutes the DAG was not re-initiated.
It has been more than 1 hour and the DAG still has not been re-initiated. I really don't know where I went wrong.
It would be really nice if someone could point out what is wrong. I tried clearing using
airflow testing clear
but the same thing happened: it ran the first instance and then just sat there. The only thing the command line shows is
Getting all instance for DAG testing
When I change the position of schedule_interval, it just runs in parallel without any schedule interval. That is, within 5 minutes, 300 or more task instances have been completed. There is no 5-minute schedule interval.
Code 2:
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'xyz',
    'depends_on_past': False,
    'start_date': datetime(2015,10,13),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('testing',schedule_interval=timedelta(minutes=5),default_args=default_args)#Schedule here

run_this_first = BashOperator(task_id='Start1',bash_command='date', dag=dag)

for i in range(5):
    t = BashOperator(task_id="Orders1"+str(i), bash_command='sleep 5',dag=dag)
    t.set_upstream(run_this_first)
-
The6thSense over 8 years
So you are saying that it will run every five seconds until the execution date meets the current date and time, after which it will follow the scheduled interval?
-
Yongyiw over 8 years
Yes, that's what I mean.
-
The6thSense over 8 years
Thanks a lot, mate, but I have two doubts: how can I schedule a task starting from this second with a schedule interval of one hour? And can I schedule a job for the future?