Connect new celery periodic task in django


Solution 1

In Django, connect the setup callback to a different signal: @celery_app.on_after_finalize.connect instead of on_after_configure. It works for both cases:

  • declaring the schedule next to the task in app/tasks.py, because this signal fires after every tasks.py has been imported and all receivers have subscribed (first case).
  • declaring the schedule centrally, because the Django apps are already initialized and ready for imports (second case).
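
The ordering argument above can be illustrated with a toy model. This is a minimal sketch, not Celery's implementation: Signal, ToyApp, start and import_some_app_tasks are hypothetical names, and the firing order (configure before the tasks modules are imported, finalize after) is an assumption taken from the behavior described here.

```python
class Signal:
    """Toy one-shot lifecycle signal (not Celery's implementation)."""

    def __init__(self):
        self.receivers = []

    def connect(self, receiver):
        self.receivers.append(receiver)
        return receiver  # allows use as a decorator

    def send(self, sender):
        for receiver in self.receivers:
            receiver(sender)


class ToyApp:
    """Toy application object with two signals and a schedule list."""

    def __init__(self):
        self.on_after_configure = Signal()
        self.on_after_finalize = Signal()
        self.schedule = []

    def add_periodic_task(self, seconds, label):
        self.schedule.append((seconds, label))


def start(app, import_task_modules):
    # Assumed ordering: configure fires first, then tasks modules are
    # imported, then finalize fires.
    app.on_after_configure.send(app)
    import_task_modules(app)
    app.on_after_finalize.send(app)


def import_some_app_tasks(app):
    # Simulates importing some_app/tasks.py, which connects its
    # receivers at import time.
    @app.on_after_configure.connect
    def too_late(sender):
        sender.add_periodic_task(10.0, "registered via on_after_configure")

    @app.on_after_finalize.connect
    def in_time(sender):
        sender.add_periodic_task(10.0, "registered via on_after_finalize")


app = ToyApp()
start(app, import_some_app_tasks)
print(app.schedule)
```

In this model the receiver connected to on_after_configure subscribes after that signal has already been sent, so only the on_after_finalize receiver adds an entry to the schedule.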

Here are the final declarations:

First case

Declaration of task schedule close to task:

main_app/some_app/tasks.py

from main_app.celery import app as celery_app

@celery_app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    sender.add_periodic_task(10.0, test.s('hello'))

@celery_app.task
def test(arg):
    print(arg)

Second case

Centralized schedule declaration in config file main_app/celery.py:

...

app = Celery()

@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    from main_app.some_app.tasks import test
    sender.add_periodic_task(10.0, test.s('hello'))
...
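
A centralized schedule can also be written declaratively, referring to the task by its registered name so that nothing has to be imported inside a callback. The following is a sketch under assumptions: the entry name "test-every-10-seconds" is hypothetical, the task name "main_app.some_app.tasks.test" assumes the default naming (module path plus function name), and the dict is built standalone here rather than assigned to app.conf.beat_schedule in main_app/celery.py.

```python
# Sketch only: the dict that would be assigned to app.conf.beat_schedule
# in main_app/celery.py. Celery itself is not imported here.
beat_schedule = {
    # Hypothetical entry name; any unique string works as a key.
    "test-every-10-seconds": {
        # Assumed registered task name (module path + function name).
        "task": "main_app.some_app.tasks.test",
        "schedule": 10.0,    # seconds between runs
        "args": ("hello",),  # positional arguments passed to the task
    },
}

for name, entry in beat_schedule.items():
    print(name, entry["task"], entry["schedule"], entry["args"])
```

Whether this fits depends on the project: the name-based form avoids import-order problems, but the task name has to match what the worker actually registers.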

Solution 2

If the intent is to keep the task logic separate in tasks.py, importing it inside setup_periodic_tasks (from main_app.some_app.tasks import test) did not work for me. What worked is the following:

celery.py

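@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')

@app.task
def test(arg):
    # Thin wrapper registered in celery.py; the real logic lives in tasks.py.
    print(arg)
    # Imported at call time rather than at module import time.
    from some_app.tasks import test as some_app_test
    some_app_test(arg)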

tasks.py

from celery import shared_task

@shared_task
def test(arg):
    print('world')

This resulted in the following output:

[2017-10-26 22:52:42,262: INFO/MainProcess] celery@ubuntu-xenial ready.
[2017-10-26 22:52:42,263: INFO/MainProcess] Received task: main_app.celery.test[3cbdf4fa-ff63-401a-a9e4-cfd1b6bb4ad4]  
[2017-10-26 22:52:42,367: WARNING/ForkPoolWorker-2] hello
[2017-10-26 22:52:42,368: WARNING/ForkPoolWorker-2] world
[2017-10-26 22:52:42,369: INFO/ForkPoolWorker-2] Task main_app.celery.test[3cbdf4fa-ff63-401a-a9e4-cfd1b6bb4ad4] succeeded in 0.002823335991706699s: None
[2017-10-26 22:52:51,205: INFO/Beat] Scheduler: Sending due task add every 10 (main_app.celery.test)
[2017-10-26 22:52:51,207: INFO/MainProcess] Received task: main_app.celery.test[ce0f3cfc-54d5-4d74-94eb-7ced2e5a6c4b]  
[2017-10-26 22:52:51,209: WARNING/ForkPoolWorker-2] hello
[2017-10-26 22:52:51,209: WARNING/ForkPoolWorker-2] world

Solution 3

If you want to keep the task logic separate, use this setup:

celery.py:

import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'backend.settings') # your settings.py path

app = Celery()

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(5, periodic_task.s('sms'), name='SMS Process')
    sender.add_periodic_task(60, periodic_task.s('email'), name='Email Process')


@app.task
def periodic_task(taskname):
    from myapp.tasks import sms_process, email_process

    if taskname == 'sms':
        sms_process()

    elif taskname == 'email':
        email_process()

A sample tasks module in a Django app named myapp:

myapp/tasks.py:

def sms_process():
    print('send sms task')

def email_process():
    print('send email task')
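
The if/elif dispatch in periodic_task could also be written as a lookup table. This is a minimal sketch, not the answer's code: the two functions are stand-ins that return strings so the example runs without Celery or Django, and HANDLERS is a hypothetical name.

```python
def sms_process():
    # Stand-in for myapp.tasks.sms_process.
    return "send sms task"


def email_process():
    # Stand-in for myapp.tasks.email_process.
    return "send email task"


# Maps the string passed by the schedule entry to the function to run.
HANDLERS = {"sms": sms_process, "email": email_process}


def periodic_task(taskname):
    try:
        handler = HANDLERS[taskname]
    except KeyError:
        # Unlike the if/elif version, an unknown name fails loudly.
        raise ValueError(f"unknown periodic task: {taskname!r}") from None
    return handler()


print(periodic_task("sms"))
print(periodic_task("email"))
```

Raising on an unknown name is a design choice of this sketch; the original version silently does nothing in that case.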

Author: vvkuznetsov

Updated on June 16, 2022

Comments

  • vvkuznetsov about 2 years

    This is not a question but help for those who find that the declaration of periodic tasks described in the Celery 4.0.1 documentation is hard to integrate into Django: http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#entries

    The Celery config file main_app/celery.py, copied from the documentation:

    from celery import Celery
    from celery.schedules import crontab
    
    app = Celery()
    
    @app.on_after_configure.connect
    def setup_periodic_tasks(sender, **kwargs):
        # Calls test('hello') every 10 seconds.
        sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')
    
        # Calls test('world') every 30 seconds
        sender.add_periodic_task(30.0, test.s('world'), expires=10)
    
        # Executes every Monday morning at 7:30 a.m.
        sender.add_periodic_task(
            crontab(hour=7, minute=30, day_of_week=1),
            test.s('Happy Mondays!'),
        )
    
    @app.task
    def test(arg):
        print(arg)
    

    Question

    But what if we use Django and our tasks live in another app? Celery 4.0.1 no longer has the @periodic_task decorator, so let's see what we can do.

    First case

    If you prefer to keep tasks and their schedule close to each other:

    main_app/some_app/tasks.py

    from main_app.celery import app as celery_app
    
    @celery_app.on_after_configure.connect
    def setup_periodic_tasks(sender, **kwargs):
        # Calls test('hello') every 10 seconds.
        sender.add_periodic_task(10.0, test.s('hello'))
    
    @celery_app.task
    def test(arg):
        print(arg)
    

    We can run beat in debug mode:

    celery -A main_app beat -l debug
    

    and we will see that there's no such periodic task.

    Second case

    We can try to describe all periodic tasks in config file like this:

    main_app/celery.py

    ...
    app = Celery()
    
    @app.on_after_configure.connect
    def setup_periodic_tasks(sender, **kwargs):
        # Calls test('hello') every 10 seconds.
        from main_app.some_app.tasks import test
        sender.add_periodic_task(10.0, test.s('hello'))
    ...
    

    The result is the same, but manual debugging with pdb shows that the two cases fail differently. In the first example the setup_periodic_tasks callback is never fired. In the second example we get django.core.exceptions.AppRegistryNotReady: Apps aren't loaded yet. (this exception is not printed).

    • Louis over 7 years
      SO absolutely welcomes sharing information in the Question and Answer format. However, what you've got here is not a well-written question. Please rewrite this so that it reads as an actual question written by someone facing an actual problem. You already know the solution, but write the question from the perspective of someone who does not. (In the case at hand, it seems to me that you could produce a question from the p.o.v. of someone migrating from 3.x to 4.x and finding that what used to work no longer works.)
    • Louis over 7 years
      Also everything from the "Question" header down is a solution, and should be in a formal answer. (You can post significantly different solutions as different answers. People can vote on them independently then.)
  • andilabs over 6 years
    this is still not working with @shared_task as people claim... github.com/celery/celery/issues/3797
  • vvkuznetsov about 6 years
    @HemanthSP, check docs.celeryproject.org/en/latest/userguide/… For this example you can run the scheduler with celery -A main_app beat -l debug and a worker with celery -A main_app worker -l debug
  • Ron almost 6 years
    My issue was using app = Celery() in other_app/tasks.py. Using from main_app.celery import app as celery_app solved the problem!
  • w-- almost 6 years
    this is the only thing that worked for me. thank you.