Как преобразовать байты из Кафки в их исходный объект?
Я извлекаю данные из Kafka, а затем десериализую Array[Byte], используя декодер по умолчанию, и после этого мои элементы RDD выглядят следующим образом (null,[B@406fa9b2), (null,[B@21a9fe0) но мне нужны мои исходные данные, которые имеют схему, так как я могу этого достичь?
Я сериализирую сообщения в формате Avro.
1 ответ:
Вы должны декодировать байты, используя соответствующие десериализаторы, скажем, для строк или вашего пользовательского объекта.
Если вы не выполняете декодирование, вы получаете
Кафка ничего не знает о содержании ваших сообщений и поэтому передает байтовые массивы от производителей к потребителям.[B@406fa9b2, то есть просто текстовое представление байтовых массивов в Java.В потоковой передаче Spark вы должны использовать сериализаторы для ключей и значений (цитируя пример KafkaWordCount):
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")С помощью выше сериализаторов вы получаете
Однако, если вы хотите десериализовать байтовые массивы в свой пользовательский класс напрямую, вам придется написать пользовательский сериализатор (который является специфичным для Кафки и не имеет ничего общего с Spark). Я бы рекомендовал использовать JSON с фиксированной схемой или Avro (с решением, описанным вKafka, Spark и Avro - Part 3, производя и потребляя сообщения Avro ).DStream[String], поэтому вы работаете сRDD[String].
В Структурированной Потоковой Передаче однако конвейер может выглядеть следующим образом:
val fromKafka = spark. readStream. format("kafka"). option("subscribe", "topic1"). option("kafka.bootstrap.servers", "localhost:9092"). load. select('value cast "string") // <-- conversion here
Comments