Колба с create app, SQLAlchemy и сельдереем



Я действительно изо всех сил пытаюсь получить правильную настройку для колбы, SQLAlchemy и сельдерея. Я много искал и пробовал разные подходы, но ничего не получалось. Либо я пропустил контекст приложения, либо не могу запустить workers, либо есть какие-то другие проблемы. Структура очень общая, так что я могу построить более крупное приложение.



Я использую: Flask 0.10.1, SQLAlchemy 1.0, Celery 3.1.13, моя текущая настройка следующее:



App/__init__.py



#Empty


App/config.py



import os
basedir = os.path.abspath(os.path.dirname(__file__))

class Config:

@staticmethod
def init_app(app):
pass

class LocalConfig(Config):
DEBUG = True
SQLALCHEMY_DATABASE_URI = r"sqlite:///" + os.path.join(basedir,
"data-dev.sqlite")
CELERY_BROKER_URL = 'amqp://guest:guest@localhost:5672//'


config = {
"local": LocalConfig}


App/exstensions.py



from flask.ext.sqlalchemy import SQLAlchemy
from celery import Celery

db = SQLAlchemy()
celery = Celery()


App/factory.py



from extensions import db, celery
from flask import Flask
from flask import g
from config import config

def create_before_request(app):
def before_request():
g.db = db
return before_request


def create_app(config_name):
app = Flask(__name__)
app.config.from_object(config[config_name])

db.init_app(app)
celery.config_from_object(config)

# Register the blueprints

# Add the before request handler
app.before_request(create_before_request(app))
return app


App/manage.py



from factory import create_app

app = create_app("local")

from flask import render_template
from flask import request

@app.route('/test', methods=['POST'])
def task_simple():
import tasks
tasks.do_some_stuff.delay()
return ""

if __name__ == "__main__":
app.run()


App/models.py



from extensions import db

class User(db.Model):
__tablename__ = "user"

id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(128), unique=True, nullable=False)


App/tasks.py



from extensions import celery
from celery.signals import task_prerun
from flask import g, current_app


@task_prerun.connect
def close_session(*args, **kwargs):
with current_app.app_context():
# use g.db
print g

@celery.task()
def do_some_stuff():
with current_app.app_context():
# use g.db
print g


В папке приложения:




  • запуск веб-сервера разработки с помощью: python.exe manage.py

  • начиная с рабочих: celery.exe worker -A tasks


Я получаю ошибку импорта, которая не имеет для меня никакого смысла.
Должен ли я структурировать приложение по-другому? В конце концов, я думаю, что мне нужна довольно простая настройка, например, использование Flask с фабричным шаблоном, возможность использовать расширение Flask-SQLAlchmey и иметь некоторого работника, который должен получить доступ к базе данных.



Любая помощь высоко ценится.

Трассировка выполняется при запуске celery worker.



Traceback (most recent call last):

File "[PATH]scriptscelery-script.py", line 9, in <module>
load_entry_point('celery==3.1.13', 'console_scripts', 'celery')()

File "[PATH]libsite-packagescelery__main__.py", line 30, in main
main()

File "[PATH]libsite-packagescelerybincelery.py", line 81, in main
cmd.execute_from_commandline(argv)

File "[PATH]libsite-packagescelerybincelery.py", line 769, in execute_from_commandline
super(CeleryCommand, self).execute_from_commandline(argv)))

File "[PATH]libsite-packagescelerybinbase.py", line 305, in execute_from_commandline
argv = self.setup_app_from_commandline(argv)

File "[PATH]libsite-packagescelerybinbase.py", line 473, in setup_app_from_commandline
user_preload = tuple(self.app.user_options['preload'] or ())
AttributeError: 'Flask' object has no attribute 'user_options'


UPDATE я изменяю код в соответствии с предложением в комментарии. Работник начинает теперь вверх но когда тестировать его с get запросом к http://127.0.0.1:5000/test. Я получаю следующую обратную связь:



Traceback (most recent call last):
File "[PATH]libsite-packagesceleryapptrace.py", line 230, in trace_task
args=args, kwargs=kwargs)

File "[PATH]libsite-packagesceleryutilsdispatchsignal.py", line 166, in send
response = receiver(signal=self, sender=sender, **named)

File "[PATH]appstackoverflowtasks.py", line 7, in close_session
with current_app.app_context():

File "[PATH]libsite-packageswerkzeuglocal.py", line 338, in __getattr__
return getattr(self._get_current_object(), name)

File "[PATH]libsite-packageswerkzeuglocal.py", line 297, in _get_current_object
return self.__local()

File "[PATH]libsite-packagesflaskglobals.py", line 34, in _find_app
raise RuntimeError('working outside of application context')
RuntimeError: working outside of application context exc, exc_info.traceback)))


Обновление основываясь на комментариях Мартина, я изменил код. Текущая рабочая версия находится под: https://gist.github.com/anonymous/fa47834db2f4f3b8b257 [52]}.
Любые дальнейшие улучшения или предложения приветствуются.

584   2  

2 ответов:

Я ушел с Советом current_app.

Ваш объект сельдерей нуждается в доступе к контексту приложения. Я нашел в интернете информацию о создании объекта сельдерей с фабричной функцией. Приведенный ниже пример тестируется без посредника сообщений.

#factory.py
from celery import Celery
from config import config

def create_celery_app(app=None):
    app = app or create_app(config)
    celery = Celery(__name__, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    return celery

И в tasks.py:

#tasks.py
from factory import create_celery_app
from celery.signals import task_prerun
from flask import g

celery = create_celery_app()

@task_prerun.connect
def celery_prerun(*args, **kwargs):
    #print g
    with celery.app.app_context():
    #   # use g.db
       print g

@celery.task()
def do_some_stuff():
    with celery.app.app_context():
        # use g.db
        g.user = "test"
        print g.user

Некоторые ссылки:

Шаблон колбы для создания экземпляра сельдерея с фабричной функцией

Применение с использованием как фабрики приложений, так и сельдерей

Источник для указанного приложения factory.py

Источник для применения tasks.py

Вот решение, которое работает с шаблоном фабрики приложений flask, а также создает задачу сельдерея с контекстом, без необходимости явно использовать app.app_context() в задачах. В моем приложении очень сложно получить этот объект приложения, избегая циклического импорта, но это решает проблему. Это также хорошо для последней версии сельдерея 4.2 на момент написания статьи.

Структура:

repo_name/
    manage.py
    base/
    base/__init__.py
    base/app.py
    base/runcelery.py
    base/celeryconfig.py
    base/utility/celery_util.py
    base/tasks/workers.py

Таким образом, base является основным пакетом приложения в этом примере. В base/__init__.py мы создаем сельдерей пример, как показано ниже:

from celery import Celery
celery = Celery('base', config_source='base.celeryconfig')

Файл base/app.py содержит фабрику приложений flask create_app и обратите внимание на init_celery(app, celery), которая содержит:

from base import celery
from base.utility.celery_util import init_celery

def create_app(config_obj):
    """An application factory, as explained here:
    http://flask.pocoo.org/docs/patterns/appfactories/.
    :param config_object: The configuration object to use.
    """
    app = Flask('base')
    app.config.from_object(config_obj)
    init_celery(app, celery=celery)
    register_extensions(app)
    register_blueprints(app)
    register_errorhandlers(app)
    register_app_context_processors(app)
    return app

Переходим к содержанию base/runcelery.py:

from flask.helpers import get_debug_flag
from base.settings import DevConfig, ProdConfig
from base import celery
from base.app import create_app
from base.utility.celery_util import init_celery
CONFIG = DevConfig if get_debug_flag() else ProdConfig
app = create_app(CONFIG)
init_celery(app, celery)

Далее, файл base/celeryconfig.py (в качестве примера):

# -*- coding: utf-8 -*-
"""
Configure Celery. See the configuration guide at ->
http://docs.celeryproject.org/en/master/userguide/configuration.html#configuration
"""

## Broker settings.
broker_url = 'pyamqp://guest:guest@localhost:5672//'
broker_heartbeat=0

# List of modules to import when the Celery worker starts.
imports = ('base.tasks.workers',)

## Using the database to store task state and results.
result_backend = 'rpc'
#result_persistent = False

accept_content = ['json', 'application/text']

result_serializer = 'json'
timezone = "UTC"

# define periodic tasks / cron here
# beat_schedule = {
#    'add-every-10-seconds': {
#        'task': 'workers.add_together',
#        'schedule': 10.0,
#        'args': (16, 16)
#    },
# }

Теперь определите init_celery в файле base/utility/celery_util.py:

# -*- coding: utf-8 -*-

def init_celery(app, celery):
    """Add flask app context to celery.Task"""
    TaskBase = celery.Task
    class ContextTask(TaskBase):
        abstract = True
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)
    celery.Task = ContextTask

Для рабочих в base/tasks/workers.py:

from base import celery as celery_app
from flask_security.utils import config_value, send_mail
from base.bp.users.models.user_models import User

@celery_app.task
def send_welcome_email(email, user_id, confirmation_link):
    """Background task to send a welcome email with flask-security's mail.
    You don't need to use with app.app_context() as Task has app context.
    """
    user = User.query.filter_by(id=user_id).first()
    print(f'sending user {user} a welcome email')
    send_mail(config_value('EMAIL_SUBJECT_REGISTER'),
              email,
              'welcome', user=user,
              confirmation_link=confirmation_link) 

@celery_app.task
def do_some_stuff():
    print(g)

Затем вам нужно запустить celery beat и celery worker в двух разных командах cmd из внутри repo_name папка .

В одной командной строке выполните команду celery -A base.runcelery:celery beat, а в другой celery -A base.runcelery:celery worker.

Затем выполните свою задачу, которая нуждалась в контексте колбы. Должен работать.

Comments

    Ничего не найдено.