Колба с create app, SQLAlchemy и сельдереем
Я действительно изо всех сил пытаюсь получить правильную настройку для колбы, SQLAlchemy и сельдерея. Я много искал и пробовал разные подходы, но ничего не получалось. Либо я пропустил контекст приложения, либо не могу запустить workers, либо есть какие-то другие проблемы. Структура очень общая, так что я могу построить более крупное приложение.
Я использую: Flask 0.10.1, SQLAlchemy 1.0, Celery 3.1.13, моя текущая настройка следующее:
App/__init__.py
#Empty
App/config.py
import os
basedir = os.path.abspath(os.path.dirname(__file__))
class Config:
@staticmethod
def init_app(app):
pass
class LocalConfig(Config):
DEBUG = True
SQLALCHEMY_DATABASE_URI = r"sqlite:///" + os.path.join(basedir,
"data-dev.sqlite")
CELERY_BROKER_URL = 'amqp://guest:guest@localhost:5672//'
config = {
"local": LocalConfig}
App/exstensions.py
from flask.ext.sqlalchemy import SQLAlchemy
from celery import Celery
db = SQLAlchemy()
celery = Celery()
App/factory.py
from extensions import db, celery
from flask import Flask
from flask import g
from config import config
def create_before_request(app):
def before_request():
g.db = db
return before_request
def create_app(config_name):
app = Flask(__name__)
app.config.from_object(config[config_name])
db.init_app(app)
celery.config_from_object(config)
# Register the blueprints
# Add the before request handler
app.before_request(create_before_request(app))
return app
App/manage.py
from factory import create_app
app = create_app("local")
from flask import render_template
from flask import request
@app.route('/test', methods=['POST'])
def task_simple():
import tasks
tasks.do_some_stuff.delay()
return ""
if __name__ == "__main__":
app.run()
App/models.py
from extensions import db
class User(db.Model):
__tablename__ = "user"
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(128), unique=True, nullable=False)
App/tasks.py
from extensions import celery
from celery.signals import task_prerun
from flask import g, current_app
@task_prerun.connect
def close_session(*args, **kwargs):
with current_app.app_context():
# use g.db
print g
@celery.task()
def do_some_stuff():
with current_app.app_context():
# use g.db
print g
В папке приложения:
- запуск веб-сервера разработки с помощью:
python.exe manage.py
- начиная с рабочих:
celery.exe worker -A tasks
Я получаю ошибку импорта, которая не имеет для меня никакого смысла.
Должен ли я структурировать приложение по-другому? В конце концов, я думаю, что мне нужна довольно простая настройка, например, использование Flask с фабричным шаблоном, возможность использовать расширение Flask-SQLAlchmey и иметь некоторого работника, который должен получить доступ к базе данных.
Любая помощь высоко ценится.
Трассировка выполняется при запуске celery worker.
Traceback (most recent call last):
File "[PATH]scriptscelery-script.py", line 9, in <module>
load_entry_point('celery==3.1.13', 'console_scripts', 'celery')()
File "[PATH]libsite-packagescelery__main__.py", line 30, in main
main()
File "[PATH]libsite-packagescelerybincelery.py", line 81, in main
cmd.execute_from_commandline(argv)
File "[PATH]libsite-packagescelerybincelery.py", line 769, in execute_from_commandline
super(CeleryCommand, self).execute_from_commandline(argv)))
File "[PATH]libsite-packagescelerybinbase.py", line 305, in execute_from_commandline
argv = self.setup_app_from_commandline(argv)
File "[PATH]libsite-packagescelerybinbase.py", line 473, in setup_app_from_commandline
user_preload = tuple(self.app.user_options['preload'] or ())
AttributeError: 'Flask' object has no attribute 'user_options'
UPDATE я изменяю код в соответствии с предложением в комментарии. Работник начинает теперь вверх но когда тестировать его с get запросом к http://127.0.0.1:5000/test. Я получаю следующую обратную связь:
Traceback (most recent call last):
File "[PATH]libsite-packagesceleryapptrace.py", line 230, in trace_task
args=args, kwargs=kwargs)
File "[PATH]libsite-packagesceleryutilsdispatchsignal.py", line 166, in send
response = receiver(signal=self, sender=sender, **named)
File "[PATH]appstackoverflowtasks.py", line 7, in close_session
with current_app.app_context():
File "[PATH]libsite-packageswerkzeuglocal.py", line 338, in __getattr__
return getattr(self._get_current_object(), name)
File "[PATH]libsite-packageswerkzeuglocal.py", line 297, in _get_current_object
return self.__local()
File "[PATH]libsite-packagesflaskglobals.py", line 34, in _find_app
raise RuntimeError('working outside of application context')
RuntimeError: working outside of application context exc, exc_info.traceback)))
Обновление основываясь на комментариях Мартина, я изменил код. Текущая рабочая версия находится под: https://gist.github.com/anonymous/fa47834db2f4f3b8b257 [52]}.
Любые дальнейшие улучшения или предложения приветствуются.
2 ответов:
Я ушел с Советом current_app.
Ваш объект сельдерей нуждается в доступе к контексту приложения. Я нашел в интернете информацию о создании объекта сельдерей с фабричной функцией. Приведенный ниже пример тестируется без посредника сообщений.
#factory.py from celery import Celery from config import config def create_celery_app(app=None): app = app or create_app(config) celery = Celery(__name__, broker=app.config['CELERY_BROKER_URL']) celery.conf.update(app.config) TaskBase = celery.Task class ContextTask(TaskBase): abstract = True def __call__(self, *args, **kwargs): with app.app_context(): return TaskBase.__call__(self, *args, **kwargs) celery.Task = ContextTask return celeryИ в tasks.py:
#tasks.py from factory import create_celery_app from celery.signals import task_prerun from flask import g celery = create_celery_app() @task_prerun.connect def celery_prerun(*args, **kwargs): #print g with celery.app.app_context(): # # use g.db print g @celery.task() def do_some_stuff(): with celery.app.app_context(): # use g.db g.user = "test" print g.userНекоторые ссылки:
Шаблон колбы для создания экземпляра сельдерея с фабричной функцией
Применение с использованием как фабрики приложений, так и сельдерей
Вот решение, которое работает с шаблоном фабрики приложений flask, а также создает задачу сельдерея с контекстом, без необходимости явно использовать
app.app_context()в задачах. В моем приложении очень сложно получить этот объект приложения, избегая циклического импорта, но это решает проблему. Это также хорошо для последней версии сельдерея 4.2 на момент написания статьи.Структура:
repo_name/ manage.py base/ base/__init__.py base/app.py base/runcelery.py base/celeryconfig.py base/utility/celery_util.py base/tasks/workers.pyТаким образом,
baseявляется основным пакетом приложения в этом примере. Вbase/__init__.pyмы создаем сельдерей пример, как показано ниже:from celery import Celery celery = Celery('base', config_source='base.celeryconfig')Файл
base/app.pyсодержит фабрику приложений flaskcreate_appи обратите внимание наinit_celery(app, celery), которая содержит:from base import celery from base.utility.celery_util import init_celery def create_app(config_obj): """An application factory, as explained here: http://flask.pocoo.org/docs/patterns/appfactories/. :param config_object: The configuration object to use. """ app = Flask('base') app.config.from_object(config_obj) init_celery(app, celery=celery) register_extensions(app) register_blueprints(app) register_errorhandlers(app) register_app_context_processors(app) return appПереходим к содержанию
base/runcelery.py:from flask.helpers import get_debug_flag from base.settings import DevConfig, ProdConfig from base import celery from base.app import create_app from base.utility.celery_util import init_celery CONFIG = DevConfig if get_debug_flag() else ProdConfig app = create_app(CONFIG) init_celery(app, celery)Далее, файл
base/celeryconfig.py(в качестве примера):# -*- coding: utf-8 -*- """ Configure Celery. See the configuration guide at -> http://docs.celeryproject.org/en/master/userguide/configuration.html#configuration """ ## Broker settings. broker_url = 'pyamqp://guest:guest@localhost:5672//' broker_heartbeat=0 # List of modules to import when the Celery worker starts. imports = ('base.tasks.workers',) ## Using the database to store task state and results. result_backend = 'rpc' #result_persistent = False accept_content = ['json', 'application/text'] result_serializer = 'json' timezone = "UTC" # define periodic tasks / cron here # beat_schedule = { # 'add-every-10-seconds': { # 'task': 'workers.add_together', # 'schedule': 10.0, # 'args': (16, 16) # }, # }Теперь определите init_celery в файле
base/utility/celery_util.py:# -*- coding: utf-8 -*- def init_celery(app, celery): """Add flask app context to celery.Task""" TaskBase = celery.Task class ContextTask(TaskBase): abstract = True def __call__(self, *args, **kwargs): with app.app_context(): return TaskBase.__call__(self, *args, **kwargs) celery.Task = ContextTaskДля рабочих в
base/tasks/workers.py:from base import celery as celery_app from flask_security.utils import config_value, send_mail from base.bp.users.models.user_models import User @celery_app.task def send_welcome_email(email, user_id, confirmation_link): """Background task to send a welcome email with flask-security's mail. You don't need to use with app.app_context() as Task has app context. """ user = User.query.filter_by(id=user_id).first() print(f'sending user {user} a welcome email') send_mail(config_value('EMAIL_SUBJECT_REGISTER'), email, 'welcome', user=user, confirmation_link=confirmation_link) @celery_app.task def do_some_stuff(): print(g)Затем вам нужно запустить celery beat и celery worker в двух разных командах cmd из внутри
repo_nameпапка .В одной командной строке выполните команду
celery -A base.runcelery:celery beat, а в другойcelery -A base.runcelery:celery worker.Затем выполните свою задачу, которая нуждалась в контексте колбы. Должен работать.
Comments