apache-kafka- все статьи тега ➜ страница 0
Как преобразовать байты из Кафки в их исходный объект?
Я извлекаю данные из Kafka, а затем десериализую Array[Byte], используя декодер по умолчанию, и после этого мои элементы RDD выглядят следующим образом (null,[B@406fa9b2), (null,[B@21a9fe0) но мне нужны мои исходные данные, которые имеют схему, так как я могу этого достичь? Я сериализирую сообщения в формате Avro. ...
Как добиться отсроченной очереди с помощью apache kafka?
Как добавить отложенные задания на kafka? Как я понимаю, он имеет дело не с каждым сообщением, а с каждой темой. Мои рабочие места имеют различный график, в котором я хотел бы их потреблять. Скажем, один будет в ближайшие 4 часа, другой будет I Dec. 1 и т. д. Есть ли у Кафки собственная поддержка для этого или других сторонних способов достижения того же самого? Вместо этого я думаю использовать Redis для отложенной очереди и отправить задание Кафке, как только его расписание будет получено, н ...
Кафка непризнанный вариант виртуальной машины "PrintGCDateStamps"
Я установил Kafka на удаленном сервере, и когда я попытался запустить ~/kafka/bin/zookeeper-server-start.sh ~/kafka/config/zookeeper.properties И я получил ошибку Unrecognized VM option 'PrintGCDateStamps' И сервер Кафки не запустился. Это не запускалось в виртуальной машине, а запускалось непосредственно на Ubuntu Server 16.04 с правильно установленной Java. Как это можно просто исправить? ...
Определение IP-адреса в контейнере docker
У меня есть файл docker-compose с несколькими определениями контейнеров-служб. Одна из служб взаимодействует с Apache Kafka в рамках одного и того же запуска docker-compose. Итак, у меня есть такое определение Кафки докера: kafka: image: spotify/kafka ports: - "2181:2181" - "9092:9092" environment: ADVERTISED_HOST: 127.0.0.1 ADVERTISED_PORT: 9092 У меня есть определение службы в том же файле docker-compose. В сценарии запуска сервиса я должен каким-то о ...
Проблема с памятью при запуске Kafka broker
Я новичок в технологиях Кафки и Хадупа. Я пытался установить и запустить свой первый одиночный узел, кластер Single Broker на экземпляре виртуальной машины AWS EC2, я закончил с: 1) Установка java 2) Обновление ~/.bashrc and ~/.nash_profile файлов с записями, связанными с java 3) Возможность запуска внутреннего экземпляра zookeeper, но 4) Как только я пытаюсь запустить Kafka broker, он выдает мне следующее сообщение об ошибке: $ bin/kafka-server-start.sh config/server.properties Java H ...
Избыточные консольные сообщения от производителя Кафки
Как вы контролируете уровень регистрации консоли производителя или потребителя Кафки? Я использую API Kafka 0.9 в Scala. Каждый раз, когда send на KafkaProducer вызывается, консоль выдает вывод, как показано ниже. Может ли это указывать на то, что я не правильно настроил KafkaProducer, а не просто на проблему избыточного протоколирования? 17:52:21.236 [pool-10-thread-7] INFO o.a.k.c.producer.ProducerConfig - ProducerConfig values: compression.type = none metric.reporters = [] metadata.max. ...
Java, как получить количество сообщений в теме в apache kafka
Я использую apache kafka для обмена сообщениями. Я реализовал производителя и потребителя в Java. Как мы можем получить количество сообщений в теме? ...
Очистить Тему Кафки
Я нажал сообщение, которое было слишком большим в теме сообщения Кафки на моей локальной машине, теперь я получаю сообщение об ошибке: kafka.common.InvalidMessageSizeException: invalid message size увеличение fetch.size здесь не идеально, потому что я на самом деле не хочу принимать такие большие сообщения. Есть ли способ очистить тему в Кафке? ...
В Apache Кафка против Апачи шторм
Apache Kafka: распределенная система обмена сообщениями Apache Storm: Обработка Сообщений В Реальном Времени Как мы можем использовать обе технологии в конвейере данных в реальном времени для обработки данных о событиях? с точки зрения конвейера данных в реальном времени оба, как мне кажется, выполняют одинаковую работу. Как мы можем использовать обе технологии на конвейере данных? ...