Airflow Worker Configuration

Solution 1

Your configuration files look okay. As you suspected, all workers do indeed require a copy of the DAG folder. You can use something like git to keep them in sync and up to date.
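
For example, a minimal sync script run from cron on every scheduler and worker node might look like the sketch below (the path, the script name, and the assumption that the DAG folder is a git clone are illustrative, not part of the original setup):

    # sync_dags.py - hypothetical helper; pulls the latest DAG definitions so this
    # node stays in sync with wherever the DAGs are authored.
    import subprocess
    from pathlib import Path

    DAGS_DIR = Path.home() / "airflow" / "dags"   # matches AIRFLOW_HOME = ~/airflow

    def sync_dags() -> None:
        # Assumes ~/airflow/dags is already a clone of the shared git repository.
        subprocess.run(["git", "-C", str(DAGS_DIR), "pull", "--ff-only"], check=True)

    if __name__ == "__main__":
        sync_dags()

Running it every few minutes from cron (or using rsync or a shared volume instead) keeps every node looking at the same DAG files.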

Solution 2

Some of the biggest pain points with Airflow come up around deployment and keeping DAG files and plugins in sync across your Airflow scheduler, Airflow webserver, and Celery worker nodes.

We've created an open source project called Astronomer Open that automates a Dockerized Airflow, Celery, and PostgreSQL with some other goodies baked in. The project was motivated by seeing so many people hit the same pain points creating a very similar setup.

For example, here's the Airflow Dockerfile: https://github.com/astronomer/astronomer/blob/master/docker/airflow/1.10.2/Dockerfile

And the docs: https://open.astronomer.io/

Full disclosure: This is a project I contribute to at work — we offer a paid enterprise edition as well that runs on Kubernetes (docs). That said, the Open Edition is totally free to use.

Author: Soundar Raj, updated on June 04, 2022

Comments

  • Soundar Raj (almost 2 years)

    I am a newbie to Airflow. I'm trying to set up Airflow in distributed mode with the Celery executor, following this article: https://stlong0521.github.io/20161023%20-%20Airflow.html

    Before getting into the details of the setup, I should mention that I've installed PostgreSQL on a separate instance.

    The specification of the setup is detailed below:

    Airflow core/server computer

    • Python 3.5
      • airflow (AIRFLOW_HOME = ~/airflow)
      • celery
      • psycopg2
    • RabbitMQ

    Configurations made in airflow.cfg:

    sql_alchemy_conn = postgresql+psycopg2://username:password@<postgres_host>:5432/airflow
    executor = CeleryExecutor
    broker_url = amqp://username:password@<rabbitmq_host>:5672//
    celery_result_backend = db+postgresql://username:password@<postgres_host>:5432/airflow
    

    Tests performed:

    • RabbitMQ is running
    • Can connect to PostgreSQL and have confirmed that Airflow has created tables
    • Can start and view the webserver (including custom dags)
    
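    (Aside: a rough Python smoke test for these two connections, assuming the same credentials as in airflow.cfg (shown here as placeholders) and that sqlalchemy and kombu are available on the node, might look like this.)

    # check_connections.py - hypothetical smoke test; hosts and credentials are placeholders.
    from kombu import Connection            # kombu ships with celery
    from sqlalchemy import create_engine, text

    DB_URL = "postgresql+psycopg2://username:password@<postgres_host>:5432/airflow"
    BROKER_URL = "amqp://username:password@<rabbitmq_host>:5672//"

    # Metadata database: a trivial query confirms Airflow's tables are reachable.
    engine = create_engine(DB_URL)
    with engine.connect() as conn:
        print(conn.execute(text("SELECT COUNT(*) FROM dag")).scalar(), "DAGs registered")

    # Broker: open a connection to RabbitMQ and fail fast if it is unreachable.
    with Connection(BROKER_URL) as broker:
        broker.ensure_connection(max_retries=3)
        print("broker reachable")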

    Airflow worker computer

    Has the following installed:

    • Python 3.5 with
      • airflow (AIRFLOW_HOME = ~/airflow)
      • celery
      • psycopg2

    Configurations made in airflow.cfg are exactly the same as in the server:

    sql_alchemy_conn = postgresql+psycopg2://username:password@<postgres_host>:5432/airflow
    executor = CeleryExecutor
    broker_url = amqp://username:password@<rabbitmq_host>:5672//
    celery_result_backend = db+postgresql://username:password@<postgres_host>:5432/airflow
    

    Output from commands run on the worker machine:

    When running airflow flower:

    [2018-02-19 14:58:14,276] {__init__.py:57} INFO - Using executor CeleryExecutor
    [2018-02-19 14:58:14,360] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python3.5/lib2to3/Grammar.txt
    [2018-02-19 14:58:14,384] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python3.5/lib2to3/PatternGrammar.txt
    [I 180219 14:58:15 command:139] Visit me at http://0.0.0.0:5555
    [I 180219 14:58:15 command:144] Broker: amqp://username:password@<rabbitmq_host>:5672//
    [I 180219 14:58:15 command:147] Registered tasks: 
        ['celery.accumulate',
         'celery.backend_cleanup',
         'celery.chain',
         'celery.chord',
         'celery.chord_unlock',
         'celery.chunks',
         'celery.group',
         'celery.map',
         'celery.starmap']
    [I 180219 14:58:15 mixins:224] Connected to amqp://username:password@<rabbitmq_host>:5672//
    

    I am placing the DAG on the Airflow core machine, and I have also copied the sample data (Excel sheets) that the DAG will process onto the same core machine.

    My worker log shows:

    raise CalledProcessError(retcode, cmd)
    subprocess.CalledProcessError: Command 'airflow run dag_name_x task_name_xx 2018-02-19T10:15:41.657243 --local -sd /home/Distributedici/airflow/dags/sample_data_xx.py' returned non-zero exit status 1

    Now my questions are:

    1) Should I copy the DAG folder to the worker computer as well?

    2) Right now I have not copied the DAG folder to the worker computer, and I cannot see the worker process pick up the task.

    Please point out where I am making a mistake and how to get the worker process to pick up the task.

  • Soundar Raj (about 6 years)
    Thank you so much for the reply. Yes, I tried placing the DAG folder on the worker instance too, and it still did not work. Eventually I realized that I not only needed to copy the DAG folder, I also needed to set the worker's fernet_key to the same value as the scheduler's for it to work.
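
    (For reference, a minimal sketch of the usual approach: generate one Fernet key and set that exact value on every node, either as [core] fernet_key in airflow.cfg or via the AIRFLOW__CORE__FERNET_KEY environment variable.)

    # generate_fernet_key.py - run once, then copy the printed key into airflow.cfg
    # on the scheduler, webserver, and every worker so they all share the same key.
    from cryptography.fernet import Fernet

    print(Fernet.generate_key().decode())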
  • Daniel Huang (about 6 years)
    Yeah I would suggest keeping your config identical across scheduler/workers/webserver.
  • Kyle Bridenstine (almost 6 years)
    Can you please elaborate on what you did to resolve your issue? What do you mean you just changed the fernet_key of the worker?
  • deontologician (almost 5 years)
    This doesn't seem to be open source any more.
  • Taylor D. Edmiston (almost 5 years)
    @deontologician It looks like the location of the Dockerfiles has changed, but they are still open source: github.com/astronomer/astronomer/tree/master/docker/airflow. You can also use the Astro CLI locally for an easy quickstart experience: astronomer.io/docs/cli-quickstart
  • akki (over 4 years)
    If one has taken the Docker path already, it might be easier to use Airflow's new DockerSwarmOperator to scale Airflow DAGs across multiple nodes.
  • Taylor D. Edmiston (over 4 years)
    @akki Thank you for authoring this addition with #5489 — it looks pretty cool. Is the problem it solves (vs DockerOperator) running Docker containers representing Airflow task instances on a remote Docker daemon that exists outside of your Airflow worker nodes?
  • akki (over 4 years)
    @TaylorEdmiston I have not tested it with a remote Docker daemon myself, but from this comment on the PR I understand it is possible. The operator itself gives you the ability to run Airflow tasks on a server that doesn't even run Airflow (but is part of a Docker Swarm cluster). Docker Swarm is Docker's native orchestration tool: you have a bunch of servers running Docker (Airflow should be running on one of them), and this operator runs your task on whichever of those servers has enough resources.
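
    (For context, a rough sketch of what a task using this operator might look like; the DAG id, image, command, and docker_url are placeholders, and the import path shown is the newer docker provider one rather than the original contrib module.)

    # Hypothetical DAG that runs one task as a Docker Swarm service.
    from datetime import datetime

    from airflow import DAG
    from airflow.providers.docker.operators.docker_swarm import DockerSwarmOperator

    with DAG(
        dag_id="swarm_example",                     # placeholder DAG id
        start_date=datetime(2021, 1, 1),
        schedule_interval=None,
        catchup=False,
    ) as dag:
        DockerSwarmOperator(
            task_id="process_data",
            image="my-registry/etl:latest",         # placeholder image
            command="python process.py",            # placeholder command
            docker_url="tcp://swarm-manager:2375",  # Swarm manager endpoint (placeholder)
        )

    The task then runs as a one-off Swarm service on whichever node in the cluster has capacity.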
  • Taylor D. Edmiston (over 4 years)
    @akki Thanks, this is a really cool idea to integrate Airflow with Docker Swarm. With the Swarm cluster being agnostic of Airflow, it sounds like it could have less overhead than running many DockerOperators inside the Airflow workers directly. I look forward to trying it!
  • akki (over 4 years)
    Thanks. If you have any constructive feedback/suggestions, I would love to hear. :)
  • Naresh Y (about 4 years)
    I have an AWS EC2 instance up and running with Airflow on Docker. Now I would like to upgrade the same setup to Airflow on Docker Swarm for scalability. Can you give me some pointers on this?