Any example of Airflow FileSensor?

Solution 1

From the documentation & source code:

from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.dummy_operator    import DummyOperator

import datetime
import airflow

# https://airflow.apache.org/code.html#airflow.models.BaseOperator
default_args = {
    "depends_on_past" : False,
    "start_date"      : airflow.utils.dates.days_ago( 1 ),
    "retries"         : 1,
    "retry_delay"     : datetime.timedelta( hours= 5 ),
}

with airflow.DAG( "file_sensor_test_v1", default_args= default_args, schedule_interval= "*/5 * * * *", ) as dag:

    start_task  = DummyOperator(  task_id= "start" )
    stop_task   = DummyOperator(  task_id= "stop"  )
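    # fs_conn_id is the name of a connection defined under Admin/Connections, not a path;
    # filepath is the file or directory to wait for, joined onto that connection's base path.
    sensor_task = FileSensor( task_id= "my_file_sensor_task", poke_interval= 30, fs_conn_id= "fs_default", filepath= "<file or directory name>" )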

start_task >> sensor_task >> stop_task

Solution 2

A simple example of a FileSensor task:

second_task = FileSensor(
                 task_id="file_sensor_task_id",
                 filepath="{{ task_instance.xcom_pull(task_ids='get_filepath_task') }}",
                 #fs_conn_id="fs_default" # default one, commented because not needed
                 poke_interval= 20,
                 dag=dag
              )

Here I'm passing as filepath the value returned by the previous PythonOperator task (task_id get_filepath_task), pulled with xcom_pull. It can equally be any string naming the file or directory whose existence you want to check.

The fs_conn_id parameter is the string name of a connection you have available in the UI Admin/Connections section.

The default value of fs_conn_id is "fs_default" (you can see it in the code of the FileSensor class operator). Check the UI Admin/Connections and you will find it.

You can omit fs_conn_id and pass only filepath if you just want to check whether a file or directory exists locally.
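To make the relationship between the connection and filepath more concrete, here is a rough plain-Python sketch of what a single poke appears to do in the contrib version of the sensor: join the connection's base path with filepath, then check whether something is there. The helper name resolve_and_check is hypothetical and this is only an approximation for illustration, not the library's code; it needs no Airflow install to run.

```python
import os
import tempfile


def resolve_and_check(base_path, filepath):
    """Hypothetical helper approximating one poke of a file sensor.

    base_path stands in for the path stored on the fs connection,
    filepath for the sensor's filepath argument.
    """
    full_path = os.path.join(base_path, filepath)
    if os.path.isdir(full_path):
        # Assumption: a directory only counts once it holds at least one file.
        for _root, _dirs, files in os.walk(full_path):
            if files:
                return True
        return False
    return os.path.exists(full_path)


# Small demonstration in a throwaway directory.
with tempfile.TemporaryDirectory() as base:
    before = resolve_and_check(base, "data.csv")   # file not there yet
    open(os.path.join(base, "data.csv"), "w").close()
    after = resolve_and_check(base, "data.csv")    # file now exists
```

Note that os.path.join discards the base path when filepath is absolute, which would explain why passing a full local path works with the default connection.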

poke_interval is inherited from BaseSensorOperator and is the time in seconds that the sensor waits between tries. The default value is 60 seconds.
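The polling behaviour can be pictured as a simple loop. The sketch below is plain Python rather than Airflow code, and wait_until, its sleep and clock arguments, and the use of TimeoutError are all assumptions made for illustration (Airflow raises its own exception type on timeout). The timeout argument mirrors the separate timeout setting on BaseSensorOperator, which is not discussed above.

```python
import time


def wait_until(check, poke_interval=60, timeout=7 * 24 * 60 * 60,
               sleep=time.sleep, clock=time.monotonic):
    """Hypothetical helper: call check() every poke_interval seconds.

    Returns True as soon as check() succeeds and raises TimeoutError
    once more than timeout seconds have passed without success.
    """
    started = clock()
    while not check():
        if clock() - started > timeout:
            raise TimeoutError("condition not met before timeout")
        sleep(poke_interval)
    return True


# Demonstration with a fake check that succeeds on the third try and a
# fake sleep that only records the requested pauses instead of waiting.
attempts = []
naps = []


def fake_check():
    attempts.append(1)
    return len(attempts) >= 3


result = wait_until(fake_check, poke_interval=20, sleep=naps.append)
```

With poke_interval= 20 as in the task above, the loop pauses 20 seconds after each failed check, so a shorter interval notices the file sooner at the cost of more frequent checks.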

Author by

DevEx

Updated on June 23, 2022

Comments

  • DevEx
    DevEx almost 2 years

    Can anyone please point me to an example of how to use the Airflow FileSensor? I've googled and haven't found anything yet. Any example would be sufficient. My use case is quite simple:

    Wait for a scheduled DAG to drop a file in a path; a FileSensor task picks it up, reads the content, and processes it.

  • user3791111
    user3791111 over 4 years
    What is "fs_conn_id" and what do I need to substitute here?
  • codeBarer
    codeBarer almost 4 years
    What if the file already exists from a past job? How can you distinguish a file written by yesterday's job from one written today?
  • Meghdeep Ray
    Meghdeep Ray almost 4 years
    That is why we have the version number in the DAG name. Change "file_sensor_test_v1" to "file_sensor_test_v2" and so on as you create new versions.
  • Stefan Papp
    Stefan Papp about 2 years
    fs_conn_id is a connection that you normally define in the UI for the path. Not knowing this can create some confusion. So in the fs_conn_id you define a connection, which might also contain a path.