Производительность программы Spark-GC & десериализация задач и параллельное выполнение
У меня есть кластер из 4 машин, 1 Мастер и три рабочих, каждый с памятью 128 г и 64 ядрами. Я использую Spark 1.5.0 в автономном режиме. Моя программа считывает данные из таблиц Oracle с помощью JDBC, затем делает ETL, манипулируя данными, и выполняет задачи машинного обучения, такие как k-means.
У меня есть фрейм данных (myDF.cache ()), который объединяет результаты с двумя другими фреймами данных и кэшируется. Фрейм данных содержит 27 миллионов строк, а размер данных составляет около 1,5 G. мне нужно отфильтровать данные и вычислите 24 гистограммы следующим образом:
val h1 = myDF.filter("pmod(idx, 24) = 0").select("col1").histogram(arrBucket)
val h2 = myDF.filter("pmod(idx, 24) = 1").select("col1").histogram(arrBucket)
// ......
val h24 = myDF.filter("pmod(idx, 24) = 23").select("col1").histogram(arrBucket)
Задачи:
Поскольку мой фрейм данных кэшируется, я ожидаю, что фильтр, выбор и гистограмма будут очень быстрыми. Однако фактическое время составляет около 7 секунд для каждого расчета, что неприемлемо. Из пользовательского интерфейса он показывает, что время GC занимает 5 секунд, а время десериализации задачи-4 секунды. Я пробовал разные параметры JVM, но не могу улучшить дальше. Прямо сейчас я использую
-Xms25G -Xmx25G -XX:MaxPermSize=512m -XX:+UseG1GC -XX:MaxGCPauseMillis=200
-XX:ParallelGCThreads=32
-XX:ConcGCThreads=8 -XX:InitiatingHeapOccupancyPercent=70
Что меня озадачивает, так это то, что размер данных-ничто по сравнению с доступной памятью. Почему GC срабатывает каждый раз, когда работает фильтр / выбор / гистограмма? Есть ли способ сократить время GC и время десериализации задачи?
Я должен делать параллельные вычисления для h[1-24], а не последовательные. Я попробовал Future, что-то вроде:
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
val f1 = Future{myDF.filter("pmod(idx, 24) = 1").count}
val f2 = Future{myDF.filter("pmod(idx, 24) = 2").count}
val f3 = Future{myDF.filter("pmod(idx, 24) = 3").count}
val future = for {c1 <- f1; c2 <- f2; c3 <- f3} yield {
c1 + c2 + c3
}
val summ = Await.result(future, 180 second)
Проблема в том, что здесь будущее означает только то, что задания передаются планировщику почти одновременно, а не то, что они в конечном итоге планируются и выполняются одновременно. Будущее, используемое здесь, не улучшает производительность вообще.
Как заставить 24 вычислительных задания выполняться одновременно?
1 ответ:
Несколько вещей, которые вы можете попробовать:
Не вычисляйте
pmod(idx, 24)снова и снова. Вместо этого вы можете просто вычислить его один раз:import org.apache.spark.sql.functions.{pmod, lit} val myDfWithBuckets = myDF.withColumn("bucket", pmod($"idx", lit(24)))Используйте
SQLContext.cacheTableвместоcache. Он хранит таблицу с использованием сжатого колоночного хранилища, которое может использоваться для доступа только к необходимым столбцам и, как указано в руководстве Spark SQL and DataFrame "автоматически настроит сжатие, чтобы минимизировать использование памяти и давление GC ".myDfWithBuckets.registerTempTable("myDfWithBuckets") sqlContext.cacheTable("myDfWithBuckets")Если вы ... может, кэшировать только те столбцы, которые вам действительно нужны, а не проецировать каждый раз.
Мне не ясно, что является источником метода
histogram(вы преобразуете вRDD[Double]и используетеDoubleRDDFunctions.histogram?) и каков аргумент, но если вы хотите вычислить все гистограммы одновременно, вы можете попробоватьgroupByведро и применить гистограмму один раз, например, используяhistogram_numericUDF:import org.apache.spark.sql.functions.callUDF val n: Int = ??? myDfWithBuckets .groupBy($"bucket") .agg(callUDF("histogram_numeric", $"col1", lit(n)))Если вы используете предопределенные диапазоны, вы можете получить аналогичный эффект, используя пользовательский ФУНКЦИЯ.
Примечания
Как извлечь значения, вычисленные по
histogram_numeric? Сначала давайте создадим небольшой помощникimport org.apache.spark.sql.Row def extractBuckets(xs: Seq[Row]): Seq[(Double, Double)] = xs.map(x => (x.getDouble(0), x.getDouble(1)))Теперь мы можем построить карту, используя сопоставление шаблонов следующим образом:
import org.apache.spark.rdd.RDD val histogramsRDD: RDD[(Int, Seq[(Double, Double)])] = histograms.map{ case Row(k: Int, hs: Seq[Row @unchecked]) => (k, extractBuckets(hs)) }
Comments