Setting up periodic tasks in Celery (celerybeat) dynamically using add_periodic_task

19,704

EDIT: (13/01/2018)

The latest release 4.1.0 have addressed the subject in this ticket #3958 and has been merged


Actually you can't not define periodic task at the view level, because the beat schedule setting will be loaded first and can not be rescheduled at runtime:

The add_periodic_task() function will add the entry to the beat_schedule setting behind the scenes, and the same setting can also can be used to set up periodic tasks manually:

app.conf.CELERYBEAT_SCHEDULE = {
    'add-every-30-seconds': {
        'task': 'tasks.my_task',
        'schedule': 10.0,
        'args': (66,)
    },
}

which means if you want to use add_periodic_task() it should be wrapped within an on_after_configure handler at the celery app level and any modification on runtime will not take effect:

app = Celery()

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(10, my_task.s(66))

As mentioned in the doc the the regular celerybeat simply keep track of task execution:

The default scheduler is the celery.beat.PersistentScheduler, that simply keeps track of the last run times in a local shelve database file.

In order to be able to dynamically manage periodic tasks and reschedule celerybeat at runtime:

There’s also the django-celery-beat extension that stores the schedule in the Django database, and presents a convenient admin interface to manage periodic tasks at runtime.

The tasks will be persisted in django database and the scheduler could be updated in task model at the db level. Whenever you update a periodic task a counter in this tasks table will be incremented, and tells the celery beat service to reload the schedule from the database.

A possible solution for you could be as follow:

from django_celery_beat.models import PeriodicTask, IntervalSchedule

schedule= IntervalSchedule.objects.create(every=10, period=IntervalSchedule.SECONDS)
task = PeriodicTask.objects.create(interval=schedule, name='any name', task='tasks.my_task', args=json.dumps([66]))

views.py

def update_task_view(request, id)
    task = PeriodicTask.objects.get(name="task name") # if we suppose names are unique
    task.args=json.dumps([id])
    task.save()
Share:
19,704
Marc
Author by

Marc

Updated on June 04, 2022

Comments

  • Marc
    Marc about 2 years

    I'm using Celery 4.0.1 with Django 1.10 and I have troubles scheduling tasks (running a task works fine). Here is the celery configuration:

    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myapp.settings')
    app = Celery('myapp')
    
    app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
    
    app.conf.BROKER_URL = 'amqp://{}:{}@{}'.format(settings.AMQP_USER, settings.AMQP_PASSWORD, settings.AMQP_HOST)
    app.conf.CELERY_DEFAULT_EXCHANGE = 'myapp.celery'
    app.conf.CELERY_DEFAULT_QUEUE = 'myapp.celery_default'
    app.conf.CELERY_TASK_SERIALIZER = 'json'
    app.conf.CELERY_ACCEPT_CONTENT = ['json']
    app.conf.CELERY_IGNORE_RESULT = True
    app.conf.CELERY_DISABLE_RATE_LIMITS = True
    app.conf.BROKER_POOL_LIMIT = 2
    
    app.conf.CELERY_QUEUES = (
        Queue('myapp.celery_default'),
        Queue('myapp.queue1'),
        Queue('myapp.queue2'),
        Queue('myapp.queue3'),
    )
    

    Then in tasks.py I have:

    @app.task(queue='myapp.queue1')
    def my_task(some_id):
        print("Doing something with", some_id)
    

    In views.py I want to schedule this task:

    def my_view(request, id):
        app.add_periodic_task(10, my_task.s(id))
    

    Then I execute the commands:

    sudo systemctl start rabbitmq.service
    celery -A myapp.celery_app beat -l debug
    celery worker -A myapp.celery_app
    

    But the task is never scheduled. I don't see anything in the logs. The task is working because if in my view I do:

    def my_view(request, id):
        my_task.delay(id)
    

    The task is executed.

    If in my configuration file if I schedule the task manually, like this it works:

    app.conf.CELERYBEAT_SCHEDULE = {
        'add-every-30-seconds': {
            'task': 'tasks.my_task',
            'schedule': 10.0,
            'args': (66,)
        },
    }
    

    I just can't schedule the task dynamically. Any idea?

  • Greg Brown
    Greg Brown almost 7 years
    Picking up on your comment "Actually you can't not define periodic task at the view level": Is it possible to use add_periodic_task() at the app-level, i.e. intask.py? It seems better encapsulation to keep these periodic tasks declared within the app.
  • Dhia
    Dhia almost 7 years
    Actually is not at all necessary to use it, as it will be called for you if you just use app.conf.CELERYBEAT_SCHEDULE settings syntax, but if you want use it explicitly you can use it in the task.py file.
  • Sreenadh T C
    Sreenadh T C over 6 years
    I believe the latest release(after 4.1.0) should have this one addressed. Here is the dev that is going on #3958
  • GDorn
    GDorn about 4 years
    Note that on_after_configure will not work for periodic tasks defined in a tasks.py file in another app. Use on_after_finalize instead. This is because the on_after_configure signal is sent once the celery.py file is imported, but the autodiscover_tasks call is not resolved until much later, when django is done setting up all of the apps. (autodiscover_tasks schedules that call but does not execute it immediately unless you use force=True which is guaranteed to fail with Django).