How to route tasks to different queues with Celery and Django
Solution 1
I have got it working. There are a few things to note here:
According to Celery's 4.2.0 documentation, CELERY_ROUTES should be the variable that defines queue routing, but it only works for me when I use CELERY_TASK_ROUTES instead. The task routing seems to be independent of Celery Beat, so on its own this only works for tasks scheduled manually:
app1_test.delay()
app2_test.delay()
or
app1_test.apply_async()
app2_test.apply_async()
To make it work with Celery Beat, we just need to define the queues explicitly in the CELERY_BEAT_SCHEDULE variable. The final setup of the file my_app/settings.py would be as follows:
CELERY_BROKER_URL = "amqp://guest:guest@localhost:5672//"
CELERY_TASK_ROUTES = {
    'app1.tasks.*': {'queue': 'queue1'},
    'app2.tasks.*': {'queue': 'queue2'},
}
CELERY_BEAT_SCHEDULE = {
    'app1_test': {
        'task': 'app1.tasks.app1_test',
        'schedule': 15,
        'options': {'queue': 'queue1'}
    },
    'app2_test': {
        'task': 'app2.tasks.app2_test',
        'schedule': 15,
        'options': {'queue': 'queue2'}
    },
}
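To see which queue a given task name would land on, the glob patterns used as route keys can be approximated with the standard library. This is only a minimal sketch of the matching idea, not Celery's actual router: the helper resolve_queue is hypothetical, and it assumes the first matching pattern wins and that unmatched tasks fall back to the default queue "celery".

```python
import fnmatch

# Same routing table as in the settings shown here.
TASK_ROUTES = {
    'app1.tasks.*': {'queue': 'queue1'},
    'app2.tasks.*': {'queue': 'queue2'},
}

DEFAULT_QUEUE = 'celery'  # name of Celery's default queue


def resolve_queue(task_name, routes=TASK_ROUTES, default=DEFAULT_QUEUE):
    """Hypothetical helper: return the queue of the first matching glob pattern."""
    for pattern, options in routes.items():
        if fnmatch.fnmatchcase(task_name, pattern):
            return options['queue']
    # No pattern matched, so the task would go to the default queue.
    return default


for name in ('app1.tasks.app1_test', 'app2.tasks.app2_test', 'other.tasks.cleanup'):
    print(name, '->', resolve_queue(name))
```

A quick check like this can help confirm that the task names registered by each app really start with the prefix used in the route keys.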
And to run Celery listening on those two queues:
celery -A my_app worker -B -l INFO -Q queue1,queue2
Where:
- -A: name of the project or app.
- -B: starts the Celery beat task scheduler.
- -l: defines the logging level.
- -Q: defines the queues handled by this worker.
I hope this saves other developers some time.
Solution 2
Adding the queue parameter to the task decorator may help:
import time
from my_app.celery import app

@app.task(queue='queue1')
def app1_test():
    print('I am app1_test task!')
    time.sleep(10)
Ander
Updated on November 02, 2021
Comments
-
Ander over 2 years
I am using the following stack:
- Python 3.6
- Celery v4.2.1 (Broker: RabbitMQ v3.6.0)
- Django v2.0.4.
According to Celery's documentation, running scheduled tasks on different queues should be as easy as defining the corresponding queues for the tasks in CELERY_ROUTES. Nonetheless, all tasks seem to be executed on Celery's default queue.
This is the configuration in my_app/settings.py:
CELERY_BROKER_URL = "amqp://guest:guest@localhost:5672//"
CELERY_ROUTES = {
    'app1.tasks.*': {'queue': 'queue1'},
    'app2.tasks.*': {'queue': 'queue2'},
}
CELERY_BEAT_SCHEDULE = {
    'app1_test': {
        'task': 'app1.tasks.app1_test',
        'schedule': 15,
    },
    'app2_test': {
        'task': 'app2.tasks.app2_test',
        'schedule': 15,
    },
}
The tasks are just simple scripts for testing routing:
File app1/tasks.py:
from my_app.celery import app
import time

@app.task()
def app1_test():
    print('I am app1_test task!')
    time.sleep(10)
File app2/tasks.py:
from my_app.celery import app
import time

@app.task()
def app2_test():
    print('I am app2_test task!')
    time.sleep(10)
When I run Celery with all the required queues:
celery -A my_app worker -B -l info -Q celery,queue1,queue2
RabbitMQ will show that only the default queue "celery" is running the tasks:
sudo rabbitmqctl list_queues
# Tasks executed by each queue:
# - celery 2
# - queue1 0
# - queue2 0
Does somebody know how to fix this unexpected behavior?
Regards,
-
Ander almost 6 years
Thanks @JPG, that would be a valid alternative. Nonetheless, I prefer to define the queues in the Django settings files to get more flexibility. This way I can use different queue names depending on the environment: Testing, Staging, Production.
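As a rough illustration of that per-environment idea, the routes in the settings file could derive their queue names from an environment variable. This is only a sketch under assumptions: the variable name DJANGO_ENV, the "testing" fallback, and the helper queue_name are all hypothetical and not part of Celery or Django.

```python
import os

# Hypothetical environment variable; falls back to "testing" when unset.
ENVIRONMENT = os.environ.get('DJANGO_ENV', 'testing')


def queue_name(base, environment=ENVIRONMENT):
    """Hypothetical helper: prefix a base queue name with the environment."""
    return f'{environment}_{base}'


# Routes built once at settings import time, e.g. "staging_queue1".
CELERY_TASK_ROUTES = {
    'app1.tasks.*': {'queue': queue_name('queue1')},
    'app2.tasks.*': {'queue': queue_name('queue2')},
}

print(CELERY_TASK_ROUTES)
```

The worker for each environment would then need to be started with the matching names in its -Q option.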
-
Researcher over 5 years
I gave up on the configuration options "celery_task_default_queue", "task_default_queue", and "task_routes". None of these did anything. The only thing that worked was the decorator. Thanks, JPG.
-
sparrowt over 5 years
Explanation of the CELERY_ROUTES vs CELERY_TASK_ROUTES confusion: CELERY_ROUTES is the old Celery setting name, which has now been replaced by task_routes. However, Celery settings in a Django settings file must be upper-case (e.g. TASK_ROUTES). To avoid conflict with other Django settings, it's recommended to prefix Celery settings with CELERY_, resulting in CELERY_TASK_ROUTES. This is loaded by doing something like: app.config_from_object('django.conf:settings', namespace='CELERY'). So CELERY_TASK_ROUTES is just an upper-cased-and-prefixed alteration of the new setting name.
-
Reza almost 4 years
For those wondering about Celery beat: it is not independent of the task routing, and it should work as well.
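The naming rule from sparrowt's comment (upper-case plus the CELERY_ prefix) can be illustrated with a few lines of plain Python. This is a sketch of the naming convention only, not Celery's actual loading code, and the helper strip_namespace is hypothetical.

```python
def strip_namespace(settings, namespace='CELERY'):
    """Hypothetical helper: map CELERY_-prefixed names to lower-case setting names."""
    prefix = namespace + '_'
    return {
        key[len(prefix):].lower(): value
        for key, value in settings.items()
        if key.startswith(prefix)
    }


# Stand-in for a Django settings module, using values from the question.
django_settings = {
    'DEBUG': True,
    'CELERY_BROKER_URL': 'amqp://guest:guest@localhost:5672//',
    'CELERY_TASK_ROUTES': {'app1.tasks.*': {'queue': 'queue1'}},
    'CELERY_ROUTES': {'app2.tasks.*': {'queue': 'queue2'}},
}

celery_config = strip_namespace(django_settings)
print(sorted(celery_config))
```

Under this rule CELERY_TASK_ROUTES maps to task_routes, the new setting name, while CELERY_ROUTES maps to routes, which is not the name that comment gives for the new setting.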
-
Chetan Vashisth over 3 years
No, I don't think we need to define the 'options' param in the settings file. If you just remove "celery" from before the other queue names in the command, it will work fine, because you have already defined CELERY_ROUTES in the settings.py file.
-
Ham over 3 years
This also works well in Celery v5.0, thanks.
-
TheZeke about 3 years
One should always specify the queue name and the name of the worker, e.g. "celery -A my_app worker -Q my_queue,my_other_queue -P threads --task-events -c 40 -l INFO -B --scheduler django_celery_beat.schedulers:DatabaseScheduler -n celery@%h"
-
pyofey over 2 years
Love the answer! Works just fine for me in v4.4.0 in Jan 2022, and @sparrowt's comment and explanation to use CELERY_TASK_ROUTES instead of CELERY_ROUTES was :chefskiss: :D