13 ответов:
Единственный способ, который приходит на ум для этого с точки зрения потребителя, - это фактически потреблять сообщения и подсчитывать их.
Брокер Kafka выставляет счетчики JMX для количества сообщений, полученных с момента запуска, но вы не можете знать, сколько из них уже было очищено.
В большинстве распространенных сценариев сообщения в Кафке лучше всего рассматривать как бесконечный поток, и получение дискретного значения количества, которое в настоящее время хранится на диске, не имеет значения. Кроме того вещи усложняйтесь, имея дело с кластером брокеров, которые все имеют подмножество сообщений в теме.
Это не java, но может быть полезно
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list <broker>: <port> --topic <topic-name> --time -1 --offsets 1 | awk -F ":" '{sum += $3} END {print sum}'
На самом деле я использую это для сравнительного анализа моего POC. Элемент, который вы хотите использовать ConsumerOffsetChecker. Вы можете запустить его с помощью скрипта bash, как показано ниже.
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic test --zookeeper localhost:2181 --group testgroupИ ниже результат :
Как вы можете видеть на Красном поле, 999 - это номер сообщения, находящегося в данный момент в теме.
Update: ConsumerOffsetChecker устарел с 0.10.0, вы можете начать использовать ConsumerGroupCommand.
Использовать https://prestodb.io/docs/current/connector/kafka-tutorial.html
Супер SQL движок, предоставляемый Facebook, который подключается к нескольким источникам данных (Cassandra, Kafka, JMX, Redis ...).
PrestoDB работает как сервер с дополнительными рабочими (есть автономный режим без дополнительных рабочих), а затем вы используете небольшой исполняемый JAR (называемый presto CLI) для выполнения запросов.После того, как вы хорошо настроили сервер Presto, вы можете использовать traditionnal SQL:
SELECT count(*) FROM TOPIC_NAME;
Чтобы получить все сообщения, сохраненные для темы, вы можете искать потребителя в начале и конце потока для каждой секции и суммировать результаты
List<TopicPartition> partitions = consumer.partitionsFor(topic).stream() .map(p -> new TopicPartition(topic, p.partition())) .collect(Collectors.toList()); consumer.assign(partitions); consumer.seekToEnd(Collections.emptySet()); Map<TopicPartition, Long> endPartitions = partitions.stream() .collect(Collectors.toMap(Function.identity(), consumer::position)); consumer.seekToBeginning(Collections.emptySet()); System.out.println(partitions.stream().mapToLong(p -> endPartitions.get(p) - consumer.position(p)).sum());
Команда Apache Kafka для получения необработанных сообщений по всем разделам темы:
kafka-run-class kafka.tools.ConsumerOffsetChecker --topic test --zookeeper localhost:2181 --group test_groupОтпечатки пальцев:
Колонка 6 - это необработанные сообщения. Сложите их вот так:Group Topic Pid Offset logSize Lag Owner test_group test 0 11051 11053 2 none test_group test 1 10810 10812 2 none test_group test 2 11027 11028 1 nonekafka-run-class kafka.tools.ConsumerOffsetChecker --topic test --zookeeper localhost:2181 --group test_group 2>/dev/null | awk 'NR>1 {sum += $6} END {print sum}'Awk читает строки, пропускает строку заголовка и складывает 6-й столбец, а в конце выводит сумму.
Отпечатки
5
Иногда интерес представляет знание количества сообщений в каждом разделе, например, при тестировании пользовательского разделителя.Последующие шаги были протестированы для работы с Kafka 0.10.2.1-2 из Confluent 3.2. Учитывая тему Кафки,
ktи следующую командную строку:$ kafka-run-class kafka.tools.GetOffsetShell \ --broker-list host01:9092,host02:9092,host02:9092 --topic kt, который выводит образец вывода, показывающий количество сообщений в трех разделах:
kt:2:6138 kt:1:6123 kt:0:6137Количество строк может быть больше или меньше в зависимости от количества разделов для темы.
Используя Java-клиент Kafka 2.11-1.0.0, вы можете сделать следующее:
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("test")); while(true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); // after each message, query the number of messages of the topic Set<TopicPartition> partitions = consumer.assignment(); Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions); for(TopicPartition partition : offsets.keySet()) { System.out.printf("partition %s is at %d\n", partition.topic(), offsets.get(partition)); } } }Вывод примерно такой:
offset = 10, key = null, value = un partition test is at 13 offset = 11, key = null, value = deux partition test is at 13 offset = 12, key = null, value = trois partition test is at 13
./kafka-console-consumer.sh -из-начала-новый-потребитель --бутстреп-сервер yourbroker:9092 --свойства печати.ключ=правда-принт собственность.value=false -- свойство print.раздел --тема yourtopic --timeout-ms 5000 / tail-n 10 / grep "обработано всего"
Выдержки из документов Кафки
Отклонения в 0.9.0.0
Kafka-consumer-offset-checker.sh (Кафка.инструменты.ConsumerOffsetChecker) была признана устаревшей. Идя вперед, пожалуйста, используйте kafka-consumer-groups.sh (Кафка.администратор.ConsumerGroupCommand) для этой функциональности.
Я запускаю Kafka broker с поддержкой SSL как для сервера, так и для клиента. Ниже я использую команду
kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --list --command-config /tmp/ssl_config kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --command-config /tmp/ssl_config --describe --group group_name_xГде /tmp / ssl_config - это как показано ниже
security.protocol=SSL ssl.truststore.location=truststore_file_path.jks ssl.truststore.password=truststore_password ssl.keystore.location=keystore_file_path.jks ssl.keystore.password=keystore_password ssl.key.password=key_password
Если у вас есть доступ к интерфейсу JMX сервера, начальные и конечные смещения присутствуют по адресу:
kafka.log:type=Log,name=LogStartOffset,topic=TOPICNAME,partition=PARTITIONNUMBER kafka.log:type=Log,name=LogEndOffset,topic=TOPICNAME,partition=PARTITIONNUMBER(вам нужно заменить
В качестве альтернативы можно использовать методыпотребителя КафкиTOPICNAME&PARTITIONNUMBER). Имейте в виду, что вам нужно проверить каждую из реплик данного раздела, или вам нужно выяснить, какой из брокеров является лидером для данного раздела (и это может измениться со временем).beginningOffsetsиendOffsets.


Comments