How to run a function periodically with Flask and Celery?

14,736

Solution 1

Do you have Celery worker and Celery beat running? Scheduled tasks are handled by beat, which queues the task mentioned when appropriate. Worker then actually crunches the numbers and executes your task.

celery worker --app myproject--loglevel=info
celery beat --app myproject

Your task however looks like it's calling the Flask app's logger. When using the worker, you probably don't have the Flask application around (since it's in another process). Try using a normal Python logger for the demo task.

Solution 2

Well, celery beat can be embedded in regular celery worker as well, with -B parameter in your command.

celery -A --app myproject --loglevel=info -B

It is only recommended for the development environment. For production, you should run beat and celery workers separately as documentation mentions. Otherwise, your periodic task will run more than one time.

Solution 3

A celery task by default will run outside of the Flask app context and thus it won't have access to Flask app instance. However it's very easy to create the Flask app context while running a task by using app_context method of the Flask app object.

app = Flask(__name__)
celery = Celery(app.name)

@celery.task
def task():
    with app.app_context():
        app.logger.info('running my task')

This article by Miguel Grinberg is a very good place to get a primer on the basics of using Celery in a Flask application.

Share:
14,736
ustroetz
Author by

ustroetz

GIS Developer at ally

Updated on July 24, 2022

Comments

  • ustroetz
    ustroetz almost 2 years

    I have a flask app that roughly looks like this:

    app = Flask(__name__)
    @app.route('/',methods=['POST'])
    def foo():
       data = json.loads(request.data)
       # do some stuff
    
       return "OK"
    

    Now in addition I would like to run a function every ten seconds from that script. I don't want to use sleep for that. I have the following celery script in addition:

    from celery import Celery
    from datetime import timedelta
    celery = Celery('__name__')
    
    CELERYBEAT_SCHEDULE = {
        'add-every-30-seconds': {
            'task': 'tasks.add',
            'schedule': timedelta(seconds=10)
        },
    }
    
    
    
    @celery.task(name='tasks.add')
    def hello():
        app.logger.info('run my function')
    

    The script works fine, but the logger.info is not executed. What am I missing?