Как преобразовать байты из Кафки в их исходный объект?



Я извлекаю данные из Kafka, а затем десериализую Array[Byte], используя декодер по умолчанию, и после этого мои элементы RDD выглядят следующим образом (null,[B@406fa9b2), (null,[B@21a9fe0) но мне нужны мои исходные данные, которые имеют схему, так как я могу этого достичь?



Я сериализирую сообщения в формате Avro.

572   1  

1 ответ:

Вы должны декодировать байты, используя соответствующие десериализаторы, скажем, для строк или вашего пользовательского объекта.

Если вы не выполняете декодирование, вы получаете [B@406fa9b2, то есть просто текстовое представление байтовых массивов в Java.

Кафка ничего не знает о содержании ваших сообщений и поэтому передает байтовые массивы от производителей к потребителям.

В потоковой передаче Spark вы должны использовать сериализаторы для ключей и значений (цитируя пример KafkaWordCount):

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")

С помощью выше сериализаторов вы получаете DStream[String], поэтому вы работаете с RDD[String].

Однако, если вы хотите десериализовать байтовые массивы в свой пользовательский класс напрямую, вам придется написать пользовательский сериализатор (который является специфичным для Кафки и не имеет ничего общего с Spark). Я бы рекомендовал использовать JSON с фиксированной схемой или Avro (с решением, описанным вKafka, Spark и Avro - Part 3, производя и потребляя сообщения Avro ).

В Структурированной Потоковой Передаче однако конвейер может выглядеть следующим образом:

val fromKafka = spark.
  readStream.
  format("kafka").
  option("subscribe", "topic1").
  option("kafka.bootstrap.servers", "localhost:9092").
  load.
  select('value cast "string") // <-- conversion here

Comments

    Ничего не найдено.