celery - Tasks that need to run in priority

Solution 1

Celery (as of v3.0) does not support task priorities.

http://docs.celeryproject.org/en/master/faq.html#does-celery-support-task-priorities

You can work around this by routing tasks to dedicated queues.

http://docs.celeryproject.org/en/latest/userguide/routing.html

Define two queues, default and priority_high:

from kombu import Queue
CELERY_DEFAULT_QUEUE = 'default'
CELERY_QUEUES = (
    Queue('default'),
    Queue('priority_high'),
)

Run two worker daemons, each on its own host (x and y):

user@x:/$ celery worker -Q priority_high
user@y:/$ celery worker -Q default,priority_high
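To see why this layout favours priority_high, here is a toy model in plain Python (no Celery or broker involved). It assumes each task takes one time step and that a worker listening on several queues drains them in the order listed. That ordering is a simplification: a real Celery worker consumes from all of its queues without such a strict order. The queue contents and task names are made up for illustration.

```python
from collections import deque


def make_queues():
    # Hypothetical backlog: 6 automatic updates already waiting in "default",
    # plus one manual update routed to "priority_high".
    return {
        'default': deque(f'auto-{i}' for i in range(6)),
        'priority_high': deque(['manual-update']),
    }


def simulate(workers, queues):
    """Toy model: on each tick, every worker takes one message from the first
    non-empty queue in its list, and every task takes exactly one tick.
    Returns {task_name: tick_on_which_it_ran}."""
    ran = {}
    tick = 0
    while any(queues.values()):
        tick += 1
        progressed = False
        for listens_on in workers.values():
            for name in listens_on:
                if queues[name]:
                    ran[queues[name].popleft()] = tick
                    progressed = True
                    break
        if not progressed:  # leftover messages in a queue nobody consumes
            break
    return ran


# Two workers, mirroring the two commands above (host x and host y).
two_workers = simulate(
    {'x': ['priority_high'], 'y': ['default', 'priority_high']}, make_queues()
)
# A single worker that happens to drain "default" first.
one_worker = simulate({'y': ['default', 'priority_high']}, make_queues())

print(two_workers['manual-update'], one_worker['manual-update'])
```

With only worker y, the manual update waits behind the whole default backlog; adding worker x, which listens only on priority_high, lets it run on the first step. The benefit comes from capacity reserved for priority_high, not from any reordering inside a queue.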

Then route the task to the high-priority queue:

your_task.apply_async(args=['...'], queue='priority_high')
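If a particular task should always go to the high-priority queue, the route can be declared once in the settings instead of passing queue= on every call. A sketch using the Celery 3.x CELERY_ROUTES setting (in Celery 4+ the setting is named task_routes); the task name tasks.manual_update_user_profile is hypothetical:

```python
# Route a (hypothetical) dedicated manual-update task to priority_high;
# tasks without a route keep going to CELERY_DEFAULT_QUEUE ('default').
CELERY_ROUTES = {
    'tasks.manual_update_user_profile': {'queue': 'priority_high'},
}
```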

Solution 2

If you use the RabbitMQ transport, configure your queues as follows in settings.py:

from kombu import Queue
...
CELERY_TASK_QUEUES = (
    Queue('default', routing_key='task_default.#', max_priority=10), 
    ...)

Then run your tasks:

my_low_prio_task.apply_async(args=(...), priority=1)
my_high_prio_task.apply_async(args=(...), priority=10)
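The ordering RabbitMQ applies to a queue declared with max_priority (the x-max-priority queue argument) can be modelled in a few lines of plain Python. This is a toy model, not kombu or RabbitMQ code. It assumes RabbitMQ's documented rules: higher numbers are delivered first, messages of equal priority keep publish order, a message without a priority counts as 0, and values above the maximum are treated as the maximum. The class name PriorityQueueSketch is hypothetical.

```python
import heapq
import itertools


class PriorityQueueSketch:
    """Toy model of a RabbitMQ queue declared with x-max-priority."""

    def __init__(self, max_priority):
        self.max_priority = max_priority
        self._heap = []
        self._seq = itertools.count()  # tie-breaker keeps FIFO order per level

    def publish(self, body, priority=None):
        # Missing priority counts as 0; values above the max are capped.
        p = 0 if priority is None else min(priority, self.max_priority)
        # heapq is a min-heap, so store the negated priority.
        heapq.heappush(self._heap, (-p, next(self._seq), body))

    def consume(self):
        return heapq.heappop(self._heap)[2]


q = PriorityQueueSketch(max_priority=10)
q.publish('low-1', priority=1)
q.publish('low-2', priority=1)
q.publish('high', priority=10)
q.publish('capped', priority=255)
order = [q.consume() for _ in range(4)]
print(order)
```

Priority only reorders messages that are still waiting in the broker; tasks a worker has already prefetched are not reordered, so a low prefetch multiplier (worker_prefetch_multiplier in Celery 4) makes priorities more effective.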

This code currently works with kombu==4.6.11 and celery==4.4.6.

Author: fabriciols

Creator and developer of http://mypst.com.br, a site built with Python/Django.

Updated on July 23, 2022

Comments

  • fabriciols
    fabriciols almost 2 years

    On my website, users can update their profile manually whenever they want, and it is also updated automatically once a day.

    This task is now distributed with Celery.

    But I have a problem:

    Every day, for the automatic update, a job puts ALL users (about 6k) on the queue:

    from celery import group
    from tasks import update_user_profile
    import datetime
    from lastActivityDate.models import UserActivity
    
    # Users who were active during the last 5 days
    since = datetime.datetime.today() - datetime.timedelta(days=5)
    
    print(datetime.datetime.today())
    
    user_list = UserActivity.objects.filter(last_activity_date__gte=since)
    # One update_user_profile task per active user, sent as a single group
    g = group(update_user_profile.s(i.user.auth.username) for i in user_list)
    
    print(datetime.datetime.today())
    # Dispatch the group and block until every task has finished
    print(g().get())
    

    If someone tries to do a manual update in the meantime, their task goes into the same queue and takes forever to be executed.

    Is there a way to make this manual task run with priority? Or to use a dedicated queue for each kind of update: manual and automatic?

    • catherine
      catherine about 11 years
      You can limit the number of tasks executed, for example per hour, so that other tasks get a chance to run. Read their documentation.
  • mafrosis
    mafrosis over 9 years
    For anyone coming to this answer late (like me): it's vital to note that the two Celery workers are running on different hosts, i.e. two servers are consuming from the priority_high queue and one server is consuming from default.
  • bad_keypoints
    bad_keypoints about 9 years
    @Satoshi Yoshinaga, can I also get queue-specific concurrency by specifying the "-c N" option in the worker commands? I need one of my queues to have only a few workers and the other (the heavy-tasks queue) to have many.
  • Chemary
    Chemary over 7 years
    Now you can also use message priorities
  • Eligio Mariño
    Eligio Mariño over 7 years
  • Pitipong Guntawong
    Pitipong Guntawong over 5 years
    To anyone who comes across this answer: Queue comes from the kombu package, not the standard-library queue module.
  • Kevin Postlewaite
    Kevin Postlewaite about 5 years
    To those coming late to this answer, Celery now supports task priority to varying extents with RabbitMQ and Redis
  • Alan Evangelista
    Alan Evangelista over 4 years
    What makes the high-priority queue high priority in the task routing example above? The fact that it is served by more workers than the default queue? I'm asking because I only have one host to run one or more Celery workers, and I want to prioritize some quick tasks but I'm not sure how to do it.
  • Pablo Guerrero
    Pablo Guerrero over 3 years
    @AlanEvangelista, in the example above, the "priority_high" queue is high priority because its tasks can be executed by both workers, whereas tasks in "default" can only be executed by the second worker. So even if the second worker is busy with default tasks that were queued earlier, the first worker can still execute high-priority tasks.
  • Utkarsh Dalal
    Utkarsh Dalal over 2 years
    Right answer. For anyone else wondering, 10 is the highest priority and 1 is the lowest.
  • Tim Tisdall
    Tim Tisdall over 2 years
    The docs say the priority values are 0 to 9 with 0 as the highest (and the default). And that the actual usable values are defined in the priority_steps setting which has a default of [0, 3, 6, 9]. But it seems max_priority changes that so in this example the priorities are 0 to 10, but that only works with RabbitMQ. It's a bit confusing.
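To make the Redis behaviour described in the last comment more concrete, here is a plain-Python sketch. It assumes, as that comment says, priorities from 0 to 9 with 0 as the highest and a priority_steps list of [0, 3, 6, 9]. It further assumes that each priority is rounded down to the nearest step, which is an assumption about kombu's Redis transport that has not been checked against every version. The function names are hypothetical.

```python
from bisect import bisect

PRIORITY_STEPS = [0, 3, 6, 9]  # assumed default of the priority_steps option


def to_step(priority, steps=PRIORITY_STEPS):
    """Clamp to 0-9, then round down to the nearest configured step (assumed)."""
    priority = max(0, min(priority, 9))
    return steps[bisect(steps, priority) - 1]


def redis_delivery_order(messages, steps=PRIORITY_STEPS):
    """messages: list of (body, priority). Lower step = delivered first."""
    buckets = {step: [] for step in steps}
    for body, priority in messages:
        buckets[to_step(priority, steps)].append(body)
    return [body for step in sorted(steps) for body in buckets[step]]


print([to_step(p) for p in range(10)])
print(redis_delivery_order([('a', 9), ('b', 0), ('c', 4), ('d', 5)]))
```

Note the inversion compared with RabbitMQ: in this model priority=10 is clamped to 9 and lands in the lowest-priority bucket, which is one reason the same apply_async(priority=...) call can behave differently on the two brokers.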