Есть ли хороший способ присоединиться к потоку в spark с пеленальным столиком?



Наша искровая среда:
DataBricks 4.2 (включает Apache Spark 2.3.1, Scala 2.11)



Чего мы пытаемся достичь:
Мы хотим обогатить потоковые данные некоторыми справочными данными, которые регулярно обновляются. Обогащение осуществляется путем соединения потока с эталонными данными.



Что мы реализовали:
Мы реализовали два искровых задания (jars):
Первый-это обновление таблицы Spark TEST_TABLE каждый час (назовем ее "справочными данными") с помощью .писать.режим (SaveMode.Переписывать).saveAsTable ("TEST_TABLE")
А потом позвонил Искре.каталог.refreshTable ("TEST_TABLE")



Вторая задача (назовем ее потоковой передачей данных) заключается в использовании структурированной потоковой передачи Spark для потокового считывания некоторых данных, соединяя их с помощью DataFrame.transform() с таблицей TEST_TABLE и записью ее в другую систему.
Мы читаем справочные данные с помощью spark.читать.таблица ("TEST_TABLE") в вызываемой функции.transform () таким образом, мы получаем последние значения в таблице. К сожалению, второе приложение аварийно завершает работу каждый раз, когда первое приложение обновляет таблицу. На выходе Log4j выводится следующее сообщение:



18/08/23 10:34:40 WARN TaskSetManager: Lost task 0.0 in stage 547.0 (TID 5599, 10.139.64.9, executor 0): java.io.FileNotFoundException: dbfs:/user/hive/warehouse/code.db/TEST_TABLE/ part-00000-tid-5184425276562097398-25a0e542-41e4-416f-bae8-469899a72c21-36-c000.snappy.parquet

It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readFile(FileScanRDD.scala:203)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$createNextIterator(FileScanRDD.scala:377)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anonfun$prepareNextFile$1.apply(FileScanRDD.scala:295)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anonfun$prepareNextFile$1.apply(FileScanRDD.scala:291)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748


Мы также попытались сделать недействительным кэш, прежде чем читать таблицу, но это снизило производительность, и приложение все равно рухнуло.
Мы подозреваем, что корневой курс-это ленивая оценка эталонного набора данных (который все еще "указывает" на старые данные, которых больше нет).



Есть ли у вас какие-либо предложения, что мы могли бы сделать, чтобы предотвратить это проблема или каков наилучший подход для объединения потока с динамическими справочными данными?

656   2  

2 ответов:

Присоединяйтесь к ссылочным данным; не кэшируйте их, это гарантирует, что вы перейдете к источнику. Ищите данные последней версии, которые обозначаются первичным ключом + счетчиком, где этот счетчик наиболее близок или равен счетчику, который вы поддерживаете в потоковом приложении. Каждый час записывайте, добавляйте все данные ref, которые все еще актуальны, снова, но с увеличенным счетчиком; т. е. новая версия. Используйте здесь паркет.

Вместо того, чтобы присоединиться к таблице и потоку. Вы можете воспользоваться новой функцией, доступной в spark 2.3.1, то есть объединением двух потоков данных. Создайте поток вместо таблицы с водяным знаком.

Watermarks: Watermarking in Structured Streaming is a way to limit state in all 
stateful streaming operations by specifying how much late data to consider. 
Specifically, a watermark is a moving threshold in event-time that trails behind the 
maximum event-time seen by the query in the processed data. The trailing gap (aka 
watermark delay) defines how long should the engine wait for late data to arrive and 
is specified in the query using withWatermark.

См. блог databricks

Comments

    Ничего не найдено.