Кэширование временных таблиц с помощью spark-sql



Является таблицей, зарегистрированной с помощью registerTempTable (createOrReplaceTempView с искрой 2.+) кэшируется?



Используя Zeppelin, я регистрирую DataFrame в своем коде scala, после тяжелых вычислений, а затем в %pyspark я хочу получить к нему доступ и далее фильтровать его.



Будет ли он использовать кэшированную в памяти версию таблицы? Или он будет перестраиваться каждый раз?

727   2  

2 ответов:

Зарегистрированные таблицы не кэшируются в памяти.

registerTempTable createOrReplaceTempView метод просто создаст или заменит представление данного DataFrame с заданным планом запроса.

Он преобразует план запроса в каноническую строку SQL и сохраняет его в виде текста представления в метасторе, если нам нужно создать постоянное представление.

Вам нужно будет явно кэшировать фрейм данных. например:

df.createOrReplaceTempView("my_table") # df.registerTempTable("my_table") for spark <2.+
spark.cacheTable("my_table") 

Редактировать:

Проиллюстрируем это примером :

Использование cacheTable :

scala> val df = Seq(("1",2),("b",3)).toDF
// df: org.apache.spark.sql.DataFrame = [_1: string, _2: int]

scala> sc.getPersistentRDDs
// res0: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map()

scala> df.createOrReplaceTempView("my_table")

scala> sc.getPersistentRDDs
// res2: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map()

scala> spark.cacheTable("my_table")

scala> sc.getPersistentRDDs
// res4: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map(2 -> In-memory table my_table MapPartitionsRDD[2] at cacheTable at <console>:26)

Теперь тот же пример с использованием cache.registerTempTable cache.createOrReplaceTempView :

scala> sc.getPersistentRDDs
// res2: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map()

scala> val df = Seq(("1",2),("b",3)).toDF
// df: org.apache.spark.sql.DataFrame = [_1: string, _2: int]

scala> df.createOrReplaceTempView("my_table")

scala> sc.getPersistentRDDs
// res4: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map()

scala> df.cache.registerTempTable("my_table")

scala> sc.getPersistentRDDs
// res6: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = 
// Map(2 -> ConvertToUnsafe
// +- LocalTableScan [_1#0,_2#1], [[1,2],[b,3]]
//  MapPartitionsRDD[2] at cache at <console>:28)

Это не так. Вы должны кэшировать явно:

sqlContext.cacheTable("someTable")

Comments

    Ничего не найдено.