Airflow worker stuck: Task is in the 'running' state which is not a valid state for execution. The task must be cleared in order to be run

I figured out the issue: it was an underlying infrastructure problem. I was using AWS EFS, and in burst mode the workers were being blocked once the throughput limit was reached. After changing to provisioned throughput mode, the workers are no longer getting stuck. I got the idea from ecs-airflow-1-10-2-performance-issues-operators-and-tasks-take-10x-longer
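
For anyone checking the same thing, here is a minimal sketch of the diagnosis and the fix with boto3; the file system ID and the 64 MiB/s figure are placeholders, not values from my setup:

    # Minimal sketch, assuming configured AWS credentials; the file system
    # ID and the throughput figure below are placeholders.
    from datetime import datetime, timedelta

    import boto3

    FILE_SYSTEM_ID = 'fs-12345678'  # placeholder EFS file system ID

    # 1. Confirm the diagnosis: a BurstCreditBalance that keeps dropping
    #    towards zero means EFS is throttling you to baseline throughput.
    cloudwatch = boto3.client('cloudwatch')
    stats = cloudwatch.get_metric_statistics(
        Namespace='AWS/EFS',
        MetricName='BurstCreditBalance',
        Dimensions=[{'Name': 'FileSystemId', 'Value': FILE_SYSTEM_ID}],
        StartTime=datetime.utcnow() - timedelta(hours=12),
        EndTime=datetime.utcnow(),
        Period=3600,
        Statistics=['Minimum'],
    )
    print(sorted(point['Minimum'] for point in stats['Datapoints']))

    # 2. Apply the fix: switch the file system to provisioned throughput.
    efs = boto3.client('efs')
    efs.update_file_system(
        FileSystemId=FILE_SYSTEM_ID,
        ThroughputMode='provisioned',
        ProvisionedThroughputInMibps=64.0,  # size this for your workload
    )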

Comments

  • joss almost 2 years

    Airflow tasks run without any issues, then suddenly a task gets stuck halfway through, and the task instance details show the message above.

    I cleared my entire database, but I am still getting the same error.

    The odd thing is that I am getting this issue only for some DAGs, mostly the long-running jobs.

    I am getting the error below:

    [2019-07-03 12:14:56,337] {{models.py:1353}} INFO - Dependencies not met for <TaskInstance: XXXXXX.index_to_es 2019-07-01T13:30:00+00:00 [running]>, dependency 'Task Instance State' FAILED: Task is in the 'running' state which is not a valid state for execution. The task must be cleared in order to be run.
    [2019-07-03 12:14:56,341] {{models.py:1353}} INFO - Dependencies not met for <TaskInstance: XXXXXX.index_to_es 2019-07-01T13:30:00+00:00 [running]>, dependency 'Task Instance Not Already Running' FAILED: Task is already running, it started on 2019-07-03 05:58:51.601552+00:00.
    [2019-07-03 12:14:56,342] {{logging_mixin.py:95}} INFO - [2019-07-03 12:14:56,342] {{jobs.py:2514}} INFO - Task is not able to be run
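
    For reference, since the message says the task must be cleared: instead of wiping the whole database, just the stuck task instance can be cleared. A minimal sketch for Airflow 1.10, with a placeholder DAG id standing in for the redacted XXXXXX (the `airflow clear` CLI does the same thing):

    from datetime import datetime

    from airflow.models import DagBag

    # Placeholder DAG id; the task id and dates come from the log above.
    dag = DagBag().get_dag('my_dag_id')
    # Narrow the clear to just the stuck task...
    stuck = dag.sub_dag(task_regex='index_to_es',
                        include_upstream=False,
                        include_downstream=False)
    # ...and to the affected execution date.
    stuck.clear(start_date=datetime(2019, 7, 1),
                end_date=datetime(2019, 7, 2))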
    

    My DAG looks like this:

    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.contrib.operators.bigquery_operator import BigQueryOperator
    from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
    from airflow.operators.python_operator import PythonOperator
    # EsDownloadAndIndexOperator is our custom operator, imported from our
    # own plugin; DAG_NAME, TEMP_TABLE, CONNECTION_ID, CLOUD_STORAGE_URI,
    # OBJECT, ES_URI, LOCAL_FILE, GCS_BUCKET_ID and INDEX are defined elsewhere.

    default_args = {
        'owner': 'datascience',
        'depends_on_past': True,
        'start_date': datetime(2019, 6, 12),
        'email': ['[email protected]'],
        'email_on_failure': True,
        'email_on_retry': True,
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
        # 'queue': 'nill',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
    }
    def get_index_date(**kwargs):
        """Format tomorrow's date (passed in via templates_dict) for the ES index name."""
        tomorrow = kwargs.get('templates_dict').get('tomorrow')
        return str(tomorrow).replace('-', '.')
    
    """
    Create Dags specify its features
    """
    dag = DAG(
        DAG_NAME,
        schedule_interval="0 9 * * *",
        catchup=True,
        default_args=default_args,
        template_searchpath='/efs/sql')
    
    create_table = BigQueryOperator(
        dag=dag,
        task_id='create_temp_table_from_query',
        sql='daily_demand.sql',
        use_legacy_sql=False,
        destination_dataset_table=TEMP_TABLE,
        bigquery_conn_id=CONNECTION_ID,
        create_disposition='CREATE_IF_NEEDED',
        write_disposition='WRITE_TRUNCATE'
    )
    
    """Task to zip and export to GCS"""
    export_to_storage = BigQueryToCloudStorageOperator(
        task_id='export_to_GCS',
        source_project_dataset_table=TEMP_TABLE,
        destination_cloud_storage_uris=[CLOUD_STORAGE_URI],
        export_format='NEWLINE_DELIMITED_JSON',
        compression='GZIP',
        bigquery_conn_id=CONNECTION_ID,
        dag=dag)
    """Task to get the tomorrow execution date formatted for indexing"""
    get_index_date = PythonOperator(
        task_id='get_index_date',
        python_callable=get_index_date,
        templates_dict={'tomorrow':"{{ tomorrow_ds }}"},
        provide_context=True,
        dag=dag
    )
    """Task to download zipped files and bulkindex to elasticsearch"""
    es_indexing = EsDownloadAndIndexOperator(
        task_id="index_to_es",
        object=OBJECT,
        es_url=ES_URI,
        local_path=LOCAL_FILE,
        gcs_conn_id=CONNECTION_ID,
        bucket=GCS_BUCKET_ID,
        es_index_type='demand_shopper',
        es_bulk_batch=5000,
        es_index_name=INDEX,
        es_request_timeout=300,
        dag=dag)
    
    
    """Define the chronology of tasks in DAG"""
    create_table >> export_to_storage >> get_index_date >> es_indexing
    

    Thanks for your help