How to dynamically add / remove periodic tasks to Celery (celerybeat)
Solution 1
No, I'm sorry, this is not possible with the regular celerybeat.
But it's easily extensible to do what you want, e.g. the django-celery scheduler is just a subclass reading and writing the schedule to the database (with some optimizations on top).
Also you can use the django-celery scheduler even for non-Django projects.
Something like this:
- Install django + django-celery:
$ pip install -U django django-celery
- Add the following settings to your celeryconfig:
DATABASES = {
    'default': {
        'NAME': 'celerybeat.db',
        'ENGINE': 'django.db.backends.sqlite3',
    },
}
INSTALLED_APPS = ('djcelery', )
- Create the database tables:
$ PYTHONPATH=. django-admin.py syncdb --settings=celeryconfig
- Start celerybeat with the database scheduler:
$ PYTHONPATH=. django-admin.py celerybeat --settings=celeryconfig \
    -S djcelery.schedulers.DatabaseScheduler
There is also the djcelerymon command, which can be used in non-Django projects to start celerycam and a Django Admin web server in the same process. You can use it to edit your periodic tasks in a nice web interface:
$ djcelerymon
(Note: for some reason djcelerymon can't be stopped using Ctrl+C; you have to use Ctrl+Z followed by kill %1.)
Solution 2
This question was answered on Google Groups.
I AM NOT THE AUTHOR, all credit goes to Jean Mark
Here's a proper solution for this, confirmed working. In my scenario, I subclassed PeriodicTask and created a model out of it, since that lets me add other fields to the model as I need them and also add the "terminate" method. You have to set the periodic task's enabled property to False and save it before you delete it. The subclassing is not a must; the schedule_every method is the one that really does the work. When you're ready to terminate your task (if you didn't subclass it), you can just use PeriodicTask.objects.filter(name=...) to search for your task, disable it, then delete it.
Hope this helps!
from datetime import datetime

from django.db import models
from djcelery.models import PeriodicTask, IntervalSchedule


class TaskScheduler(models.Model):

    periodic_task = models.ForeignKey(PeriodicTask)

    @staticmethod
    def schedule_every(task_name, period, every, args=None, kwargs=None):
        """Schedule a task by name every "every" "period".

        An example call would be:
            TaskScheduler.schedule_every('mycustomtask', 'seconds', 30, [1, 2, 3])
        which schedules your custom task to run every 30 seconds with the
        arguments 1, 2 and 3 passed to the actual task.
        """
        permissible_periods = ['days', 'hours', 'minutes', 'seconds']
        if period not in permissible_periods:
            raise Exception('Invalid period specified')
        # create the periodic task and the interval
        # build a unique name for the periodic task
        ptask_name = "%s_%s" % (task_name, datetime.now())
        interval_schedules = IntervalSchedule.objects.filter(period=period, every=every)
        if interval_schedules:
            # an interval schedule like that already exists, so reuse it
            interval_schedule = interval_schedules[0]
        else:
            # create a brand new interval schedule
            interval_schedule = IntervalSchedule()
            interval_schedule.every = every  # should check to make sure this is a positive int
            interval_schedule.period = period
            interval_schedule.save()
        ptask = PeriodicTask(name=ptask_name, task=task_name, interval=interval_schedule)
        if args:
            ptask.args = args
        if kwargs:
            ptask.kwargs = kwargs
        ptask.save()
        return TaskScheduler.objects.create(periodic_task=ptask)

    def stop(self):
        """Pause the task."""
        ptask = self.periodic_task
        ptask.enabled = False
        ptask.save()

    def start(self):
        """Start the task."""
        ptask = self.periodic_task
        ptask.enabled = True
        ptask.save()

    def terminate(self):
        """Disable the task, then delete this record and the periodic task."""
        self.stop()
        ptask = self.periodic_task
        self.delete()
        ptask.delete()
Solution 3
This was finally made possible by a fix included in celery v4.1.0. Now, you just need to change the schedule entries in the database backend, and celery-beat will act according to the new schedule.
The docs vaguely describe how this works. The default scheduler for celery-beat, PersistentScheduler, uses a shelve file as its schedule database. Any changes to the beat_schedule dictionary in the PersistentScheduler instance are synced with this database (by default, every 3 minutes), and vice-versa. The docs describe how to add new entries to the beat_schedule using app.add_periodic_task. To modify an existing entry, just add a new entry with the same name. Delete an entry as you would from a dictionary: del app.conf.beat_schedule['name'].
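The add, modify and delete semantics described above can be sketched roughly as follows. To keep the snippet runnable without Celery installed, a plain dict stands in for app.conf.beat_schedule; that stand-in, the entry name 'add-every-30s' and the task path 'tasks.add' are all assumptions made for illustration. Each entry uses the task / schedule / args layout of a beat_schedule entry, with the schedule given as a number of seconds.

```python
# A plain dict standing in for app.conf.beat_schedule (assumption: no Celery
# app is created here; the entry name and task path are hypothetical).
beat_schedule = {}

# Add an entry: run the hypothetical task "tasks.add" every 30 seconds.
beat_schedule["add-every-30s"] = {
    "task": "tasks.add",
    "schedule": 30.0,
    "args": (2, 3),
}

# Modify the entry by assigning a new one under the same name.
beat_schedule["add-every-30s"] = {
    "task": "tasks.add",
    "schedule": 60.0,
    "args": (2, 3),
}
interval_after_update = beat_schedule["add-every-30s"]["schedule"]

# Delete the entry as you would from any dictionary.
del beat_schedule["add-every-30s"]
```

In a real project the same operations would be applied to the beat_schedule of the running app, and the scheduler would pick them up on its next sync.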
Suppose you want to monitor and modify your celery beat schedule using an external app. Then you have several options:
- You can open the shelve database file and read its contents like a dictionary. Write back to this file for modifications.
- You can run another instance of the Celery app, and use that one to modify the shelve file as described above.
- You can use the custom scheduler class from django-celery-beat to store the schedule in a django-managed database, and access the entries there.
- You can use the scheduler from celerybeat-mongo to store the schedule in a MongoDB backend, and access the entries there.
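A minimal sketch of the first option, using only the standard-library shelve module. Several things here are assumptions: the file path is a throwaway temporary one, the schedule is assumed to live under an "entries" key, and plain dicts stand in for the pickled entry objects a real beat process would store (reading a real file would need Celery importable). Note also that shelve does not support concurrent access, so this is only likely to work while beat does not have the file open.

```python
import os
import shelve
import tempfile

# Hypothetical path in a temporary directory, used only for this demo.
path = os.path.join(tempfile.mkdtemp(), "demo-schedule")

# Write a demo store. Assumption: entries live under an "entries" key, and
# plain dicts stand in for Celery's pickled schedule-entry objects.
with shelve.open(path) as store:
    store["entries"] = {"add-every-30s": {"task": "tasks.add", "schedule": 30.0}}

# Read, modify, and write back. A shelf only persists a key on assignment
# (unless opened with writeback=True), so the whole value is reassigned.
with shelve.open(path) as store:
    entries = store["entries"]
    entries["add-every-30s"]["schedule"] = 60.0
    store["entries"] = entries

# Reopen the file to confirm the change was persisted.
with shelve.open(path) as store:
    saved_interval = store["entries"]["add-every-30s"]["schedule"]
```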
Solution 4
There is a library called django-celery-beat which provides the models one needs. To make it dynamically load new periodic tasks, one has to create one's own Scheduler.
from django_celery_beat.schedulers import DatabaseScheduler


class AutoUpdateScheduler(DatabaseScheduler):

    def tick(self, *args, **kwargs):
        if self.schedule_changed():
            print('resetting heap')
            self.sync()
            self._heap = None
            new_schedule = self.all_as_schedule()

            if new_schedule:
                to_add = new_schedule.keys() - self.schedule.keys()
                to_remove = self.schedule.keys() - new_schedule.keys()
                for key in to_add:
                    self.schedule[key] = new_schedule[key]
                for key in to_remove:
                    del self.schedule[key]

        # pass the delay computed by the parent back to the beat service
        return super(AutoUpdateScheduler, self).tick(*args, **kwargs)

    @property
    def schedule(self):
        if not self._initial_read and not self._schedule:
            self._initial_read = True
            self._schedule = self.all_as_schedule()

        return self._schedule
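The key-diffing step inside tick can be tried in isolation with plain dictionaries. This is only an illustrative sketch: the entry names and values below are hypothetical placeholders, not real schedule entries. It relies on dict key views supporting set operations, which holds in Python 3.

```python
# Hypothetical schedules keyed by entry name; the values are placeholders.
current = {"cleanup": "every 1h", "report": "every 24h"}
new = {"report": "every 24h", "sync": "every 5m"}

# Dict key views support set difference in Python 3; each result is a set.
to_add = new.keys() - current.keys()
to_remove = current.keys() - new.keys()

# Apply the diff to the current schedule in place.
for key in to_add:
    current[key] = new[key]
for key in to_remove:
    del current[key]
```

Entries whose name appears in both dictionaries are left untouched by this diff, so a changed entry that keeps its name is not replaced by this step alone.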
Solution 5
You can check out flask-djcelery, which configures Flask and djcelery together and also provides a browsable REST API.
Jamie Forrest
Updated on July 21, 2022
Comments
-
Jamie Forrest almost 2 years
If I have a function defined as follows:
def add(x,y): return x+y
Is there a way to dynamically add this function as a celery PeriodicTask and kick it off at runtime? I'd like to be able to do something like (pseudocode):
some_unique_task_id = celery.beat.schedule_task(add, run_every=crontab(minute="*/30"))
celery.beat.start(some_unique_task_id)
I would also want to stop or remove that task dynamically with something like (pseudocode):
celery.beat.remove_task(some_unique_task_id)
or
celery.beat.stop(some_unique_task_id)
FYI I am not using djcelery, which lets you manage periodic tasks via the django admin.
-
Ansuman Bebarta over 10 years
Can you please post the code to add and remove a task? Sorry, I am not getting it.
-
Chris about 8 years
@kai IntervalSchedule, PeriodicTask, etc. are djcelery classes, and the OP says he's not using djcelery. Definitely useful nonetheless.
-
dev almost 8 years
Any changes in this from 2012 to 2016?
-
freethebees almost 7 years
Thanks. Didn't work straight away but using to_add = [key for key in new_schedule.keys() if key not in self.schedule.keys()] and similar for to_remove did the trick. Why isn't this a standard option? Until now, I've had to have Celery tasks call other Celery tasks with a countdown. That doesn't sound very good to me.
-
Mihael Waschl almost 5 years
Great solution!!
-
stoneman_41 almost 5 years
Late comment, but I don't understand how this can be done in a true dynamic fashion; i.e. after my application receives an API call, THEN make it configure the periodic task. From the code examples, it seems like it is always evaluated during function definition (with the decorator).
-
stoneman_41 almost 5 years
For example, when I try this: _gdbm.error: [Errno 11] Resource temporarily unavailable. So it seems like while celery is running I can't seem to open the file through shelve.open(file).
-
Zarathustra about 4 years
@Tristan Brown good solution, do you have any non django specific example?
-
Niel Godfrey Ponciano over 2 years
I added an answer for non-django applications. See stackoverflow.com/a/68858483/11043825
-
guesswho over 2 years
The shelve module does not support concurrent read/write access to shelved objects. Options 1 and 2 will not work.