Очистить Тему Кафки



Я нажал сообщение, которое было слишком большим в теме сообщения Кафки на моей локальной машине, теперь я получаю сообщение об ошибке:



kafka.common.InvalidMessageSizeException: invalid message size


увеличение fetch.size здесь не идеально, потому что я на самом деле не хочу принимать такие большие сообщения. Есть ли способ очистить тему в Кафке?

865   13  

13 ответов:

временно обновите время хранения по теме до одной секунды:

kafka-topics.sh --zookeeper localhost:13003 --alter --topic MyTopic --config retention.ms=1000

затем подождите, пока очистка вступит в силу (около одной минуты). После очистки восстановите предыдущее retention.ms значение.

вот шаги, которые я выполняю, чтобы удалить тему с именем MyTopic:

  1. остановите демона Apache Kafka
  2. удалить папку данных темы:rm -rf /tmp/kafka-logs/MyTopic-0
  3. удалить метаданные темы:zkCli.sh затем rmr /brokers/MyTopic
  4. запустите демон Apache Kafka

если вы пропустите шаг 3, то Apache Kafka будет продолжать сообщать о теме как о настоящем (например, если вы запустите kafka-list-topic.sh).

протестировано с Apache Кафка 0.8.0.

чтобы очистить очередь, вы можете удалить тему:

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test

затем воссоздать его:

bin/kafka-topics.sh --create --zookeeper localhost:2181 \
    --replication-factor 1 --partitions 1 --topic test

хотя принятый ответ верен, этот метод устарел. Настройка темы теперь должна выполняться через kafka-configs.

kafka-configs --zookeeper localhost:2181 --entity-type topics --alter --add-config retention.ms=1000 --entity-name MyTopic

конфигурации, установленные с помощью этого метода, могут отображаться с помощью команды

kafka-configs --zookeeper localhost:2181 --entity-type topics --describe --entity-name MyTopic

протестировано в Kafka 0.8.2, для примера быстрого запуска: Во-первых, добавьте одну строку на сервер.файл свойств в папке конфигурации:

delete.topic.enable=true

затем можно выполнить следующую команду:

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test

обновление: этот ответ имеет отношение к Кафке 0.6. Для Кафки 0.8 и позже см. ответ @Patrick.

да, остановите kafka и вручную удалите все файлы из соответствующего подкаталога (его легко найти в каталоге данных kafka). После перезапуска Кафки тема будет пустой.

у Кафки нет прямого метода для очистки / очистки темы (очереди), но он может сделать это, удалив эту тему и воссоздав ее.

прежде всего убедитесь, что разорвать.свойства файла есть и если не добавить delete.topic.enable=true

удалите тему bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic myTopic

затем создайте его снова.

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic myTopic --partitions 10 --replication-factor 2

иногда, если у вас есть насыщенный кластер (слишком много разделов, или с использованием зашифрованных данных темы, или с использованием SSL, или контроллер находится на плохом узле, или соединение шелушится, потребуется много времени, чтобы очистить указанную тему.

Я выполняю следующие действия, особенно если вы используете Avro.

1: запуск с инструментами Кафки:

bash kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=1 --entity-name <topic-name>

2: запуск на узле реестра схемы:

kafka-avro-console-consumer --consumer-property security.protocol=SSL --consumer-property ssl.truststore.location=/etc/schema-registry/secrets/trust.jks --consumer-property ssl.truststore.password=password --consumer-property ssl.keystore.location=/etc/schema-registry/secrets/identity.jks --consumer-property ssl.keystore.password=password --consumer-property ssl.key.password=password --bootstrap-server broker01.kafka.com:9092 --topic <topic-name> --new-consumer --from-beginning

3: заданная тема сохранение обратно к исходному параметру, как только тема пуста.

bash kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=604800000 --entity-name <topic-name>

надеюсь, что это поможет кому-то, так как это не легко рекламируется.

самый простой подход заключается в том, чтобы установить дату отдельных файлов журнала, чтобы быть старше, чем срок хранения. Тогда брокер должен очистить их и удалить их для вас в течение нескольких секунд. Это дает несколько преимуществ:

  1. не нужно сбивать брокеров, это операция во время выполнения.
  2. позволяет избежать возможности недопустимых исключений смещения (подробнее об этом ниже).

по моему опыту с Кафкой 0.7.x, удаление файлов журнала и перезапуск брокера может привести к недопустимым исключениям смещения для некоторых потребителей. Это произойдет потому, что брокер перезапустит смещения на ноль (при отсутствии каких-либо существующих файлов журнала), а потребитель, который ранее потреблял из раздела, снова подключится, чтобы запросить определенное [когда-то действительное] смещение. Если это смещение выходит за пределы журналов новой темы, то никакого вреда не будет, и потребитель возобновит работу либо в начале, либо в конце. Но, если смещение попадает в пределы границы журналов нового раздела, брокер пытается извлечь набор сообщений, но не удается, потому что смещение не совпадает с фактическим сообщением.

Это можно было бы смягчить, также очистив потребительские смещения в zookeeper для этой темы. Но если вам не нужна девственная тема и просто хотите удалить существующее содержимое, то просто "прикоснуться" к нескольким журналам тем гораздо проще и надежнее, чем останавливать брокеров, удалять журналы тем и очищать определенные узлы zookeeper.

Совет Томаса велик, но к сожалению zkCli в старых версиях Zookeeper (например 3.3.6), кажется, не поддерживает rmr. Например, сравните реализацию командной строки в современный смотритель зоопарка С версия 3.3.

если вы столкнулись со старой версией Zookeeper, одним из решений является использование клиентской библиотеки, такой как zc.ЗК для Python. Для людей, не знакомых с Python, вам нужно установить его с помощью Пип или easy_install. Затем запустите оболочку Python (python), и вы можете сделать:

import zc.zk
zk = zc.zk.ZooKeeper('localhost:2181')
zk.delete_recursive('brokers/MyTopic') 

или даже

zk.delete_recursive('brokers')

если вы хотите удалить все темы от Кафки.

чтобы очистить все сообщения из определенной темы, используя вашу группу приложений (имя группы должно совпадать с именем группы приложения kafka).

./kafka-path/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topicName --from-beginning --group application-group

не удалось добавить в качестве комментария из-за размера: Не уверен, что это правда, кроме обновления retention.ms и удержание.но я заметил, что политика очистки темы должна быть "удалить" (по умолчанию), если "компактный", он будет дольше удерживать сообщения, т. е. если он "компактный", вы должны указать delete.retention.ms тоже.

./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics
Configs for topics:test-topic-3-100 are retention.ms=1000,delete.retention.ms=10000,cleanup.policy=delete,retention.bytes=1

также пришлось контролировать самые ранние / последние смещения должны быть такими же, чтобы подтвердить, что это успешно произошло, также можно проверить du-h / tmp / kafka-logs / test-topic-3-100-*

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -1 | awk -F ":" '{sum += } END {print sum}' 26599762

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -2 | awk -F ":" '{sum += } END {print sum}' 26599762

другая проблема, вы должны получить текущая конфигурация первый таким образом, вы помните, чтобы вернуться после успешного удаления: ./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics

другое, а ручной подход, для этого:

в брокеры:

  1. остановите брокера Кафки
    sudo service kafka stop
  2. удалить все файлы журналов раздела (должно быть сделано на всех брокеров)
    sudo rm -R /kafka-storage/kafka-logs/<some_topic_name>-*

в zookeeper:

  1. запустить интерфейс командной строки zookeeper
    sudo /usr/lib/zookeeper/bin/zkCli.sh
  2. используйте zkCli для удаления темы метаданные
    rmr /brokers/topic/<some_topic_name>

в брокеры снова:

  1. перезапустить брокер сервис
    sudo service kafka start

Comments

    Ничего не найдено.