Any example of Airflow FileSensor?
Solution 1
From the documentation & source code:
import datetime

from airflow import DAG
from airflow.utils.dates import days_ago
# Airflow 1.10 import paths. In Airflow 2.x use
# airflow.sensors.filesystem.FileSensor and airflow.operators.dummy.DummyOperator.
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.dummy_operator import DummyOperator

# https://airflow.apache.org/code.html#airflow.models.BaseOperator
default_args = {
    "depends_on_past": False,
    "start_date": days_ago(1),
    "retries": 1,
    "retry_delay": datetime.timedelta(hours=5),
}

with DAG("file_sensor_test_v1", default_args=default_args, schedule_interval="*/5 * * * *") as dag:
    start_task = DummyOperator(task_id="start")
    stop_task = DummyOperator(task_id="stop")
    sensor_task = FileSensor(
        task_id="my_file_sensor_task",
        poke_interval=30,  # seconds between checks
        fs_conn_id="fs_default",  # a connection ID from Admin/Connections, not a path
        filepath="<file or directory name>",  # resolved relative to the connection's path
    )
    start_task >> sensor_task >> stop_task
Solution 2
A simple example of a FileSensor task:
second_task = FileSensor(
    task_id="file_sensor_task_id",
    filepath="{{ task_instance.xcom_pull(task_ids='get_filepath_task') }}",
    # fs_conn_id="fs_default",  # the default one, commented out because it isn't needed
    poke_interval=20,
    dag=dag,
)
Here I'm passing as filepath the value returned by the previous PythonOperator task (task_id get_filepath_task), retrieved with xcom_pull. This works because filepath is a templated field of FileSensor. But it can be any string naming a file or directory whose existence you want to check.
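For illustration, here is a hypothetical callable that the upstream get_filepath_task PythonOperator could run. The function name get_filepath, the default directory /data/incoming and the report_YYYYMMDD.csv naming scheme are assumptions made for this sketch, not part of the original answer. By default, PythonOperator pushes its callable's return value to XCom, which is what xcom_pull in the templated filepath retrieves.

```python
import datetime
import os


# Hypothetical python_callable for the upstream "get_filepath_task".
# Airflow stores its return value as the task's XCom (key "return_value"),
# which the sensor's templated filepath retrieves with xcom_pull.
def get_filepath(base_dir="/data/incoming", day=None):
    """Build the path of the file to wait for (naming scheme is illustrative)."""
    day = day or datetime.date.today()
    return os.path.join(base_dir, f"report_{day:%Y%m%d}.csv")
```

It would be wired up with something like PythonOperator(task_id="get_filepath_task", python_callable=get_filepath, dag=dag), imported from airflow.operators.python_operator in 1.10 or airflow.operators.python in 2.x. Putting a date in the file name is just one way to keep each run's file distinct; FileSensor itself does not require it.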
The fs_conn_id parameter is the name (a string) of a connection defined in the UI under Admin/Connections. Its default value is "fs_default" (you can see this in the source code of the FileSensor class); check Admin/Connections and you will find that connection there.
You can omit fs_conn_id and pass only filepath if you just want to check whether a file or directory exists locally, i.e. on the machine where the task runs.
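To make the relationship between fs_conn_id and filepath concrete, below is a simplified, Airflow-free approximation of the check FileSensor performs on each poke, based on a reading of its source (details such as recursive glob support differ between versions). The helper name file_sensor_poke is hypothetical; base_path stands in for the "path" value in the connection's Extra field (for example {"path": "/data/incoming"}), which Airflow's FSHook reads and treats as an empty string when absent.

```python
import glob
import os


def file_sensor_poke(base_path, filepath):
    """Approximate one FileSensor check (hypothetical helper, not Airflow's API).

    base_path: stand-in for the "path" stored in the fs_conn_id connection.
    filepath:  the sensor's filepath argument; glob patterns are allowed.
    """
    # os.path.join discards base_path when filepath is already absolute
    full_path = os.path.join(base_path, filepath)
    for path in glob.glob(full_path):
        if os.path.isfile(path):
            return True  # a matching file exists
        # a matching directory only counts once it contains at least one file
        for _, _, files in os.walk(path):
            if files:
                return True
    return False
```

Because os.path.join ignores the base when filepath is absolute, an absolute filepath makes the connection's path irrelevant, while a relative one is resolved against it.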
The poke_interval parameter is inherited from BaseSensorOperator and sets the time in seconds the sensor waits between attempts. The default value is 60 seconds.
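A rough, Airflow-free sketch of what poke_interval controls: the sensor runs its check, sleeps poke_interval seconds, and tries again until the check succeeds or a timeout is exceeded. The helper wait_for is hypothetical; its defaults mirror BaseSensorOperator's poke_interval of 60 seconds and timeout of 7 days. Where this sketch raises TimeoutError, Airflow raises its own AirflowSensorTimeout (or skips the task when soft_fail=True).

```python
import time


def wait_for(condition, poke_interval=60, timeout=60 * 60 * 24 * 7):
    """Hypothetical helper mimicking a sensor's poke loop (not Airflow's API).

    Calls condition() every poke_interval seconds until it returns True,
    or raises TimeoutError once more than timeout seconds have elapsed.
    """
    started = time.monotonic()
    while not condition():
        if time.monotonic() - started > timeout:
            raise TimeoutError(f"condition not met within {timeout} seconds")
        time.sleep(poke_interval)  # wait before the next attempt
    return True
```

Combined with a check like the file test above, wait_for(lambda: os.path.exists(path), poke_interval=30) would behave much like a FileSensor with poke_interval=30, minus Airflow's scheduling, logging and reschedule mode.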
DevEx
Updated on June 23, 2022
Comments
- DevEx (almost 2 years ago): Could someone please point me to an example of how to use Airflow's FileSensor? I've googled and haven't found anything yet. Any example would be sufficient. My use case is quite simple:
Wait for a scheduled DAG to drop a file in a path, have the FileSensor task pick it up, then read its content and process it.
- user3791111 (over 4 years ago): What is "fs_conn_id" and what do I need to substitute here?
- codeBarer (almost 4 years ago): What if the file exists from a past job? How can you distinguish a file written by yesterday's job from one written by today's?
- Meghdeep Ray (almost 4 years ago): That is why we have the version number in the DAG name. Change "file_sensor_test_v1" to "file_sensor_test_v2" and so on as you create new versions.
- Stefan Papp (about 2 years ago): fs_conn_id is a connection that you normally define in the UI for the path. Not knowing this can cause some confusion: in fs_conn_id you reference a connection, which may also contain a path.