Как вы единичный тест задачи сельдерея?
документация по сельдерею упоминает тестирование сельдерея в Django но не объясняет, как проверить задачу сельдерея, если вы не используете Django. Как ты это делаешь?
8 ответов:
можно тестировать задачи синхронно с помощью любого unittest lib там. Я обычно делаю 2 разных тестовых сеанса при работе с задачами сельдерея. Первый (как я предлагаю ниже) полностью синхронен и должен быть тем, который гарантирует, что алгоритм делает то, что он должен делать. Второй сеанс использует всю систему (включая брокера) и гарантирует, что у меня нет проблем с сериализацией или любым другим распределением, связью проблема.
Так:
from celery import Celery celery = Celery() @celery.task def add(x, y): return x + yи ваш тест:
from nose.tools import eq_ def test_add_task(): rst = add.apply(args=(4, 4)).get() eq_(rst, 8)надеюсь, что это поможет!
Я использую этот:
with mock.patch('celeryconfig.CELERY_ALWAYS_EAGER', True, create=True): ...Docs:http://docs.celeryproject.org/en/3.1/configuration.html#celery-always-eager
CELERY_ALWAYS_EAGER позволяет запускать задачу синхронно, и вам не нужен сервер сельдерея.
зависит от того, что именно вы хотите тестировать.
- проверьте код задачи напрямую. Не называй задач".задержка.(..) задача "просто вызов" (...) "из ваших модульных тестов.
- использовать CELERY_ALWAYS_EAGER. Это приведет к тому, что ваши задачи будут вызваны немедленно в тот момент, когда вы скажете "задача.задержка.(..)", так что вы можете проверить весь путь (но не любое асинхронное поведение).
unittest
import unittest from myproject.myapp import celeryapp class TestMyCeleryWorker(unittest.TestCase): def setUp(self): celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)py.тестовые приспособления
# conftest.py from myproject.myapp import celeryapp @pytest.fixture(scope='module') def celery_app(request): celeryapp.conf.update(CELERY_ALWAYS_EAGER=True) return celeryapp # test_tasks.py def test_some_task(celery_app): ...добавление: сделайте send_task respect нетерпеливым
from celery import current_app def send_task(name, args=(), kwargs={}, **opts): # https://github.com/celery/celery/issues/581 task = current_app.tasks[name] return task.apply(args, kwargs, **opts) current_app.send_task = send_task
для тех, кто на сельдерее 4 это:
@override_settings(CELERY_TASK_ALWAYS_EAGER=True)поскольку имена параметров были изменены и нуждаются в обновлении, если вы решите обновить, см.
http://docs.celeryproject.org/en/latest/whatsnew-4.0.html#lowercase-setting-names
по состоянию на сельдерей 3.0 один из способов установить
CELERY_ALWAYS_EAGERin Джанго - это:from django.test import TestCase, override_settings from .foo import foo_celery_task class MyTest(TestCase): @override_settings(CELERY_ALWAYS_EAGER=True) def test_foo(self): self.assertTrue(foo_celery_task.delay())
в моем случае (и я думаю многих других), все, что я хотел, чтобы проверить внутреннюю логику задачи с помощью pytest.
TL; DR; закончил тем, что издевался над всем (вариант 2)
Пример Использования Case:
proj/tasks.py@shared_task(bind=True) def add_task(self, a, b): return a+b;
tests/test_tasks.pyfrom proj import add_task def test_add(): assert add_task(1, 2) == 3, '1 + 2 should equal 3'но поскольку
shared_taskдекоратор делает много внутренней логики сельдерея, это на самом деле не единица тесты.Итак, для меня было 2 варианта:
Вариант 1: отдельная внутренняя логика
proj/tasks_logic.pydef internal_add(a, b): return a + b;
proj/tasks.pyfrom .tasks_logic import internal_add @shared_task(bind=True) def add_task(self, a, b): return internal_add(a, b);это выглядит очень странно, и кроме того, что делает его менее читаемым, он требует вручную извлекать и передавать атрибуты, которые являются частью запроса, например
task_idв случае, если вам это нужно, что делает логику более менее чисто.вариант 2: издевается
издеваясь над внутренностями сельдерея
tests/__init__.py# noinspection PyUnresolvedReferences from celery import shared_task from mock import patch def mock_signature(**kwargs): return {} def mocked_shared_task(*decorator_args, **decorator_kwargs): def mocked_shared_decorator(func): func.signature = func.si = func.s = mock_signature return func return mocked_shared_decorator patch('celery.shared_task', mocked_shared_task).start()который затем позволяет мне издеваться над объектом запроса (опять же, если вам нужны вещи из запроса, такие как идентификатор или счетчик повторных попыток.
tests/test_tasks.pyfrom proj import add_task class MockedRequest: def __init__(self, id=None): self.id = id or 1 class MockedTask: def __init__(self, id=None): self.request = MockedRequest(id=id) def test_add(): mocked_task = MockedTask(id=3) assert add_task(mocked_task, 1, 2) == 3, '1 + 2 should equal 3'это решение гораздо более ручное, но, это дает мне контроль мне нужно на самом деле unit тест, не повторяясь, и не теряя сельдерея сферу.
С Сельдереем В4.0, py.тестовые приспособления являются предоставил чтобы запустить работника сельдерея только для теста и завершения работы, когда это будет сделано:
def test_myfunc_is_executed(celery_session_worker): # celery_session_worker: <Worker: [email protected] (running)> assert myfunc.delay().wait(3)среди других приспособлений описанных на http://docs.celeryproject.org/en/latest/userguide/testing.html#py-test, Вы можете изменить параметры сельдерея по умолчанию, переопределив
celery_configприспособление таким образом:@pytest.fixture(scope='session') def celery_config(): return { 'accept_content': ['json', 'pickle'], 'result_serializer': 'pickle', }по умолчанию тестовый работник использует брокер в памяти и серверную часть результатов. Нет нужно использовать локальный Redis или RabbitMQ, если не тестировать конкретные функции.
Comments