Connect a new Celery periodic task in Django
Solution 1
For Django we need to use another signal: @celery_app.on_after_finalize.connect. It can be used for both:
- declaring the task schedule close to the task in app/tasks.py, because this signal fires after all tasks.py modules have been imported and all possible receivers are already subscribed (first case);
- centralized schedule declaration, because the Django apps will already be initialized and ready for imports (second case).
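The difference between the two signals can be sketched as follows. This is an illustrative fragment under the standard main_app/celery.py layout, not code from the answers below:

```python
from celery import Celery

app = Celery()

# Fires once the app is configured; in a Django project this can happen
# before the apps' tasks.py modules have been imported.
@app.on_after_configure.connect
def configure_hook(sender, **kwargs):
    print('on_after_configure fired')

# Fires after the app is finalized: all tasks are registered and Django
# apps are loaded, so importing from app modules is safe here.
@app.on_after_finalize.connect
def finalize_hook(sender, **kwargs):
    print('on_after_finalize fired')
```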
Here are the final declarations:
First case
Declaration of task schedule close to task:
main_app/some_app/tasks.py
from main_app.celery import app as celery_app

@celery_app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    sender.add_periodic_task(10.0, test.s('hello'))

@celery_app.task
def test(arg):
    print(arg)
Second case
Centralized schedule declaration in the config file main_app/celery.py:
...
app = Celery()

@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    from main_app.some_app.tasks import test
    sender.add_periodic_task(10.0, test.s('hello'))
...
Solution 2
If the intent is to maintain the task logic separately in tasks.py, then calling from main_app.some_app.tasks import test inside setup_periodic_tasks did not work for me. What worked is the following:
celery.py
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')

@app.task
def test(arg):
    print(arg)
    # Rebinds the local name 'test' to the shared task and runs it synchronously.
    from some_app.tasks import test
    test(arg)
tasks.py
from celery import shared_task

@shared_task
def test(arg):
    print('world')
This resulted in the following output:
[2017-10-26 22:52:42,262: INFO/MainProcess] celery@ubuntu-xenial ready.
[2017-10-26 22:52:42,263: INFO/MainProcess] Received task: main_app.celery.test[3cbdf4fa-ff63-401a-a9e4-cfd1b6bb4ad4]
[2017-10-26 22:52:42,367: WARNING/ForkPoolWorker-2] hello
[2017-10-26 22:52:42,368: WARNING/ForkPoolWorker-2] world
[2017-10-26 22:52:42,369: INFO/ForkPoolWorker-2] Task main_app.celery.test[3cbdf4fa-ff63-401a-a9e4-cfd1b6bb4ad4] succeeded in 0.002823335991706699s: None
[2017-10-26 22:52:51,205: INFO/Beat] Scheduler: Sending due task add every 10 (main_app.celery.test)
[2017-10-26 22:52:51,207: INFO/MainProcess] Received task: main_app.celery.test[ce0f3cfc-54d5-4d74-94eb-7ced2e5a6c4b]
[2017-10-26 22:52:51,209: WARNING/ForkPoolWorker-2] hello
[2017-10-26 22:52:51,209: WARNING/ForkPoolWorker-2] world
Solution 3
If you want to keep the task logic separate, use this setup:
celery.py:
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'backend.settings')  # your settings.py path

app = Celery()

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(5, periodic_task.s('sms'), name='SMS Process')
    sender.add_periodic_task(60, periodic_task.s('email'), name='Email Process')

@app.task
def periodic_task(taskname):
    from myapp.tasks import sms_process, email_process
    if taskname == 'sms':
        sms_process()
    elif taskname == 'email':
        email_process()
A sample task in a Django app named myapp:
myapp/tasks.py:

def sms_process():
    print('send sms task')

def email_process():
    print('send email task')
vvkuznetsov
Updated on June 16, 2022

Comments
- vvkuznetsov, about 2 years ago: It's not a question but help for those who find that the declaration of periodic tasks described in the Celery 4.0.1 documentation is hard to integrate into Django: http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#entries
Copy-pasted Celery config file main_app/celery.py:

from celery import Celery
from celery.schedules import crontab

app = Celery()

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')

    # Calls test('world') every 30 seconds
    sender.add_periodic_task(30.0, test.s('world'), expires=10)

    # Executes every Monday morning at 7:30 a.m.
    sender.add_periodic_task(
        crontab(hour=7, minute=30, day_of_week=1),
        test.s('Happy Mondays!'),
    )

@app.task
def test(arg):
    print(arg)
Question
But what if we use Django and our tasks are placed in another app? With Celery 4.0.1 we no longer have the @periodic_task decorator. So let's see what we can do.

First case
If you prefer to keep tasks and their schedule close to each other:
main_app/some_app/tasks.py
from main_app.celery import app as celery_app

@celery_app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    sender.add_periodic_task(10.0, test.s('hello'))

@celery_app.task
def test(arg):
    print(arg)
We can run beat in debug mode:
celery -A main_app beat -l debug
and we will see that there's no such periodic task.
Second case
We can try to describe all periodic tasks in config file like this:
main_app/celery.py

...
app = Celery()

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    from main_app.some_app.tasks import test
    sender.add_periodic_task(10.0, test.s('hello'))
...
The result is the same, but it behaves differently, as you can see with manual debugging via pdb. In the first example the setup_periodic_tasks callback is not fired at all. In the second example we get django.core.exceptions.AppRegistryNotReady: Apps aren't loaded yet. (this exception is not printed out)
- Louis, over 7 years ago: SO absolutely welcomes sharing information in the question-and-answer format. However, what you've got here is not a well-written question. Please rewrite it so that it reads as an actual question written by someone facing an actual problem. You already know the solution, but write the question from the perspective of someone who does not. (In the case at hand, it seems to me that you could write the question from the point of view of someone migrating from 3.x to 4.x and finding that what used to work no longer works.)
- Louis, over 7 years ago: Also, everything from the "Question" header down is a solution and should be in a formal answer. (You can post significantly different solutions as different answers; people can then vote on them independently.)
- andilabs, over 6 years ago: This is still not working with @shared_task as people claim... github.com/celery/celery/issues/3797
- vvkuznetsov, about 6 years ago: @HemanthSP, check docs.celeryproject.org/en/latest/userguide/… For this example you can use celery -A main_app beat -l debug to run the scheduler, and celery -A main_app worker -l debug to run a worker.
- Ron, almost 6 years ago: My issue was using app = Celery() in other_app/tasks.py. Using from main_app.celery import app as celery_app solved the problem!
- w--, almost 6 years ago: This is the only thing that worked for me. Thank you.