7 ответов:
С docs:
$ celery -A proj purgeили
from proj.celery import app app.control.purge()(EDIT: обновлено с помощью текущего метода.)
Для Сельдерея 2.x и 3.x:
при использовании параметра worker with-Q для определения очередей, например
celery worker -Q queue1,queue2,queue3затем
celery purgeне будет работать, потому что вы не можете передать ему параметры очереди. Это приведет только к удалению очереди по умолчанию. Решение состоит в том, чтобы начать своих работников с
В Сельдерее 3+:
CLI:
$ celery -A proj purgeпрограммно:
>>> from proj.celery import app >>> app.control.purge()http://docs.celeryproject.org/en/latest/faq.html#how-do-i-purge-all-waiting-tasks
Я нашел это
celery purgeне работает для моей более сложной конфигурации сельдерея. Я использую несколько именованных очередей для разных целей:$ sudo rabbitmqctl list_queues -p celery name messages consumers Listing queues ... # Output sorted, whitespaced for readability celery 0 2 [email protected] 0 1 [email protected] 0 1 apns 0 1 [email protected] 0 1 analytics 1 1 [email protected] 0 1 bcast.361093f1-de68-46c5-adff-d49ea8f164c0 0 1 bcast.a53632b0-c8b8-46d9-bd59-364afe9998c1 0 1 celeryev.c27b070d-b07e-4e37-9dca-dbb45d03fd54 0 1 celeryev.c66a9bed-84bd-40b0-8fe7-4e4d0c002866 0 1 celeryev.b490f71a-be1a-4cd8-ae17-06a713cc2a99 0 1 celeryev.9d023165-ab4a-42cb-86f8-90294b80bd1e 0 1первый столбец-это имя очереди, второй-количество сообщений, ожидающих в очереди, а третий-количество слушателей для этой очереди. Очереди следующие:
- сельдерей-очередь для стандартных, идемпотентных задач сельдерея
- apns-очередь для Apple Push Notification Service задач, не совсем так идемпотент
- analytics-очередь для длительной ночной аналитики
- *.pidbox-очередь для рабочих команд, таких как завершение работы и сброс, по одному на одного работника (2 работника сельдерея, один работник apns, один работник аналитики)
- bcast.* - Широковещательные очереди, для отправки сообщений всем работникам, слушающим очередь (а не только первым, чтобы захватить его)
- celeryev.* - Очереди событий сельдерея, для отчетности аналитика задач
в задача аналитики-это задачи грубой силы, которые отлично работали на небольших наборах данных, но теперь требуется более 24 часов для обработки. Иногда что-то пойдет не так, и он застрянет в ожидании в базе данных. Его нужно переписать, но до тех пор, когда он застрянет, я убиваю задачу, очищаю очередь и повторяю попытку. Я обнаруживаю "тупик", глядя на количество сообщений для очереди аналитики, которое должно быть 0 (законченная аналитика) или 1 (ожидание завершения аналитики прошлой ночи). 2 или выше-это плохо, и я получаю электронное письмо.
celery purgeпредлагает удалить задачи из одной из широковещательных очередей, и я не вижу возможности выбрать другую именованную очередь.вот мой процесс:
$ sudo /etc/init.d/celeryd stop # Wait for analytics task to be last one, Ctrl-C $ ps -ef | grep analytics # Get the PID of the worker, not the root PID reported by celery $ sudo kill <PID> $ sudo /etc/init.d/celeryd stop # Confim dead $ python manage.py celery amqp queue.purge analytics $ sudo rabbitmqctl list_queues -p celery name messages consumers # Confirm messages is 0 $ sudo /etc/init.d/celeryd start
В Сельдерее 3+
http://docs.celeryproject.org/en/3.1/faq.html#how-do-i-purge-all-waiting-tasks
CLI
очистить очередь с именем:
celery -A proj amqp queue.purge <queue name>очистить настроен очередь
celery -A proj purgeя очистил сообщения, но в очереди все еще остаются сообщения? Ответ: задачи признаются (удаляются из очереди), как только они фактически выполняются. После того, как работник получил задача, это займет некоторое время, пока он не будет фактически выполнен, особенно если есть много задач, уже ожидающих выполнения. Сообщения, которые не подтверждены, удерживаются работником до тех пор, пока он не закроет соединение с брокером (сервером AMQP). Когда это соединение будет закрыто (например, потому что рабочий был остановлен), задачи будут повторно отправлены брокером следующему доступному рабочему (или тому же рабочему, когда он был перезапущен), поэтому для правильной очистки очереди ожидающих задач у вас есть чтобы остановить всех рабочих, а затем очистить задачи с помощью сельдерея.управление.чистка.)(
таким образом, чтобы очистить всю очередь работников должны быть остановлены.
1. Чтобы правильно очистить очередь от ожидающих заданий вы должны остановить всех работников (http://celery.readthedocs.io/en/latest/faq.html#i-ve-purged-messages-but-there-are-still-messages-left-in-the-queue):
$ sudo rabbitmqctl stopили (в случае, если брокер RabbitMQ/message управляется супервизором):
$ sudo supervisorctl stop all2. ...а затем очистить задачи из определенной очереди:
$ cd <source_dir> $ celery amqp queue.purge <queue name>3. Начало В RabbitMQ:
$ sudo rabbitmqctl startили (в случае, если RabbitMQ управляется супервайзером):
$ sudo supervisorctl start all
Comments