How to delete XCOM objects once the DAG finishes its run in Airflow


Solution 1

Add a task that runs against your metadata database (SQLite, PostgreSQL, MySQL, ...) and deletes the XCom rows once the DAG run is finished.

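from airflow.operators.postgres_operator import PostgresOperator

delete_xcom_task = PostgresOperator(
    task_id='delete-xcom-task',
    postgres_conn_id='airflow_db',
    # Jinja fills in the DAG id and this run's execution timestamp
    sql="""delete from xcom
           where dag_id = '{{ dag.dag_id }}'
             and task_id = 'your_task_id'
             and execution_date = '{{ ts }}'""",
    dag=dag)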

You can verify your query before you run the dag.

Data Profiling -> Ad Hoc Query -> airflow_db -> query -> Run!
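The same check can also be scripted. Below is a minimal sketch using Python's built-in sqlite3 module against an in-memory stand-in for the xcom table (the simplified four-column schema and the sample rows are assumptions, not Airflow's real table). It counts the matching rows with a SELECT before running a DELETE with the same WHERE clause, and binds the values as parameters instead of pasting them into the SQL string.

```python
import sqlite3

# Hypothetical, simplified stand-in for Airflow's xcom table
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE xcom (dag_id TEXT, task_id TEXT, execution_date TEXT, value TEXT)"
)
conn.executemany(
    "INSERT INTO xcom VALUES (?, ?, ?, ?)",
    [
        ("my_dag", "your_task_id", "2019-06-01 00:00:00", "big json"),
        ("my_dag", "other_task", "2019-06-01 00:00:00", "keep me"),
        ("other_dag", "your_task_id", "2019-06-01 00:00:00", "keep me too"),
    ],
)

# One WHERE clause shared by the dry run and the real delete
where = "dag_id = ? AND task_id = ? AND execution_date = ?"
params = ("my_dag", "your_task_id", "2019-06-01 00:00:00")

# Dry run: how many rows would the DELETE remove?
(to_delete,) = conn.execute(f"SELECT count(*) FROM xcom WHERE {where}", params).fetchone()
print("rows to delete:", to_delete)

# Same condition, now actually deleting
deleted = conn.execute(f"DELETE FROM xcom WHERE {where}", params).rowcount
conn.commit()
print("rows deleted:", deleted)
```

If the dry-run count looks wrong, fix the WHERE clause before it ever reaches a DELETE.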


Solution 2

You can perform the cleanup programmatically through SQLAlchemy, so your solution won't break if the database structure changes:

from airflow.utils.db import provide_session
from airflow.models import XCom

@provide_session
def cleanup_xcom(session=None):
    # provide_session opens a session when none is passed and commits it afterwards
    session.query(XCom).filter(XCom.dag_id == "your dag id").delete()

You can also purge old XCom data:

from airflow.utils.db import provide_session
from airflow.models import XCom
from sqlalchemy import func

@provide_session
def cleanup_xcom(session=None):
    # Delete every XCom whose execution_date is at or before midnight on 2019-06-01
    session.query(XCom).filter(XCom.execution_date <= func.date('2019-06-01')).delete()
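The purge condition is just a comparison against a cutoff date. Below is a minimal sketch with the standard-library sqlite3 module. It assumes execution dates are stored as ISO-8601 text (a simplification; Airflow stores them as timestamps), and purge_xcom_before is a hypothetical helper name. Unlike the ORM snippet above, which compares the timestamp with midnight, wrapping execution_date in SQLite's date() also removes rows later on the cutoff day.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical, simplified xcom table with timestamps stored as text
conn.execute("CREATE TABLE xcom (dag_id TEXT, execution_date TEXT)")
conn.executemany(
    "INSERT INTO xcom VALUES (?, ?)",
    [
        ("my_dag", "2019-05-31 12:00:00"),
        ("my_dag", "2019-06-01 08:30:00"),
        ("my_dag", "2019-06-02 00:00:00"),
    ],
)


def purge_xcom_before(conn, cutoff):
    """Delete XCom rows whose execution date falls on or before `cutoff` (YYYY-MM-DD)."""
    cur = conn.execute(
        "DELETE FROM xcom WHERE date(execution_date) <= date(?)", (cutoff,)
    )
    conn.commit()
    return cur.rowcount


print(purge_xcom_before(conn, "2019-06-01"))  # 2
```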

If you want to purge the XComs once the DAG has finished, I think the cleanest solution is to use the on_success_callback argument of the DAG class:

from airflow.models import DAG
from airflow.utils.db import provide_session
from airflow.models import XCom

@provide_session
def cleanup_xcom(context, session=None):
    # ti is a TaskInstance object, so read dag_id as an attribute
    dag_id = context["ti"].dag_id
    session.query(XCom).filter(XCom.dag_id == dag_id).delete()

dag = DAG( ...
    on_success_callback=cleanup_xcom,
)

Solution 3

Below is the code that worked for me. It deletes the XComs of all tasks in the DAG (add task_id to the SQL if only a specific task's XComs need to be deleted).

The dag_id is concatenated into the query because it is dynamic, and the date comparison must follow your database's SQL syntax.

from airflow.operators.postgres_operator import PostgresOperator

delete_xcom_task_inst = PostgresOperator(task_id='delete_xcom',
                                            postgres_conn_id='your_conn_id',
                                            sql="delete from xcom where dag_id= '"+dag.dag_id+"' and date(execution_date)=date('{{ ds }}')"
                                            )
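Concatenating dag.dag_id into the SQL string works, but bound parameters avoid quoting problems. Below is a sketch of the same DELETE against an in-memory SQLite stand-in, with the optional task_id filter mentioned above. The delete_xcom helper and the table layout are hypothetical; ? is SQLite's placeholder style, and other drivers use %s or named placeholders instead.

```python
import sqlite3


def delete_xcom(conn, dag_id, ds, task_id=None):
    """Delete the XComs of one DAG for one logical date (ds = 'YYYY-MM-DD'),
    optionally narrowed to a single task. Values are bound, not concatenated."""
    sql = "DELETE FROM xcom WHERE dag_id = ? AND date(execution_date) = date(?)"
    params = [dag_id, ds]
    if task_id is not None:
        sql += " AND task_id = ?"
        params.append(task_id)
    cur = conn.execute(sql, params)
    conn.commit()
    return cur.rowcount


# Hypothetical, simplified xcom table for the demo
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE xcom (dag_id TEXT, task_id TEXT, execution_date TEXT)")
conn.executemany(
    "INSERT INTO xcom VALUES (?, ?, ?)",
    [
        ("my_dag", "extract", "2019-06-01 00:00:00"),
        ("my_dag", "load", "2019-06-01 00:00:00"),
        ("my_dag", "extract", "2019-06-02 00:00:00"),
    ],
)

print(delete_xcom(conn, "my_dag", "2019-06-01", task_id="load"))  # 1
print(delete_xcom(conn, "my_dag", "2019-06-01"))  # 1 (only "extract" was left that day)
```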

Solution 4

My solution to this problem is:

from airflow.models import DAG, XCom
from airflow.operators.python_operator import PythonOperator
from airflow.utils.db import provide_session

dag = DAG(...)

@provide_session
def cleanup_xcom(session=None, **context):
    # Delete every XCom stored for this DAG, across all runs
    dag_id = context["dag"].dag_id
    session.query(XCom).filter(XCom.dag_id == dag_id).delete()

clean_xcom = PythonOperator(
    task_id="clean_xcom",
    python_callable=cleanup_xcom,
    provide_context=True,  # Airflow 1.x: pass the task context as keyword arguments
    dag=dag
)

In Airflow 2.1.x, the on_success_callback version below does not seem to work:

from airflow.models import DAG
from airflow.utils.db import provide_session
from airflow.models import XCom

@provide_session
def cleanup_xcom(context, session=None):
    dag_id = context["ti"]["dag_id"]
    session.query(XCom).filter(XCom.dag_id == dag_id).delete()

dag = DAG( ...
    on_success_callback=cleanup_xcom,
)
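A likely reason this version fails is the lookup context["ti"]["dag_id"]. context["ti"] is a TaskInstance object, and dag_id is an attribute on it rather than a dictionary key, so indexing it raises a TypeError. Below is a pure-Python illustration that uses a hypothetical FakeTaskInstance class rather than Airflow's real one. Reading the value as context["ti"].dag_id, or as context["dag"].dag_id like the PythonOperator version does, avoids this.

```python
class FakeTaskInstance:
    """Hypothetical stand-in for airflow.models.TaskInstance: dag_id is an attribute."""

    def __init__(self, dag_id, task_id):
        self.dag_id = dag_id
        self.task_id = task_id


context = {"ti": FakeTaskInstance("cleanup_xcom_demo", "end")}

# Attribute access works
print(context["ti"].dag_id)  # cleanup_xcom_demo

try:
    context["ti"]["dag_id"]  # the lookup used in the callback above
except TypeError as exc:
    print("TypeError:", exc)  # the object is not subscriptable
```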

so I changed it to a PythonOperator task:

from airflow.models import DAG, XCom
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from airflow.utils.session import provide_session

with DAG(dag_id="cleanup_xcom_demo", schedule_interval=None, start_date=days_ago(2)) as dag:

    @provide_session
    def cleanup_xcom(session=None, **context):
        # Delete every XCom stored for this DAG, across all runs
        dag_id = context["dag"].dag_id
        session.query(XCom).filter(XCom.dag_id == dag_id).delete()

    # Airflow 2 passes the context automatically; provide_context is no longer needed
    clean_xcom = PythonOperator(
        task_id="clean_xcom",
        python_callable=cleanup_xcom,
    )

    start = DummyOperator(task_id="start")
    end = DummyOperator(task_id="end", trigger_rule="none_failed")

    start >> clean_xcom >> end

Author: vijay krishna

Updated on June 25, 2022

Comments

  • vijay krishna
    vijay krishna almost 2 years

    I have a huge JSON file in XCom that I no longer need once the DAG execution has finished, but I still see the XCom object with all its data in the UI. Is there any way to delete the XCom programmatically once the DAG run is finished?

    Thank you

  • CTS_AE
    CTS_AE over 4 years
    This helped me fix a broken admin XCom page (it couldn't deserialize some value), and I didn't want to fuss with my Docker container. I just threw this into a Python operator that was already running and it got me my admin XCom page back :)
  • Alejandro Kaspar
    Alejandro Kaspar over 3 years
    I also agree this should be the proper way to do it, otherwise migrating the database will be an issue
  • SherylHohman
    SherylHohman over 3 years
    SO discourages code-only responses. Please highlight important portions of your solution with an explanation as to how/why it solves the OP's issue. It's also good to note caveats to using the code, if any. If helpful, links to source docs can also be included as reference. Most upvotes are gained over time as users learn something from your answer that they can apply to their own coding issues.
  • cdabel
    cdabel about 3 years
    this was the only solution that worked for me, I'm on Airflow 1.10.10