Django & Celery — Routing problems
You are using the Django ORM as a broker, which means declarations are only stored in memory (see the transport comparison table, which is admittedly hard to find, at http://readthedocs.org/docs/kombu/en/latest/introduction.html#transport-comparison).
So when you apply this task with the routing key important_task.update, Celery will not be able to route it, because the queue has not been declared yet.
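To see why an undeclared queue makes the message vanish, here is a toy model of a topic exchange whose bindings live only in process memory. It is a sketch, not kombu's implementation: `topic_matches` and `InMemoryTopicExchange` are hypothetical names, and the matching assumes the usual AMQP topic rules (`*` matches exactly one word, `#` matches zero or more words).

```python
def topic_matches(binding_key, routing_key):
    """Return True if routing_key matches an AMQP-style topic binding_key."""
    pattern = binding_key.split(".")
    words = routing_key.split(".")

    def match(p, w):
        if p == len(pattern):
            # Pattern exhausted: only a match if all words were consumed too.
            return w == len(words)
        if pattern[p] == "#":
            # '#' may absorb zero or more of the remaining words.
            return any(match(p + 1, k) for k in range(w, len(words) + 1))
        if w == len(words):
            return False
        if pattern[p] in ("*", words[w]):
            return match(p + 1, w + 1)
        return False

    return match(0, 0)


class InMemoryTopicExchange(object):
    """Hypothetical stand-in for an exchange whose bindings are per-process."""

    def __init__(self):
        self.bindings = []  # list of (binding_key, queue_name)
        self.queues = {}    # queue_name -> list of messages

    def declare_queue(self, name, binding_key):
        self.queues.setdefault(name, [])
        self.bindings.append((binding_key, name))

    def publish(self, routing_key, message):
        """Deliver to every matching queue; return the queue names reached."""
        reached = []
        for binding_key, name in self.bindings:
            if topic_matches(binding_key, routing_key):
                self.queues[name].append(message)
                reached.append(name)
        return reached


# The publishing process has declared nothing yet: the message goes nowhere.
producer_view = InMemoryTopicExchange()
undelivered = producer_view.publish("important_task.update", "job-1")

# After declaring the queue, the same routing key finds its binding.
producer_view.declare_queue("i_tasks", "important_task.#")
delivered = producer_view.publish("important_task.update", "job-2")
```

In this model the first publish reaches no queue because the binding table is empty, which mirrors the symptom in the question: the task is sent but never shows up in the broker.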
It will work if you do this:
@task(queue="i_tasks", routing_key="important_task.update")
def important_task():
    print("IMPORTANT")
But it would be much simpler for you to use the automatic routing feature, since nothing here shows that you need a 'topic' exchange. To use automatic routing, simply remove these settings:
- CELERY_DEFAULT_QUEUE
- CELERY_DEFAULT_EXCHANGE
- CELERY_DEFAULT_EXCHANGE_TYPE
- CELERY_DEFAULT_ROUTING_KEY
- CELERY_QUEUES
And declare your task like this:
@task(queue="important")
def important_task():
    return "IMPORTANT"
and then to start a worker consuming from that queue:
$ python manage.py celeryd -l info -Q important
or to consume from both the default (celery) queue and the important queue:
$ python manage.py celeryd -l info -Q celery,important
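For a rough picture of what automatic routing does with a name like `important`, here is a small sketch. It assumes the behaviour described in the Celery routing guide, where a queue missing from CELERY_QUEUES is created with a direct exchange and routing key that both share the queue's name. The helper names `auto_declared_queue` and `parse_queue_option` are made up for illustration and are not Celery functions.

```python
def auto_declared_queue(name):
    """Options assumed for a queue that is not listed in CELERY_QUEUES."""
    return {
        "exchange": name,
        "exchange_type": "direct",
        "routing_key": name,
    }


def parse_queue_option(value):
    """Split a '-Q' style comma-separated value into queue names."""
    return [q.strip() for q in value.split(",") if q.strip()]


# Queues a worker started with '-Q celery,important' would consume from,
# together with the options assumed for each of them.
consumed = dict(
    (name, auto_declared_queue(name))
    for name in parse_queue_option("celery,important")
)
```

Because exchange, routing key and queue all share one name, there is no binding pattern to get wrong, which is why this setup is simpler than a topic exchange.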
Another good practice is to not hardcode the queue names into the task and use CELERY_ROUTES instead:
@task
def important_task():
    return "DEFAULT"
then in your settings:
CELERY_ROUTES = {"myapp.tasks.important_task": {"queue": "important"}}
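As a simplified picture of how such a mapping is consulted, the sketch below looks a task name up in the routes table and falls back to the default queue when there is no entry. `resolve_queue` is a hypothetical helper, not part of Celery, and `myapp.tasks.other_task` is an invented task name; the real router also handles exchanges, routing keys and per-call options.

```python
ROUTES = {"myapp.tasks.important_task": {"queue": "important"}}


def resolve_queue(task_name, routes, default_queue="celery"):
    """Return the queue a task name maps to, or the default queue."""
    route = routes.get(task_name)
    if route is None:
        return default_queue
    return route.get("queue", default_queue)


routed = resolve_queue("myapp.tasks.important_task", ROUTES)
# 'myapp.tasks.other_task' is hypothetical and has no entry in ROUTES.
unrouted = resolve_queue("myapp.tasks.other_task", ROUTES)
```

Keeping the mapping in settings means the queue layout can change without touching the task code.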
If you still insist on using topic exchanges then you could add this router to automatically declare all queues the first time a task is sent:
class PredeclareRouter(object):
    setup = False

    def route_for_task(self, *args, **kwargs):
        # Only declare once; returning None leaves routing to other rules.
        if self.setup:
            return
        self.setup = True
        from celery import current_app, VERSION as celery_version
        # will not connect anywhere when using the Django transport
        # because declarations happen in memory.
        with current_app.broker_connection() as conn:
            queues = current_app.amqp.queues
            channel = conn.default_channel
            if celery_version >= (2, 6):
                for queue in queues.values():
                    queue(channel).declare()
            else:
                from kombu.common import entry_to_queue
                for name, opts in queues.items():
                    entry_to_queue(name, **opts)(channel).declare()


CELERY_ROUTES = (PredeclareRouter(), )
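The router above never picks a route itself; it only uses the routing hook to run its setup once. The stand-alone sketch below imitates that pattern without Celery. It assumes that routers are consulted in order and that the first truthy result wins. `OneTimeSetupRouter`, `StaticRouter`, `first_route` and the `declarations` counter are hypothetical names used only for illustration.

```python
class OneTimeSetupRouter(object):
    """Runs a side effect on the first call and never chooses a route."""
    setup = False
    declarations = 0  # hypothetical counter standing in for queue declaration

    def route_for_task(self, task, args=None, kwargs=None):
        if self.setup:
            return None
        # Assigning through self creates instance attributes; the class
        # defaults above stay untouched.
        self.setup = True
        self.declarations += 1
        return None


class StaticRouter(object):
    """Looks the task name up in a fixed table."""

    def __init__(self, table):
        self.table = table

    def route_for_task(self, task, args=None, kwargs=None):
        return self.table.get(task)


def first_route(routers, task):
    """Ask each router in order and return the first truthy answer."""
    for router in routers:
        route = router.route_for_task(task)
        if route:
            return route
    return None


setup_router = OneTimeSetupRouter()
routers = (
    setup_router,
    StaticRouter({"myapp.tasks.important_task": {"queue": "i_tasks"}}),
)
first = first_route(routers, "myapp.tasks.important_task")
second = first_route(routers, "myapp.tasks.important_task")
```

Both lookups fall through to the second router, while the setup step runs only on the first one.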
Michael Waterfall
Updated on July 21, 2022
I'm using Django and Celery and I'm trying to set up routing to multiple queues. When I specify a task's routing_key and exchange (either in the task decorator or using apply_async()), the task isn't added to the broker (which is Kombu connecting to my MySQL database).
If I specify the queue name in the task decorator (which will mean the routing key is ignored), the task works fine. It appears to be a problem with the routing/exchange setup.
Any idea what the problem could be?
Here's the setup:
settings.py
INSTALLED_APPS = (
    ...
    'kombu.transport.django',
    'djcelery',
)

BROKER_BACKEND = 'django'

CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE = "tasks"
CELERY_DEFAULT_EXCHANGE_TYPE = "topic"
CELERY_DEFAULT_ROUTING_KEY = "task.default"

CELERY_QUEUES = {
    'default': {
        'binding_key': 'task.#',
    },
    'i_tasks': {
        'binding_key': 'important_task.#',
    },
}
tasks.py
from celery.task import task

@task(routing_key='important_task.update')
def my_important_task():
    try:
        ...
    except Exception as exc:
        my_important_task.retry(exc=exc)
Initiate task:
from tasks import my_important_task

my_important_task.delay()