Удаление всех отложенных задач в celery / rabbitmq



Как я могу удалить все отложенные задачи, не зная task_id для каждой задачи?

1287   7  

7 ответов:

С docs:

$ celery -A proj purge

или

from proj.celery import app
app.control.purge()

(EDIT: обновлено с помощью текущего метода.)

для сельдерея 3.0+:

$ celery purge

для очистки определенной очереди:

$ celery -Q queue_name purge

Для Сельдерея 2.x и 3.x:

при использовании параметра worker with-Q для определения очередей, например

celery worker -Q queue1,queue2,queue3

затем celery purge не будет работать, потому что вы не можете передать ему параметры очереди. Это приведет только к удалению очереди по умолчанию. Решение состоит в том, чтобы начать своих работников с

В Сельдерее 3+:

CLI:

$ celery -A proj purge

программно:

>>> from proj.celery import app
>>> app.control.purge()

http://docs.celeryproject.org/en/latest/faq.html#how-do-i-purge-all-waiting-tasks

Я нашел это celery purge не работает для моей более сложной конфигурации сельдерея. Я использую несколько именованных очередей для разных целей:

$ sudo rabbitmqctl list_queues -p celery name messages consumers
Listing queues ...  # Output sorted, whitespaced for readability
celery                                          0   2
[email protected]                      0   1
[email protected]                      0   1
apns                                            0   1
[email protected]                        0   1
analytics                                       1   1
[email protected]                   0   1
bcast.361093f1-de68-46c5-adff-d49ea8f164c0      0   1
bcast.a53632b0-c8b8-46d9-bd59-364afe9998c1      0   1
celeryev.c27b070d-b07e-4e37-9dca-dbb45d03fd54   0   1
celeryev.c66a9bed-84bd-40b0-8fe7-4e4d0c002866   0   1
celeryev.b490f71a-be1a-4cd8-ae17-06a713cc2a99   0   1
celeryev.9d023165-ab4a-42cb-86f8-90294b80bd1e   0   1

первый столбец-это имя очереди, второй-количество сообщений, ожидающих в очереди, а третий-количество слушателей для этой очереди. Очереди следующие:

  • сельдерей-очередь для стандартных, идемпотентных задач сельдерея
  • apns-очередь для Apple Push Notification Service задач, не совсем так идемпотент
  • analytics-очередь для длительной ночной аналитики
  • *.pidbox-очередь для рабочих команд, таких как завершение работы и сброс, по одному на одного работника (2 работника сельдерея, один работник apns, один работник аналитики)
  • bcast.* - Широковещательные очереди, для отправки сообщений всем работникам, слушающим очередь (а не только первым, чтобы захватить его)
  • celeryev.* - Очереди событий сельдерея, для отчетности аналитика задач

в задача аналитики-это задачи грубой силы, которые отлично работали на небольших наборах данных, но теперь требуется более 24 часов для обработки. Иногда что-то пойдет не так, и он застрянет в ожидании в базе данных. Его нужно переписать, но до тех пор, когда он застрянет, я убиваю задачу, очищаю очередь и повторяю попытку. Я обнаруживаю "тупик", глядя на количество сообщений для очереди аналитики, которое должно быть 0 (законченная аналитика) или 1 (ожидание завершения аналитики прошлой ночи). 2 или выше-это плохо, и я получаю электронное письмо.

celery purge предлагает удалить задачи из одной из широковещательных очередей, и я не вижу возможности выбрать другую именованную очередь.

вот мой процесс:

$ sudo /etc/init.d/celeryd stop  # Wait for analytics task to be last one, Ctrl-C
$ ps -ef | grep analytics  # Get the PID of the worker, not the root PID reported by celery
$ sudo kill <PID>
$ sudo /etc/init.d/celeryd stop  # Confim dead
$ python manage.py celery amqp queue.purge analytics
$ sudo rabbitmqctl list_queues -p celery name messages consumers  # Confirm messages is 0
$ sudo /etc/init.d/celeryd start

В Сельдерее 3+

http://docs.celeryproject.org/en/3.1/faq.html#how-do-i-purge-all-waiting-tasks

CLI

очистить очередь с именем:

 celery -A proj amqp queue.purge <queue name>

очистить настроен очередь

celery -A proj purge

я очистил сообщения, но в очереди все еще остаются сообщения? Ответ: задачи признаются (удаляются из очереди), как только они фактически выполняются. После того, как работник получил задача, это займет некоторое время, пока он не будет фактически выполнен, особенно если есть много задач, уже ожидающих выполнения. Сообщения, которые не подтверждены, удерживаются работником до тех пор, пока он не закроет соединение с брокером (сервером AMQP). Когда это соединение будет закрыто (например, потому что рабочий был остановлен), задачи будут повторно отправлены брокером следующему доступному рабочему (или тому же рабочему, когда он был перезапущен), поэтому для правильной очистки очереди ожидающих задач у вас есть чтобы остановить всех рабочих, а затем очистить задачи с помощью сельдерея.управление.чистка.)(

таким образом, чтобы очистить всю очередь работников должны быть остановлены.

1. Чтобы правильно очистить очередь от ожидающих заданий вы должны остановить всех работников (http://celery.readthedocs.io/en/latest/faq.html#i-ve-purged-messages-but-there-are-still-messages-left-in-the-queue):

$ sudo rabbitmqctl stop

или (в случае, если брокер RabbitMQ/message управляется супервизором):

$ sudo supervisorctl stop all

2. ...а затем очистить задачи из определенной очереди:

$ cd <source_dir>
$ celery amqp queue.purge <queue name>

3. Начало В RabbitMQ:

$ sudo rabbitmqctl start

или (в случае, если RabbitMQ управляется супервайзером):

$ sudo supervisorctl start all

Comments

    Ничего не найдено.