Как проверить состояние задачи в сельдерее?



Как проверить, выполняется ли задача в celery (в частности, я использую celery-django)?



Я прочитал документацию, и я погуглил, но я не вижу вызова, как:



my_example_task.state() == RUNNING


мой прецедент заключается в том, что у меня есть внешний (java) сервис для транскодирования. Когда я отправляю документ для транскодирования, я хочу проверить, выполняется ли задача, которая запускает эту службу, а если нет, то (повторно)запустить ее.



Я использую текущие стабильные версии-2.4, I верить.

1636   9  

9 ответов:

возвращает task_id (который задается из.delay ()) и спросите экземпляр сельдерея после этого о состоянии:

x = method.delay(1,2)
print x.task_id

при запросе, получить новый AsyncResult с помощью этого task_id:

from celery.result import AsyncResult
res = AsyncResult("your-task-id")
res.ready()

каждый

создания AsyncResult объект из идентификатора задачи и путь, рекомендованный в часто задаваемые вопросы чтобы получить статус задачи, когда единственное, что у вас есть, это идентификатор задачи.

однако, по состоянию на Сельдерей 3.x, есть значительные предостережения, которые могут укусить людей, если они не обращают на них внимания. Это действительно зависит от конкретного сценария использования.

по умолчанию сельдерей не записывает состояние "работает".

для того чтобы сельдерей чтобы записать выполнение задачи, необходимо установить task_track_started до True. Вот простая задача, которая проверяет это:

@app.task(bind=True)
def test(self):
    print self.AsyncResult(self.request.id).state

, когда task_track_started - это False, который является значением по умолчанию, государство-шоу PENDING хотя задача уже начался. Если вы установите task_track_started до True, тогда государство будет STARTED.

государство PENDING означает "Я не знаю."

An AsyncResult с государственным PENDING не означает ничего больше, чем сельдерей делает не знаю статус задачи. Это может быть связано с любым количеством причин.

в одном AsyncResult может быть построен с недопустимыми идентификаторами задач. Такие" задачи " будут считаться отложенными сельдереем:

>>> task.AsyncResult("invalid").status
'PENDING'

хорошо, так что никто не собирается кормить очевидно неверный ID для AsyncResult. Справедливо, но это также имеет для эффекта, что AsyncResult также рассмотрим задачу, которая успешно выполнена, но что сельдерей забыла как PENDING. Опять,в некоторых сценариях использования это может быть проблемой. Часть проблемы зависит от того, как сельдерей настроен для хранения результатов задач, потому что это зависит от наличия "надгробий" в бэкэнде результатов. ("Надгробия" - это термин, используемый в документации по сельдерею для блоков данных, которые записывают, как закончилась задача.) Используя AsyncResult не работает task_ignore_result и True. Более досадная проблема заключается в том, что сельдерей истекает надгробия по умолчанию. Элемент result_expires по умолчанию установлено значение 24 часа. Поэтому, если вы запускаете задачу и записываете идентификатор в долгосрочное хранилище и более 24 часов спустя, вы создаете AsyncResult С ним статус будет PENDING.

все" реальные задачи " начинаются в PENDING государство. Так что получение PENDING на задаче может означать, что задача была запрошена, но никогда не продвигалась дальше этого (по какой-либо причине). Или это может означать, что задача выполнена, но сельдерей забыл ее государство.

Оуч! AsyncResult не будет работать для меня. Что еще я могу сделать?

я предпочитаю следить за цели чем следить за задачи. Я действительно храню некоторую информацию о задачах, но это действительно вторично для отслеживания целей. Цели хранятся в хранилище независимо от сельдерея. Когда запрос должен выполнить вычисление, зависящее от некоторой достигнутой цели, он проверяет, была ли уже достигнута цель, если да, то он использует эту кэшированную цель, иначе он запускает задачу, которая будет влиять на цель, и отправляет клиенту, который сделал запрос HTTP ответ, который указывает, что он должен ждать результата.


имена переменных и гиперссылки выше предназначены для сельдерея 4.х. В 3.x соответствующие переменные и гиперссылки:CELERY_TRACK_STARTED,CELERY_IGNORE_RESULT,CELERY_TASK_RESULT_EXPIRES.

вы также можете создавать пользовательские состояния и обновлять его значение duting выполнение задачи. Этот пример из документов:

@app.task(bind=True)
def upload_files(self, filenames):
    for i, file in enumerate(filenames):
        if not self.request.called_directly:
            self.update_state(state='PROGRESS',
                meta={'current': i, 'total': len(filenames)})

http://celery.readthedocs.org/en/latest/userguide/tasks.html#custom-states

старый вопрос, но я недавно столкнулась с этой проблемой.

Если вы пытаетесь получить task_id вы можете сделать это так:

import celery
from celery_app import add
from celery import uuid

task_id = uuid()
result = add.apply_async((2, 2), task_id=task_id)

теперь вы точно знаете, что такое task_id и теперь можете использовать его для получения AsyncResult:

# grab the AsyncResult 
result = celery.result.AsyncResult(task_id)

# print the task id
print result.task_id
09dad9cf-c9fa-4aee-933f-ff54dae39bdf

# print the AsyncResult's status
print result.status
SUCCESS

# print the result returned 
print result.result
4

попробуй:

task.AsyncResult(task.request.id).state

это обеспечит статус задачи сельдерея. Если сельдерей задача уже находится под провал государство-он будет бросать исключение:

raised unexpected: KeyError('exc_type',)

для простых задач мы можем использовать http://flower.readthedocs.io/en/latest/screenshots.html и http://policystat.github.io/jobtastic/ для выполнения мониторинга.

и для сложных задач, скажем, задача, которая имеет дело с большим количеством других модулей. Мы рекомендуем вручную записывать ход выполнения и сообщение на конкретном блоке задач.

Я нашел полезную информацию в

работники проекта сельдерея направляют проверять-работников

в моем случае я проверяю, работает ли сельдерей.

inspect_workers = task.app.control.inspect()
if inspect_workers.registered() is None:
    state = 'FAILURE'
else:
    state = str(task.state) 

вы можете играть с inspect, чтобы получить ваши потребности.

помимо вышеуказанного программного подхода С помощью цветка статус задачи можно легко увидеть.

мониторинг в реальном времени с использованием событий сельдерея. Flower-это веб-инструмент для мониторинга и администрирования кластеров сельдерея.

  1. ход выполнения задачи и история
  2. возможность показать детали задачи (аргументы, время начала, время выполнения и многое другое)
  3. графики и статистика

Официальный Документ: цветок - сельдерей мониторинг инструмент

установка:

$ pip install flower

использование:

http://localhost:5555

Comments

    Ничего не найдено.