How to configure CELERYBEAT_SCHEDULE in Django settings?


Try the following and let me know whether it works for you. It does work for me.

In settings.py

from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    'my_scheduled_job': {
        'task': 'run_scheduled_jobs',  # must match the task name set in tasks.py
        'schedule': crontab(),  # with no arguments, crontab() fires every minute
    },
}

And in tasks.py:

from celery.task import task  # note the import of task, not shared_task

@task(name='run_scheduled_jobs')  # task name found! Celery will do its job
def run_scheduled_jobs():
    # do whatever stuff you do
    return True

But if you are looking for shared_task, then:

from celery import shared_task

@shared_task(name='my_shared_task')  # the name helps Celery identify the function it has to run
def my_shared_task():
    # do what you want here
    return True

I use shared_task for async jobs, so I need to call it from a function like the following.

In views.py (or anywhere else in your project app):

def some_function():
    my_shared_task.apply_async(countdown=in_seconds)  # delay execution by in_seconds
    return True

And just in case you have forgotten: remember to add the app whose tasks you are trying to run to INSTALLED_APPS.

INSTALLED_APPS = [
    ...
    'my_app',  # include your app
    ...
]
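None of the scheduled jobs will fire unless the beat scheduler is running alongside a worker. Assuming (as a placeholder) that your Celery app is named `proj`, a typical invocation looks like:

```shell
# Run a worker with the beat scheduler embedded (fine for development)
celery -A proj worker -B --loglevel=info

# Or run them as separate processes (recommended for production)
celery -A proj worker --loglevel=info
celery -A proj beat --loglevel=info
```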

I'm sure this approach works fine. Thanks!

Author: a_Fraley

I am a fullstack developer with 3+ years in Python, Django, Celery, Vue.js, Docker, Kubernetes and Linux.

Updated on June 13, 2022

Comments

  • a_Fraley
    a_Fraley almost 2 years

    I can get this to run as a standalone application, but I am having trouble getting it to work in Django.

    Here is the standalone code:

    from celery import Celery
    from celery.schedules import crontab
    
    
    app = Celery('tasks')
    app.conf.update(
        CELERY_TASK_SERIALIZER='json',
        CELERY_RESULT_SERIALIZER='json',
        CELERY_ACCEPT_CONTENT=['json'],
        CELERY_TIMEZONE='US/Central',
        CELERY_ENABLE_UTC=True,
        CELERYBEAT_SCHEDULE = {
        'test': {
            'task': 'tasks.test',
            'schedule': crontab(),
            },
        }
    )
    
    @app.task
    def test():
        with open('test.txt', 'a') as f:
            f.write('Hello, World!\n')
    

    It feeds the RabbitMQ server and writes to the file every minute. It works like a charm, but when I try to get it to work in Django I get this error:

    Did you remember to import the module containing this task? Or maybe you are using relative imports? Please see ____ for more information.

    The full contents of the message body was:

    {'retries': 0, 'eta': None, 'kwargs': {}, 'taskset': None, 'timelimit': [None, None], 'callbacks': None, 'task': 'proj.test', 'args': [], 'expires': None, 'id': '501ca998-b5eb-4ba4-98a8-afabda9e88dd', 'utc': True, 'errbacks': None, 'chord': None} (246b)

    Traceback (most recent call last):
      File "/home/user/CeleryDjango/venv/lib/python3.5/site-packages/celery/worker/consumer.py", line 456, in on_task_received
        strategies[name](message, body,
    KeyError: 'proj.test'

    [2016-06-16 01:16:00,051: INFO/Beat] Scheduler: Sending due task test (proj.test)
    [2016-06-16 01:16:00,055: ERROR/MainProcess] Received unregistered task of type 'proj.test'.

    And this is my code in Django:

    # CELERY STUFF
    CELERY_ACCEPT_CONTENT = ['application/json']
    CELERY_TASK_SERIALIZER = 'json'
    CELERY_RESULT_SERIALIZER = 'json'
    CELERY_TIMEZONE = 'US/Central'
    CELERYBEAT_SCHEDULE = {
        'test': {
            'task': 'proj.test',
            'schedule': crontab(),
        }
    }
    

    celery.py

    from __future__ import absolute_import
    
    import os
    from celery import Celery
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
    from django.conf import settings  # noqa
    
    app = Celery('proj')
    
    app.config_from_object('django.conf:settings')
    app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
    
    
    @app.task(bind=True)
    def debug_task(self):
        print('Request: {0!r}'.format(self.request))
    

    task.py

    from __future__ import absolute_import
    from celery import shared_task
    
    
    @shared_task
    def test():
        with open('test.txt', 'w') as f:
            print('Hello, World', file=f)
    

    __init__.py

    from __future__ import absolute_import
    
    from .celery import app as celery_app 
    

    Any thoughts on this are most appreciated. Thanks.

  • a_Fraley
    a_Fraley almost 8 years
    Thanks so much. This appears to work and confirmed my initial suspicion that the error had to do with @shared_task and Celery being unable to see the task. Thanks again; it worked.
  • d-coder
    d-coder almost 8 years
    This deserves an upvote :P Also mark this as the answer so that it'll help someone else in the future :)
  • webtweakers
    webtweakers about 6 years
    Unfortunately, by now from celery.task import task is deprecated, and shared_task is intended for use in libraries.
  • Марсель Абдуллин
    Марсель Абдуллин almost 4 years
    Good afternoon. I have been trying to solve a similar task for many days. Could you help me with this question? I would be very grateful: stackoverflow.com/questions/62801096/…