How to use Flask-SQLAlchemy in a Celery task

Solution 1

Update: We've since started using a better way to handle application teardown and setup on a per-task basis, based on the pattern described in the more recent Flask documentation.

extensions.py

import flask
from flask_sqlalchemy import SQLAlchemy
from celery import Celery

class FlaskCelery(Celery):

    def __init__(self, *args, **kwargs):

        super(FlaskCelery, self).__init__(*args, **kwargs)
        self.patch_task()

        if 'app' in kwargs:
            self.init_app(kwargs['app'])

    def patch_task(self):
        TaskBase = self.Task
        _celery = self

        class ContextTask(TaskBase):
            abstract = True

            def __call__(self, *args, **kwargs):
                if flask.has_app_context():
                    return TaskBase.__call__(self, *args, **kwargs)
                else:
                    with _celery.app.app_context():
                        return TaskBase.__call__(self, *args, **kwargs)

        self.Task = ContextTask

    def init_app(self, app):
        self.app = app
        self.config_from_object(app.config)


celery = FlaskCelery()
db = SQLAlchemy()

app.py

from flask import Flask
from extensions import celery, db

def create_app():
    app = Flask(__name__)
    
    #configure/initialize all your extensions
    db.init_app(app)
    celery.init_app(app)

    return app

Once you've set up your app this way, you can run and use celery without having to run it explicitly from within an application context: all your tasks will automatically run in an application context if necessary. You also don't have to worry explicitly about post-task teardown, which is an important issue to manage (see other responses below).
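
For example, a tasks module can then use db directly; a minimal sketch, assuming the extensions.py above (the ping_database task is my own illustration):

tasks.py

from sqlalchemy import text
from extensions import celery, db

@celery.task
def ping_database():
    # ContextTask pushes an app context automatically, so db.session
    # works here without an explicit "with app.app_context():"
    return db.session.execute(text("SELECT 1")).scalar()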

Troubleshooting

If you keep getting AttributeError: 'FlaskCelery' object has no attribute 'app' at the with _celery.app.app_context(): line, make sure to:

  1. Keep the celery import at the app.py file level. Avoid:

app.py

from flask import Flask

def create_app():
    app = Flask(__name__)

    initialize_extensions(app)

    return app

def initialize_extensions(app):
    from extensions import celery, db # DOOMED! Keep celery import at the FILE level
    
    db.init_app(app)
    celery.init_app(app)
  2. Start your celery workers BEFORE you flask run, and use
celery worker -A app:celery -l info -f celery.log

Note the app:celery, i.e. loading from app.py.

You can still import from extensions to decorate tasks, i.e. from extensions import celery.

Old answer below, still works, but not as clean a solution

I prefer to run all of celery within the application context by creating a separate file that invokes celery.start() with the application's context. This means your tasks file doesn't have to be littered with context setup and teardowns. It also lends itself well to the flask 'application factory' pattern.

extensions.py

from flask_sqlalchemy import SQLAlchemy
from celery import Celery

db = SQLAlchemy()
celery = Celery()

tasks.py

from extensions import celery, db
from flask import current_app
from celery.signals import task_postrun

@celery.task
def do_some_stuff():
    current_app.logger.info("I have the application context")
    #you can now use the db object from extensions

@task_postrun.connect
def close_session(*args, **kwargs):
    # Flask-SQLAlchemy creates new sessions for you from a scoped session
    # factory. Since we keep the same app context across tasks, removing the
    # session here ensures each task gets a fresh one (e.g. session errors
    # won't propagate across tasks).
    db.session.remove()

app.py

from flask import Flask
from extensions import celery, db

def create_app():
    app = Flask(__name__)
    
    #configure/initialize all your extensions
    db.init_app(app)
    celery.config_from_object(app.config)

    return app

RunCelery.py

from app import create_app
from extensions import celery

app = create_app()

if __name__ == '__main__':
    with app.app_context():
        celery.start()
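
Since celery.start() parses the celery command-line arguments (sys.argv), you can start a worker through this file, e.g.:

python RunCelery.py worker --loglevel=info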

Solution 2

In your tasks.py file do the following:

from celery import Celery
from main import create_app

app = create_app()

celery = Celery(__name__)
celery.add_defaults(lambda: app.config)

@celery.task
def create_facet(project_id, **kwargs):
    with app.test_request_context():
        # your code
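
As jaapz notes in the comments, test_request_context is really meant for test environments; if the task only needs the application context (e.g. for Flask-SQLAlchemy or app.config), app.app_context() is usually the better fit. A sketch of the same task with that change:

@celery.task
def create_facet(project_id, **kwargs):
    with app.app_context():
        # database work via Flask-SQLAlchemy goes here
        pass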

Solution 3

I used Paul Gibbs' answer with two differences: instead of task_postrun I used worker_process_init, and instead of .remove() I used db.session.expire_all().

I'm not 100% sure, but as I understand it, when Celery creates a worker process, all inherited/shared db sessions are expired, and SQLAlchemy creates new sessions on demand, unique to that worker process.
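
A sketch of that first attempt, as described above (the handler name is my own):

from extensions import db
from celery.signals import worker_process_init

@worker_process_init.connect
def expire_inherited_sessions(**kwargs):
    # Expire all state inherited from the parent process so the forked
    # worker fetches fresh data and connections on demand
    db.session.expire_all()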

So far it seems to have fixed my problem. With Paul's solution, when one worker finished and removed the session, another worker using the same session was still running its query, so db.session.remove() closed the connection while it was being used, giving me a "Lost connection to MySQL server during query" exception.

Thanks Paul for steering me in the right direction!

Never mind, that didn't work. I ended up adding an argument to my Flask app factory so that it does not run db.init_app(app) when Celery is calling it; instead, the workers call it after Celery forks them. I now see several connections in my MySQL processlist.

from extensions import db
from celery.signals import worker_process_init
from flask import current_app

@worker_process_init.connect
def celery_worker_init_db(**_):
    db.init_app(current_app)
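
For completeness, the matching factory might look something like this; a sketch, where the init_db flag name is my own and not from the original answer:

from flask import Flask
from extensions import db

def create_app(init_db=True):
    app = Flask(__name__)
    if init_db:
        # Skipped when the app is built for Celery (init_db=False);
        # each forked worker calls db.init_app() itself in
        # celery_worker_init_db above
        db.init_app(app)
    return app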

Solution 4

from flask import Flask
from werkzeug.utils import import_string
from celery.signals import worker_process_init, celeryd_init
from flask_celery import Celery
from src.app import config_from_env, create_app

celery = Celery()

def get_celery_conf():
    config = import_string('src.settings')
    config = {k: getattr(config, k) for k in dir(config) if k.isupper()}
    config['BROKER_URL'] = config['CELERY_BROKER_URL']
    return config

@celeryd_init.connect
def init_celeryd(conf=None, **kwargs):
    conf.update(get_celery_conf())

@worker_process_init.connect
def init_celery_flask_app(**kwargs):
    app = create_app()
    app.app_context().push()

  • Update the celery config at celeryd init.
  • Use your flask app factory to initialize all flask extensions, including the SQLAlchemy extension.

By doing this, we are able to maintain a database connection per worker.

If you want to run your task under flask context, you can subclass Task.__call__:

class SmartTask(Task):

    abstract = True

    def __call__(self, *_args, **_kwargs):
        with self.app.flask_app.app_context():
            with self.app.flask_app.test_request_context():
                result = super(SmartTask, self).__call__(*_args, **_kwargs)
            return result

class SmartCelery(Celery):

    def init_app(self, app):
        super(SmartCelery, self).init_app(app)
        self.Task = SmartTask
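
Usage then mirrors Solution 1; a sketch, assuming flask_celery's init_app accepts the Flask app as the subclass above implies:

celery = SmartCelery()
celery.init_app(create_app())

@celery.task
def some_task():
    # Runs inside both an app context and a test request context, so
    # Flask-SQLAlchemy sessions and current_app are available
    pass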

Comments

  • PanosJee
    PanosJee over 3 years

    I recently switched to Celery 3.0. Before that I was using Flask-Celery to integrate Celery with Flask. It had many issues, like hiding some powerful Celery functionality, but it allowed me to use the full context of the Flask app, and especially Flask-SQLAlchemy.

    In my background tasks I am processing data and using the SQLAlchemy ORM to store it. The maintainer of Flask-Celery has dropped support for the plugin. The plugin pickled the Flask instance into the task, so I had full access to SQLAlchemy.

    I am trying to replicate this behavior in my tasks.py file but with no success. Do you have any hints on how to achieve this?

  • jaapz
    jaapz over 9 years
    Late comment, but I think it is important. I don't think using test_request_context is a great idea here, as it is meant for test environments, not production.
  • cbron
    cbron over 9 years
    I don't understand the new update. Where do the tasks go, in app.py or tasks.py? If in tasks.py, where do you import celery from, extensions? Are you still using runcelery.py? If so, do you still have to use create_app()?
  • bre
    bre over 9 years
    I appended the 'add_together' task to extensions.py: @celery.task() def add_together(a, b): return a + b. When I import it and call it like result = add_together.delay(5, 11) after running the worker with celery -A app.extensions.celery worker -l debug, I get this error: AttributeError: 'FlaskCelery' object has no attribute 'app'. But if I import celery from extensions.py (it's a FlaskCelery instance), it has the right app value. Please, what am I missing?
  • ken
    ken over 8 years
    Getting this error with the new update: with _celery.app.app_context(): AttributeError: 'FlaskCelery' object has no attribute 'app'
  • Adversus
    Adversus over 8 years
    @ken: did you call celery.init_app(app), or alternatively create the FlaskCelery object with an app argument?
  • soasme
    soasme almost 8 years
    There is a potential issue in this example. If you start the celery worker with --concurrency=N (N>1) or --autoscale=M,N (N>1), you will probably get some MySQL 2006/2014 error or sqlalchemy.exc.ResourceClosedError. Since db.session is a non-threadsafe object, we must init the db instance before each worker init, as @Robpol86 did.
  • Ryan Chou
    Ryan Chou over 6 years
    Is this the right solution for flask-rq too? Using flask-rq with flask-sqlalchemy confuses me. I have hit the same issue when using models in an async task (rq worker). It occasionally raised the 'This result object does not return rows. It has been closed automatically' error.
  • Clint
    Clint almost 6 years
    After looking around for about 18 hours I finally found something that helped. It's weird to me that calling app = create_app() and app_context outside of create_app or a manager.py works, but it does.
  • Downgoat
    Downgoat over 5 years
    Link is dead and now 404s
  • Jason
    Jason about 5 years
    I could not get this answer to work in my application. If I swap out my current celery = Celery() with celery = FlaskCelery() everything seems to work except that Celery cannot find my tasks and I find that out when I try to run one. I receive the message 'Received unregistered task'. Now if I switch the celery worker to use my celery from a special module that calls create_app (similar to the old version of the answer) then everything seems to work. But then I don't need FlaskCelery.
  • Rboreal_Frippery
    Rboreal_Frippery almost 5 years
    Wanted to mention that the reason I needed this integration was just to properly get current_app.config['key'] calls to work, and unfortunately I still receive the outside context warning when trying to do this in a celery task.