How to route tasks to different queues with Celery and Django

14,163

Solution 1

I have got it working, there are few things to note here:

According Celery's 4.2.0 documentation, CELERY_ROUTES should be the variable to define queue routing, but it only works for me using CELERY_TASK_ROUTES instead. The task routing seems to be independent from Celery Beat, therefore this will only work for tasks scheduled manually:

app1_test.delay()
app2_test.delay()

or

app1_test.apply_async()
app2_test.apply_async()

To make it work with Celery Beat, we just need to define the queues explicitly in the CELERY_BEAT_SCHEDULE variable. The final setup of the file my_app/settings.py would be as follows:

CELERY_BROKER_URL = "amqp://guest:guest@localhost:5672//"
CELERY_TASK_ROUTES = {
 'app1.tasks.*': {'queue': 'queue1'},
 'app2.tasks.*': {'queue': 'queue2'},
}
CELERY_BEAT_SCHEDULE = {
    'app1_test': {
        'task': 'app1.tasks.app1_test',
        'schedule': 15,
        'options': {'queue': 'queue1'}
    },
    'app2_test': {
        'task': 'app2.tasks.app2_test',
        'schedule': 15,
        'options': {'queue': 'queue2'}
    },

}

And to run Celery listening on those two queues:

celery -A my_app worker -B -l INFO -Q queue1,queue2

Where

  • -A: name of the project or app.
  • -B: Initiates the task scheduler Celery beat.
  • -l: Defines the logging level.
  • -Q: Defines the queues handled by this worker.

I hope this saves some time to other developers.

Solution 2

adding queue parameter to the decorator may help you,

@app.task(queue='queue1')
def app1_test():
    print('I am app1_test task!')
    time.sleep(10)
Share:
14,163

Related videos on Youtube

Ander
Author by

Ander

Emerged from the depths of the Basque Country, hardened under the intensity of the Spanish sun and matured in the tech-paradise of Ireland. I am a passionate Python developer with more than 13 years commercial experience. I have worked in high-scale projects, creating quality software for many different brands.

Updated on November 02, 2021

Comments

  • Ander
    Ander over 2 years

    I am using the following stack:

    • Python 3.6
    • Celery v4.2.1 (Broker: RabbitMQ v3.6.0)
    • Django v2.0.4.

    According Celery's documentation, running scheduled tasks on different queues should be as easy as defining the corresponding queues for the tasks on CELERY_ROUTES, nonetheless all tasks seem to be executed on Celery's default queue.

    This is the configuration on my_app/settings.py:

    CELERY_BROKER_URL = "amqp://guest:guest@localhost:5672//"
    CELERY_ROUTES = {
     'app1.tasks.*': {'queue': 'queue1'},
     'app2.tasks.*': {'queue': 'queue2'},
    }
    CELERY_BEAT_SCHEDULE = {
        'app1_test': {
            'task': 'app1.tasks.app1_test',
            'schedule': 15,
        },
        'app2_test': {
            'task': 'app2.tasks.app2_test',
            'schedule': 15,
        },
    
    }
    

    The tasks are just simple scripts for testing routing:

    File app1/tasks.py:

    from my_app.celery import app
    import time
    
    
    @app.task()
    def app1_test():
        print('I am app1_test task!')
        time.sleep(10)
    

    File app2/tasks.py:

    from my_app.celery import app
    import time
    
    
    @app.task()
    def app2_test():
        print('I am app2_test task!')
        time.sleep(10)
    

    When I run Celery with all the required queues:

    celery -A my_app worker -B -l info -Q celery,queue1,queue2
    

    RabbitMQ will show that only the default queue "celery" is running the tasks:

    sudo rabbitmqctl list_queues
    # Tasks executed by each queue:
    #  - celery 2
    #  - queue1 0
    #  - queue2 0
    

    Does somebody know how to fix this unexpected behavior?

    Regards,

  • Ander
    Ander almost 6 years
    Thanks @JPG, that would be a valid alternative, nonetheless I prefer to define the queues on the Django Setting files to get more flexibility. This way I can use different queue names depending on the environment: Testing, Staging, Production
  • Researcher
    Researcher over 5 years
    I gave up with the configuration options, "celery_task_default_queue", "task_default_queue", "task_routes". None of these did anything. Only thing that worked was the decorator, thanks JPG.
  • sparrowt
    sparrowt over 5 years
    Explanation of CELERY_ROUTES vs CELERY_TASK_ROUTES confusion: CELERY_ROUTES is the old celery setting name which has now been replaced by task_routes. However celery settings in a django settings file must be upper-case (e.g. TASK_ROUTES). To avoid conflict with other django settings it's recommended to prefix celery settings with CELERY_ resulting in CELERY_TASK_ROUTES. This is loaded by doing something like: app.config_from_object('django.conf:settings', namespace='CELERY'). So CELERY_TASK_ROUTES is just an upper-cased-and-prefixed alteration of the new setting name.
  • Reza
    Reza almost 4 years
    for those who wondering about the celery beat, it is not independent of the task routing and it should work as well.
  • Chetan Vashisth
    Chetan Vashisth over 3 years
    No I dont think we need to define that 'options' param in the settings file if you'll just remove the celery before the other queue names from the command so it will work just fine because you have already defined the CELERY_ROUTES in the settings.py file
  • Ham
    Ham over 3 years
    This also works well in celery v5.0 , thanks
  • TheZeke
    TheZeke about 3 years
    One should always specify the queue name and name of the worker. Eg. "celery -A my_app worker -Q my_queue,my_other_queue -P threads --task-events -c 40 -l INFO -B --scheduler django_celery_beat.schedulers:DatabaseScheduler -n celery@%h""
  • pyofey
    pyofey over 2 years
    Love the answer! Works just fine for me in v4.4.0 in Jan 2022 and @sparrowt comment and explanation to use CELERY_TASK_ROUTES instead of CELERY_ROUTES was :chefskiss: :D