Java, как получить количество сообщений в теме в apache kafka



Я использую apache kafka для обмена сообщениями. Я реализовал производителя и потребителя в Java. Как мы можем получить количество сообщений в теме?

760   13  

13 ответов:

Единственный способ, который приходит на ум для этого с точки зрения потребителя, - это фактически потреблять сообщения и подсчитывать их.

Брокер Kafka выставляет счетчики JMX для количества сообщений, полученных с момента запуска, но вы не можете знать, сколько из них уже было очищено.

В большинстве распространенных сценариев сообщения в Кафке лучше всего рассматривать как бесконечный поток, и получение дискретного значения количества, которое в настоящее время хранится на диске, не имеет значения. Кроме того вещи усложняйтесь, имея дело с кластером брокеров, которые все имеют подмножество сообщений в теме.

Это не java, но может быть полезно

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell 
  --broker-list <broker>:  <port> 
  --topic <topic-name> --time -1 --offsets 1 
  | awk -F  ":" '{sum += $3} END {print sum}'

На самом деле я использую это для сравнительного анализа моего POC. Элемент, который вы хотите использовать ConsumerOffsetChecker. Вы можете запустить его с помощью скрипта bash, как показано ниже.

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker  --topic test --zookeeper localhost:2181 --group testgroup

И ниже результат : Введите описание изображения здесь Как вы можете видеть на Красном поле, 999 - это номер сообщения, находящегося в данный момент в теме.

Update: ConsumerOffsetChecker устарел с 0.10.0, вы можете начать использовать ConsumerGroupCommand.

Использовать https://prestodb.io/docs/current/connector/kafka-tutorial.html

Супер SQL движок, предоставляемый Facebook, который подключается к нескольким источникам данных (Cassandra, Kafka, JMX, Redis ...).

PrestoDB работает как сервер с дополнительными рабочими (есть автономный режим без дополнительных рабочих), а затем вы используете небольшой исполняемый JAR (называемый presto CLI) для выполнения запросов.

После того, как вы хорошо настроили сервер Presto, вы можете использовать traditionnal SQL:

SELECT count(*) FROM TOPIC_NAME;

Чтобы получить все сообщения, сохраненные для темы, вы можете искать потребителя в начале и конце потока для каждой секции и суммировать результаты

List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
        .map(p -> new TopicPartition(topic, p.partition()))
        .collect(Collectors.toList());
    consumer.assign(partitions); 
    consumer.seekToEnd(Collections.emptySet());
Map<TopicPartition, Long> endPartitions = partitions.stream()
        .collect(Collectors.toMap(Function.identity(), consumer::position));
    consumer.seekToBeginning(Collections.emptySet());
System.out.println(partitions.stream().mapToLong(p -> endPartitions.get(p) - consumer.position(p)).sum());

Команда Apache Kafka для получения необработанных сообщений по всем разделам темы:

kafka-run-class kafka.tools.ConsumerOffsetChecker 
    --topic test --zookeeper localhost:2181 
    --group test_group

Отпечатки пальцев:

Group      Topic        Pid Offset          logSize         Lag             Owner
test_group test         0   11051           11053           2               none
test_group test         1   10810           10812           2               none
test_group test         2   11027           11028           1               none
Колонка 6 - это необработанные сообщения. Сложите их вот так:
kafka-run-class kafka.tools.ConsumerOffsetChecker 
    --topic test --zookeeper localhost:2181 
    --group test_group 2>/dev/null | awk 'NR>1 {sum += $6} 
    END {print sum}'

Awk читает строки, пропускает строку заголовка и складывает 6-й столбец, а в конце выводит сумму.

Отпечатки

5

В самых последних версиях Kafka Manager есть столбец под названием суммированные последние смещения.

Введите описание изображения здесь

Иногда интерес представляет знание количества сообщений в каждом разделе, например, при тестировании пользовательского разделителя.Последующие шаги были протестированы для работы с Kafka 0.10.2.1-2 из Confluent 3.2. Учитывая тему Кафки, kt и следующую командную строку:

$ kafka-run-class kafka.tools.GetOffsetShell \
  --broker-list host01:9092,host02:9092,host02:9092 --topic kt

, который выводит образец вывода, показывающий количество сообщений в трех разделах:

kt:2:6138
kt:1:6123
kt:0:6137

Количество строк может быть больше или меньше в зависимости от количества разделов для темы.

Я сам не пробовал это, но, кажется, это имеет смысл.

Вы также можете использовать kafka.tools.ConsumerOffsetChecker (Источник ).

Используя Java-клиент Kafka 2.11-1.0.0, вы можете сделать следующее:

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("test"));
    while(true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());

            // after each message, query the number of messages of the topic
            Set<TopicPartition> partitions = consumer.assignment();
            Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions);
            for(TopicPartition partition : offsets.keySet()) {
                System.out.printf("partition %s is at %d\n", partition.topic(), offsets.get(partition));
            }
        }
    }

Вывод примерно такой:

offset = 10, key = null, value = un
partition test is at 13
offset = 11, key = null, value = deux
partition test is at 13
offset = 12, key = null, value = trois
partition test is at 13

./kafka-console-consumer.sh -из-начала-новый-потребитель --бутстреп-сервер yourbroker:9092 --свойства печати.ключ=правда-принт собственность.value=false -- свойство print.раздел --тема yourtopic --timeout-ms 5000 / tail-n 10 / grep "обработано всего"

Выдержки из документов Кафки

Отклонения в 0.9.0.0

Kafka-consumer-offset-checker.sh (Кафка.инструменты.ConsumerOffsetChecker) была признана устаревшей. Идя вперед, пожалуйста, используйте kafka-consumer-groups.sh (Кафка.администратор.ConsumerGroupCommand) для этой функциональности.

Я запускаю Kafka broker с поддержкой SSL как для сервера, так и для клиента. Ниже я использую команду

kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --list --command-config /tmp/ssl_config kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --command-config /tmp/ssl_config --describe --group group_name_x

Где /tmp / ssl_config - это как показано ниже

security.protocol=SSL
ssl.truststore.location=truststore_file_path.jks
ssl.truststore.password=truststore_password
ssl.keystore.location=keystore_file_path.jks
ssl.keystore.password=keystore_password
ssl.key.password=key_password

Если у вас есть доступ к интерфейсу JMX сервера, начальные и конечные смещения присутствуют по адресу:

kafka.log:type=Log,name=LogStartOffset,topic=TOPICNAME,partition=PARTITIONNUMBER
kafka.log:type=Log,name=LogEndOffset,topic=TOPICNAME,partition=PARTITIONNUMBER

(вам нужно заменить TOPICNAME & PARTITIONNUMBER). Имейте в виду, что вам нужно проверить каждую из реплик данного раздела, или вам нужно выяснить, какой из брокеров является лидером для данного раздела (и это может измениться со временем).

В качестве альтернативы можно использовать методыпотребителя Кафки beginningOffsets и endOffsets.

Comments

    Ничего не найдено.