Airflow S3KeySensor - How to make it continue running

Solution 1

Airflow has no concept that maps to an always-running DAG. You could have a DAG run very frequently, such as every 1 to 5 minutes, if that suits your use case.

The main thing here is that the S3KeySensor keeps checking until it detects the first file matching the key's wildcard path (or until it times out), and then the downstream tasks run. But when a second, third, or fourth file lands, the sensor will have already completed for that DAG run, and it won't be scheduled again until the next DAG run. (The looping idea you described is roughly equivalent to what the scheduler does when it creates DAG runs, except that it doesn't go on forever.)
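To make that behavior concrete, here is a minimal pure-Python model of a sensor's poke loop. It is only an illustration, not Airflow's implementation: `poke`, `run_sensor`, and the `snapshots` list are hypothetical names, and the bucket contents are simulated rather than read from S3.

```python
from fnmatch import fnmatch


def poke(keys, pattern):
    """One check: does any key currently in the bucket match the wildcard?"""
    return any(fnmatch(key, pattern) for key in keys)


def run_sensor(bucket_snapshots, pattern, max_pokes):
    """Poke once per snapshot; stop at the first match or after max_pokes.

    Returns the index of the successful poke, or None if no poke matched.
    """
    for poke_number, keys in enumerate(bucket_snapshots[:max_pokes]):
        if poke(keys, pattern):
            return poke_number  # the sensor succeeds here and never pokes again
    return None  # stands in for the sensor timing out


# Simulated bucket contents at each poke: empty, one file, then a second file.
snapshots = [[], ["in/a.csv"], ["in/a.csv", "in/b.csv"]]
first_hit = run_sensor(snapshots, "in/*", max_pokes=3)
```

In this model the loop returns at the second poke, so the arrival of `in/b.csv` is never observed within the same run; only a new DAG run (a new call to `run_sensor`) would notice it.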

An external trigger definitely sounds like the best approach for your use case, whether that trigger comes via the Airflow CLI's trigger_dag command ($ airflow trigger_dag ..., or $ airflow dags trigger ... in Airflow 2.x):

https://github.com/apache/incubator-airflow/blob/972086aeba4616843005b25210ba3b2596963d57/airflow/bin/cli.py#L206-L222

Or via the REST API:

https://github.com/apache/incubator-airflow/blob/5de22d7fa0d8bc6b9267ea13579b5ac5f62c8bb5/airflow/www/api/experimental/endpoints.py#L41-L89

Both turn around and call the trigger_dag function in the common (experimental) API:

https://github.com/apache/incubator-airflow/blob/089c996fbd9ecb0014dbefedff232e8699ce6283/airflow/api/common/experimental/trigger_dag.py#L28-L67

You could, for instance, set up an AWS Lambda function, invoked whenever a file lands on S3, that makes the trigger DAG call.
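As a rough sketch of the Lambda side, the handler below pulls the bucket and key out of an S3 notification event and hands them to a trigger function. The `Records[...].s3.bucket.name` and `s3.object.key` fields are the standard S3 event layout; `extract_s3_objects`, `make_handler`, and the `trigger_dag_run` callable are hypothetical names introduced here, and how the trigger actually reaches Airflow (CLI, REST API) is left to that callable.

```python
from urllib.parse import unquote_plus


def extract_s3_objects(event):
    """Return (bucket, key) pairs from an S3 notification event."""
    objects = []
    for record in event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        # Object keys arrive URL-encoded in S3 events (spaces become '+').
        key = unquote_plus(record["s3"]["object"]["key"])
        objects.append((bucket, key))
    return objects


def make_handler(trigger_dag_run):
    """Build a Lambda handler around a caller-supplied trigger function.

    trigger_dag_run is a hypothetical callable that receives the conf dict
    for one DAG run and is responsible for contacting Airflow.
    """
    def lambda_handler(event, context):
        objects = extract_s3_objects(event)
        for bucket, key in objects:
            trigger_dag_run({"bucket": bucket, "key": key})
        return {"triggered": len(objects)}

    return lambda_handler
```

Passing the bucket and key as the run's conf lets the DAG process exactly the file that arrived, instead of scanning the bucket with a wildcard.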

Solution 2

Another way is to have an S3 event trigger an AWS Lambda function, which then invokes the DAG through the Airflow API:

S3 event -> AWS Lambda -> Airflow API

Set up an S3 notification to trigger the Lambda function:

https://docs.aws.amazon.com/lambda/latest/dg/with-s3.html

Airflow API

https://airflow.apache.org/docs/apache-airflow/stable/rest-api-ref.html
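A minimal sketch of the API call, assuming Airflow 2.x with the stable REST API reachable at `/api/v1` and the basic-auth backend enabled. The base URL, credentials, and the function names `build_dag_run_request` and `trigger_dag_run` are placeholders made up for this example; only the `POST /api/v1/dags/{dag_id}/dagRuns` endpoint and its `conf` field come from the Airflow API reference.

```python
import base64
import json
import urllib.request


def build_dag_run_request(base_url, dag_id, conf, username, password):
    """Build the POST request that asks Airflow to create one DAG run."""
    url = f"{base_url.rstrip('/')}/api/v1/dags/{dag_id}/dagRuns"
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        url,
        data=json.dumps({"conf": conf}).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Basic {token}",
        },
        method="POST",
    )


def trigger_dag_run(base_url, dag_id, conf, username, password):
    """Send the request and return Airflow's JSON description of the new run."""
    request = build_dag_run_request(base_url, dag_id, conf, username, password)
    with urllib.request.urlopen(request, timeout=10) as response:
        return json.loads(response.read())
```

Inside a Lambda function, `trigger_dag_run` would be called once per S3 record, with the bucket and key passed as `conf` so the DAG can read them from the run's configuration. Other deployments may need a different auth scheme or API version.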

Author: Kyle Bridenstine

I'm a Software Engineer with a focus in AWS. I program in Go, Java, Python, and I use Linux and SQL a lot. I also was using Apache Airflow for a while.

Updated on December 31, 2020

Comments

  • Kyle Bridenstine over 3 years

    With the help of this Stack Overflow post, I made a program (the one shown in the post) in which placing a file inside an S3 bucket triggers a task in one of my running DAGs, and then I perform some work using the BashOperator. Once it's done, though, the DAG is no longer in a running state but goes into a success state, and if I want it to pick up another file I need to clear all the 'Past', 'Future', 'Upstream', and 'Downstream' activity. I would like this program to always be running, so that any time a new file is placed inside the S3 bucket it kicks off the tasks.

    Can I continue using the S3KeySensor to do this, or do I need to figure out a way of setting up an External Trigger to run my DAG? As of now my S3KeySensor is pretty pointless if it's only ever going to run once.

    from airflow import DAG
    from airflow.operators import SimpleHttpOperator, HttpSensor, EmailOperator, S3KeySensor
    from datetime import datetime, timedelta
    from airflow.operators.bash_operator import BashOperator
    
    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2018, 5, 29),
        'email': ['[email protected]'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 5,
        'retry_delay': timedelta(minutes=5)
    }
    
    dag = DAG('s3_triggered_emr_cluster_dag', default_args=default_args, schedule_interval= '@once')
    
    # This Activity runs a Python script that creates an AWS EMR cluster and then does EMR activity on the EMR cluster.
    t2 = BashOperator(
        task_id='create_emr_cluster_1',
        bash_command='python /home/ec2-user/aws-python-sample/Create_EMR_Then_Do_EMR_Activities.py',
        retries=1,
        dag=dag)
    
    t1 = BashOperator(
        task_id='success_log',
        bash_command='echo "Dag ran successfully" >> /home/ec2-user/s3_triggered_dag.txt',
        dag=dag)
    
    sensor = S3KeySensor(
        task_id='new_s3_file_in_foobar-bucket',
        bucket_key='*',
        wildcard_match=True,
        bucket_name='foobar-bucket',
        s3_conn_id='s3://foobar-bucket',
        timeout=18*60*60,
        poke_interval=120,
        dag=dag)
    
    t1.set_upstream(sensor)
    t2.set_upstream(t1)
    

    I'm wondering if this is not possible because it then wouldn't be a Directed Acyclic Graph but rather it would have a loop that repeated sensor -> t1 -> t2 -> sensor -> t1 -> t2 -> sensor -> ... keep repeating.

    Update:

    My use case is pretty simple: any time a new file is placed inside a designated AWS S3 Bucket, I want my DAG to be triggered and start my process of various tasks. The tasks will do things like instantiate a new AWS EMR Cluster, extract the files from the AWS S3 Bucket, perform some AWS EMR Activities, then shut down the AWS EMR Cluster. From there the DAG would go back into a waiting state, where it would wait for new files to arrive in the AWS S3 Bucket, and then repeat the process indefinitely.

    • Taylor D. Edmiston almost 6 years
      I posted an answer below that makes some assumptions around the use case. Let me know if anything is unclear or if I've misunderstood what you'd like to achieve.
    • Kyle Bridenstine almost 6 years
      @TaylorEdmiston I think you understood pretty well what I was trying to achieve but I also updated the post to include my use case. Thanks.
    • Taylor D. Edmiston almost 6 years
      This makes sense. I think the very frequently running DAG might be simpler to get started with, but externally triggered DAG runs sound like a nicer, more flexible setup for this use case.
    • Darshan over 2 years
      For those wondering why it's not working for them: the imports are outdated/deprecated. Try this: (1) pip install apache-airflow-providers-amazon; (2) from airflow.providers.amazon.aws.sensors.s3_key import S3KeySensor
  • Kyle Bridenstine almost 6 years
    Okay, thanks. That's sort of what I expected the answer would be. I could easily set up a Lambda function to make the REST API call like you said; this is also what I would have to do if I went with AWS Data Pipeline, where I'd have to activate it each time using a Lambda function.
  • Dev over 3 years
    Is this approach still the same in 2020, or is there a better way to deal with this now?
  • Taylor D. Edmiston over 3 years
    @Devender Yes, the approach is still the same currently. I don't believe there are any plans to fundamentally change it.
  • ajendrex over 3 years
    @TaylorEdmiston I have a DAG scheduled to run every minute, and the first task is an S3KeySensor which times out in 59 seconds. I would expect that a new instance of the DAG, and so a new instance of the sensor task, would run every minute, but it's running only once. Should this work? What could I be doing wrong?
  • Taylor D. Edmiston over 3 years
    @ajendrex Hi. Yes, the approach you described should work when running the DAG normally. If you are calling via trigger_dag, that only creates one DAG run irrespective of the DAG's schedule.
  • sɐunıɔןɐqɐp over 3 years
    From Review: A link to a solution is welcome, but please ensure your answer is useful without it: add context around the link so your fellow users will have some idea what it is and why it’s there, then quote the most relevant part of the page you're linking to in case the target page is unavailable. Answers that are little more than a link may be deleted. See: How do I write a good answer?