Как вы единичный тест задачи сельдерея?



документация по сельдерею упоминает тестирование сельдерея в Django но не объясняет, как проверить задачу сельдерея, если вы не используете Django. Как ты это делаешь?

750   8  

8 ответов:

можно тестировать задачи синхронно с помощью любого unittest lib там. Я обычно делаю 2 разных тестовых сеанса при работе с задачами сельдерея. Первый (как я предлагаю ниже) полностью синхронен и должен быть тем, который гарантирует, что алгоритм делает то, что он должен делать. Второй сеанс использует всю систему (включая брокера) и гарантирует, что у меня нет проблем с сериализацией или любым другим распределением, связью проблема.

Так:

from celery import Celery

celery = Celery()

@celery.task
def add(x, y):
    return x + y

и ваш тест:

from nose.tools import eq_

def test_add_task():
    rst = add.apply(args=(4, 4)).get()
    eq_(rst, 8)

надеюсь, что это поможет!

Я использую этот:

with mock.patch('celeryconfig.CELERY_ALWAYS_EAGER', True, create=True):
    ...

Docs:http://docs.celeryproject.org/en/3.1/configuration.html#celery-always-eager

CELERY_ALWAYS_EAGER позволяет запускать задачу синхронно, и вам не нужен сервер сельдерея.

зависит от того, что именно вы хотите тестировать.

  • проверьте код задачи напрямую. Не называй задач".задержка.(..) задача "просто вызов" (...) "из ваших модульных тестов.
  • использовать CELERY_ALWAYS_EAGER. Это приведет к тому, что ваши задачи будут вызваны немедленно в тот момент, когда вы скажете "задача.задержка.(..)", так что вы можете проверить весь путь (но не любое асинхронное поведение).

unittest

import unittest

from myproject.myapp import celeryapp

class TestMyCeleryWorker(unittest.TestCase):

  def setUp(self):
      celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)

py.тестовые приспособления

# conftest.py
from myproject.myapp import celeryapp

@pytest.fixture(scope='module')
def celery_app(request):
    celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)
    return celeryapp

# test_tasks.py
def test_some_task(celery_app):
    ...

добавление: сделайте send_task respect нетерпеливым

from celery import current_app

def send_task(name, args=(), kwargs={}, **opts):
    # https://github.com/celery/celery/issues/581
    task = current_app.tasks[name]
    return task.apply(args, kwargs, **opts)

current_app.send_task = send_task

для тех, кто на сельдерее 4 это:

@override_settings(CELERY_TASK_ALWAYS_EAGER=True)

поскольку имена параметров были изменены и нуждаются в обновлении, если вы решите обновить, см.

http://docs.celeryproject.org/en/latest/whatsnew-4.0.html#lowercase-setting-names

по состоянию на сельдерей 3.0 один из способов установить CELERY_ALWAYS_EAGER in Джанго - это:

from django.test import TestCase, override_settings

from .foo import foo_celery_task

class MyTest(TestCase):

    @override_settings(CELERY_ALWAYS_EAGER=True)
    def test_foo(self):
        self.assertTrue(foo_celery_task.delay())

в моем случае (и я думаю многих других), все, что я хотел, чтобы проверить внутреннюю логику задачи с помощью pytest.

TL; DR; закончил тем, что издевался над всем (вариант 2)


Пример Использования Case:

proj/tasks.py

@shared_task(bind=True)
def add_task(self, a, b):
    return a+b;

tests/test_tasks.py

from proj import add_task

def test_add():
    assert add_task(1, 2) == 3, '1 + 2 should equal 3'

но поскольку shared_task декоратор делает много внутренней логики сельдерея, это на самом деле не единица тесты.

Итак, для меня было 2 варианта:

Вариант 1: отдельная внутренняя логика

proj/tasks_logic.py

def internal_add(a, b):
    return a + b;

proj/tasks.py

from .tasks_logic import internal_add

@shared_task(bind=True)
def add_task(self, a, b):
    return internal_add(a, b);

это выглядит очень странно, и кроме того, что делает его менее читаемым, он требует вручную извлекать и передавать атрибуты, которые являются частью запроса, например task_id в случае, если вам это нужно, что делает логику более менее чисто.

вариант 2: издевается
издеваясь над внутренностями сельдерея

tests/__init__.py

# noinspection PyUnresolvedReferences
from celery import shared_task

from mock import patch


def mock_signature(**kwargs):
    return {}


def mocked_shared_task(*decorator_args, **decorator_kwargs):
    def mocked_shared_decorator(func):
        func.signature = func.si = func.s = mock_signature
        return func

    return mocked_shared_decorator

patch('celery.shared_task', mocked_shared_task).start()

который затем позволяет мне издеваться над объектом запроса (опять же, если вам нужны вещи из запроса, такие как идентификатор или счетчик повторных попыток.

tests/test_tasks.py

from proj import add_task

class MockedRequest:
    def __init__(self, id=None):
        self.id = id or 1


class MockedTask:
    def __init__(self, id=None):
        self.request = MockedRequest(id=id)


def test_add():
    mocked_task = MockedTask(id=3)
    assert add_task(mocked_task, 1, 2) == 3, '1 + 2 should equal 3'

это решение гораздо более ручное, но, это дает мне контроль мне нужно на самом деле unit тест, не повторяясь, и не теряя сельдерея сферу.

С Сельдереем В4.0, py.тестовые приспособления являются предоставил чтобы запустить работника сельдерея только для теста и завершения работы, когда это будет сделано:

def test_myfunc_is_executed(celery_session_worker):
    # celery_session_worker: <Worker: [email protected] (running)>
    assert myfunc.delay().wait(3)

среди других приспособлений описанных на http://docs.celeryproject.org/en/latest/userguide/testing.html#py-test, Вы можете изменить параметры сельдерея по умолчанию, переопределив celery_config приспособление таким образом:

@pytest.fixture(scope='session')
def celery_config():
    return {
        'accept_content': ['json', 'pickle'],
        'result_serializer': 'pickle',
    }

по умолчанию тестовый работник использует брокер в памяти и серверную часть результатов. Нет нужно использовать локальный Redis или RabbitMQ, если не тестировать конкретные функции.

Comments

    Ничего не найдено.