Airflow worker stuck: Task is in the 'running' state which is not a valid state for execution. The task must be cleared in order to be run
I figured out the issue: it was an underlying infrastructure problem. I was using AWS EFS, and burst mode was blocking the worker once the throughput limit was reached. After changing to provisioned throughput mode, the workers are no longer getting stuck. I got the idea from ecs-airflow-1-10-2-performance-issues-operators-and-tasks-take-10x-longer
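For reference, the throughput mode of an existing EFS file system can be switched with the AWS CLI; this is a sketch only, and the file system ID and the provisioned rate below are placeholders you would replace with your own values:

```shell
# Switch an EFS file system from bursting to provisioned throughput.
# fs-12345678 and the 32 MiB/s rate are example values, not from the question.
aws efs update-file-system \
    --file-system-id fs-12345678 \
    --throughput-mode provisioned \
    --provisioned-throughput-in-mibps 32
```

Note that provisioned throughput is billed separately, so it is worth checking the BurstCreditBalance CloudWatch metric first to confirm that exhausted burst credits are actually the cause.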
Author: joss (updated on June 09, 2022)

- joss, almost 2 years ago:
Airflow tasks run without any issues, then suddenly, halfway through, a task gets stuck and the task instance details show the message above.
I cleared my entire database, but I am still getting the same error.
The odd thing is that I am getting this issue only for some DAGs, mostly the long-running jobs.
I am getting the error below:
[2019-07-03 12:14:56,337] {{models.py:1353}} INFO - Dependencies not met for <TaskInstance: XXXXXX.index_to_es 2019-07-01T13:30:00+00:00 [running]>, dependency 'Task Instance State' FAILED: Task is in the 'running' state which is not a valid state for execution. The task must be cleared in order to be run.
[2019-07-03 12:14:56,341] {{models.py:1353}} INFO - Dependencies not met for <TaskInstance: XXXXXX.index_to_es 2019-07-01T13:30:00+00:00 [running]>, dependency 'Task Instance Not Already Running' FAILED: Task is already running, it started on 2019-07-03 05:58:51.601552+00:00.
[2019-07-03 12:14:56,342] {{logging_mixin.py:95}} INFO - [2019-07-03 12:14:56,342] {{jobs.py:2514}} INFO - Task is not able to be run
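As the message says, a task instance left in the running state has to be cleared before the scheduler will pick it up again. A sketch of doing this from the Airflow 1.10 CLI, where the DAG ID is a placeholder (the task ID and dates are taken from the log above):

```shell
# Clear the stuck task instance so it can be scheduled again.
# "my_dag" is a placeholder DAG ID; -t takes a regex matching task IDs.
airflow clear my_dag \
    -t index_to_es \
    -s 2019-07-01 \
    -e 2019-07-01
```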
My DAG looks like the below:
default_args = {
    'owner': 'datascience',
    'depends_on_past': True,
    'start_date': datetime(2019, 6, 12),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'nill',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

def get_index_date(**kwargs):
    tomorrow = kwargs.get('templates_dict').get('tomorrow')
    return str(tomorrow).replace('-', '.')

"""Create DAG and specify its features"""
dag = DAG(
    DAG_NAME,
    schedule_interval="0 9 * * *",
    catchup=True,
    default_args=default_args,
    template_searchpath='/efs/sql')

create_table = BigQueryOperator(
    dag=dag,
    task_id='create_temp_table_from_query',
    sql='daily_demand.sql',
    use_legacy_sql=False,
    destination_dataset_table=TEMP_TABLE,
    bigquery_conn_id=CONNECTION_ID,
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_TRUNCATE'
)

"""Task to zip and export to GCS"""
export_to_storage = BigQueryToCloudStorageOperator(
    task_id='export_to_GCS',
    source_project_dataset_table=TEMP_TABLE,
    destination_cloud_storage_uris=[CLOUD_STORAGE_URI],
    export_format='NEWLINE_DELIMITED_JSON',
    compression='GZIP',
    bigquery_conn_id=CONNECTION_ID,
    dag=dag)

"""Task to get the tomorrow execution date formatted for indexing"""
get_index_date = PythonOperator(
    task_id='get_index_date',
    python_callable=get_index_date,
    templates_dict={'tomorrow': "{{ tomorrow_ds }}"},
    provide_context=True,
    dag=dag
)

"""Task to download zipped files and bulk-index to Elasticsearch"""
es_indexing = EsDownloadAndIndexOperator(
    task_id="index_to_es",
    object=OBJECT,
    es_url=ES_URI,
    local_path=LOCAL_FILE,
    gcs_conn_id=CONNECTION_ID,
    bucket=GCS_BUCKET_ID,
    es_index_type='demand_shopper',
    es_bulk_batch=5000,
    es_index_name=INDEX,
    es_request_timeout=300,
    dag=dag)

"""Define the chronology of tasks in DAG"""
create_table >> export_to_storage >> get_index_date >> es_indexing
Thanks for your help