Очистить Тему Кафки
Я нажал сообщение, которое было слишком большим в теме сообщения Кафки на моей локальной машине, теперь я получаю сообщение об ошибке:
kafka.common.InvalidMessageSizeException: invalid message size
увеличение fetch.size здесь не идеально, потому что я на самом деле не хочу принимать такие большие сообщения. Есть ли способ очистить тему в Кафке?
13 ответов:
временно обновите время хранения по теме до одной секунды:
kafka-topics.sh --zookeeper localhost:13003 --alter --topic MyTopic --config retention.ms=1000затем подождите, пока очистка вступит в силу (около одной минуты). После очистки восстановите предыдущее
retention.msзначение.
вот шаги, которые я выполняю, чтобы удалить тему с именем
MyTopic:
- остановите демона Apache Kafka
- удалить папку данных темы:
rm -rf /tmp/kafka-logs/MyTopic-0- удалить метаданные темы:
zkCli.shзатемrmr /brokers/MyTopic- запустите демон Apache Kafka
если вы пропустите шаг 3, то Apache Kafka будет продолжать сообщать о теме как о настоящем (например, если вы запустите
kafka-list-topic.sh).протестировано с Apache Кафка 0.8.0.
чтобы очистить очередь, вы можете удалить тему:
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic testзатем воссоздать его:
bin/kafka-topics.sh --create --zookeeper localhost:2181 \ --replication-factor 1 --partitions 1 --topic test
хотя принятый ответ верен, этот метод устарел. Настройка темы теперь должна выполняться через
kafka-configs.kafka-configs --zookeeper localhost:2181 --entity-type topics --alter --add-config retention.ms=1000 --entity-name MyTopicконфигурации, установленные с помощью этого метода, могут отображаться с помощью команды
kafka-configs --zookeeper localhost:2181 --entity-type topics --describe --entity-name MyTopic
протестировано в Kafka 0.8.2, для примера быстрого запуска: Во-первых, добавьте одну строку на сервер.файл свойств в папке конфигурации:
delete.topic.enable=trueзатем можно выполнить следующую команду:
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
обновление: этот ответ имеет отношение к Кафке 0.6. Для Кафки 0.8 и позже см. ответ @Patrick.
да, остановите kafka и вручную удалите все файлы из соответствующего подкаталога (его легко найти в каталоге данных kafka). После перезапуска Кафки тема будет пустой.
у Кафки нет прямого метода для очистки / очистки темы (очереди), но он может сделать это, удалив эту тему и воссоздав ее.
прежде всего убедитесь, что разорвать.свойства файла есть и если не добавить
delete.topic.enable=trueудалите тему
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic myTopicзатем создайте его снова.
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic myTopic --partitions 10 --replication-factor 2
иногда, если у вас есть насыщенный кластер (слишком много разделов, или с использованием зашифрованных данных темы, или с использованием SSL, или контроллер находится на плохом узле, или соединение шелушится, потребуется много времени, чтобы очистить указанную тему.
Я выполняю следующие действия, особенно если вы используете Avro.
1: запуск с инструментами Кафки:
bash kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=1 --entity-name <topic-name>2: запуск на узле реестра схемы:
kafka-avro-console-consumer --consumer-property security.protocol=SSL --consumer-property ssl.truststore.location=/etc/schema-registry/secrets/trust.jks --consumer-property ssl.truststore.password=password --consumer-property ssl.keystore.location=/etc/schema-registry/secrets/identity.jks --consumer-property ssl.keystore.password=password --consumer-property ssl.key.password=password --bootstrap-server broker01.kafka.com:9092 --topic <topic-name> --new-consumer --from-beginning3: заданная тема сохранение обратно к исходному параметру, как только тема пуста.
bash kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=604800000 --entity-name <topic-name>надеюсь, что это поможет кому-то, так как это не легко рекламируется.
самый простой подход заключается в том, чтобы установить дату отдельных файлов журнала, чтобы быть старше, чем срок хранения. Тогда брокер должен очистить их и удалить их для вас в течение нескольких секунд. Это дает несколько преимуществ:
- не нужно сбивать брокеров, это операция во время выполнения.
- позволяет избежать возможности недопустимых исключений смещения (подробнее об этом ниже).
по моему опыту с Кафкой 0.7.x, удаление файлов журнала и перезапуск брокера может привести к недопустимым исключениям смещения для некоторых потребителей. Это произойдет потому, что брокер перезапустит смещения на ноль (при отсутствии каких-либо существующих файлов журнала), а потребитель, который ранее потреблял из раздела, снова подключится, чтобы запросить определенное [когда-то действительное] смещение. Если это смещение выходит за пределы журналов новой темы, то никакого вреда не будет, и потребитель возобновит работу либо в начале, либо в конце. Но, если смещение попадает в пределы границы журналов нового раздела, брокер пытается извлечь набор сообщений, но не удается, потому что смещение не совпадает с фактическим сообщением.
Это можно было бы смягчить, также очистив потребительские смещения в zookeeper для этой темы. Но если вам не нужна девственная тема и просто хотите удалить существующее содержимое, то просто "прикоснуться" к нескольким журналам тем гораздо проще и надежнее, чем останавливать брокеров, удалять журналы тем и очищать определенные узлы zookeeper.
Совет Томаса велик, но к сожалению
zkCliв старых версиях Zookeeper (например 3.3.6), кажется, не поддерживаетrmr. Например, сравните реализацию командной строки в современный смотритель зоопарка С версия 3.3.если вы столкнулись со старой версией Zookeeper, одним из решений является использование клиентской библиотеки, такой как zc.ЗК для Python. Для людей, не знакомых с Python, вам нужно установить его с помощью Пип или easy_install. Затем запустите оболочку Python (
python), и вы можете сделать:import zc.zk zk = zc.zk.ZooKeeper('localhost:2181') zk.delete_recursive('brokers/MyTopic')или даже
zk.delete_recursive('brokers')если вы хотите удалить все темы от Кафки.
чтобы очистить все сообщения из определенной темы, используя вашу группу приложений (имя группы должно совпадать с именем группы приложения kafka).
./kafka-path/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topicName --from-beginning --group application-group
не удалось добавить в качестве комментария из-за размера: Не уверен, что это правда, кроме обновления retention.ms и удержание.но я заметил, что политика очистки темы должна быть "удалить" (по умолчанию), если "компактный", он будет дольше удерживать сообщения, т. е. если он "компактный", вы должны указать delete.retention.ms тоже.
./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics
Configs for topics:test-topic-3-100 are retention.ms=1000,delete.retention.ms=10000,cleanup.policy=delete,retention.bytes=1также пришлось контролировать самые ранние / последние смещения должны быть такими же, чтобы подтвердить, что это успешно произошло, также можно проверить du-h / tmp / kafka-logs / test-topic-3-100-*
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -1 | awk -F ":" '{sum += } END {print sum}' 26599762
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -2 | awk -F ":" '{sum += } END {print sum}' 26599762другая проблема, вы должны получить текущая конфигурация первый таким образом, вы помните, чтобы вернуться после успешного удаления:
./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics
другое, а ручной подход, для этого:
в брокеры:
- остановите брокера Кафки
sudo service kafka stop- удалить все файлы журналов раздела (должно быть сделано на всех брокеров)
sudo rm -R /kafka-storage/kafka-logs/<some_topic_name>-*в zookeeper:
- запустить интерфейс командной строки zookeeper
sudo /usr/lib/zookeeper/bin/zkCli.sh- используйте zkCli для удаления темы метаданные
rmr /brokers/topic/<some_topic_name>в брокеры снова:
- перезапустить брокер сервис
sudo service kafka start
Comments