Производительность программы Spark-GC & десериализация задач и параллельное выполнение



У меня есть кластер из 4 машин, 1 Мастер и три рабочих, каждый с памятью 128 г и 64 ядрами. Я использую Spark 1.5.0 в автономном режиме. Моя программа считывает данные из таблиц Oracle с помощью JDBC, затем делает ETL, манипулируя данными, и выполняет задачи машинного обучения, такие как k-means.



У меня есть фрейм данных (myDF.cache ()), который объединяет результаты с двумя другими фреймами данных и кэшируется. Фрейм данных содержит 27 миллионов строк, а размер данных составляет около 1,5 G. мне нужно отфильтровать данные и вычислите 24 гистограммы следующим образом:



val h1 = myDF.filter("pmod(idx, 24) = 0").select("col1").histogram(arrBucket) 
val h2 = myDF.filter("pmod(idx, 24) = 1").select("col1").histogram(arrBucket)
// ......
val h24 = myDF.filter("pmod(idx, 24) = 23").select("col1").histogram(arrBucket)


Задачи:





  1. Поскольку мой фрейм данных кэшируется, я ожидаю, что фильтр, выбор и гистограмма будут очень быстрыми. Однако фактическое время составляет около 7 секунд для каждого расчета, что неприемлемо. Из пользовательского интерфейса он показывает, что время GC занимает 5 секунд, а время десериализации задачи-4 секунды. Я пробовал разные параметры JVM, но не могу улучшить дальше. Прямо сейчас я использую



    -Xms25G -Xmx25G -XX:MaxPermSize=512m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 
    -XX:ParallelGCThreads=32
    -XX:ConcGCThreads=8 -XX:InitiatingHeapOccupancyPercent=70



Что меня озадачивает, так это то, что размер данных-ничто по сравнению с доступной памятью. Почему GC срабатывает каждый раз, когда работает фильтр / выбор / гистограмма? Есть ли способ сократить время GC и время десериализации задачи?





  1. Я должен делать параллельные вычисления для h[1-24], а не последовательные. Я попробовал Future, что-то вроде:



    import scala.concurrent.{Await, Future, blocking} 

    import scala.concurrent.ExecutionContext.Implicits.global

    val f1 = Future{myDF.filter("pmod(idx, 24) = 1").count}
    val f2 = Future{myDF.filter("pmod(idx, 24) = 2").count}
    val f3 = Future{myDF.filter("pmod(idx, 24) = 3").count}

    val future = for {c1 <- f1; c2 <- f2; c3 <- f3} yield {
    c1 + c2 + c3
    }

    val summ = Await.result(future, 180 second)



Проблема в том, что здесь будущее означает только то, что задания передаются планировщику почти одновременно, а не то, что они в конечном итоге планируются и выполняются одновременно. Будущее, используемое здесь, не улучшает производительность вообще.



Как заставить 24 вычислительных задания выполняться одновременно?

627   1  

1 ответ:

Несколько вещей, которые вы можете попробовать:

  1. Не вычисляйте pmod(idx, 24) снова и снова. Вместо этого вы можете просто вычислить его один раз:

    import org.apache.spark.sql.functions.{pmod, lit}
    
    val myDfWithBuckets = myDF.withColumn("bucket", pmod($"idx", lit(24)))
    
  2. Используйте SQLContext.cacheTable вместо cache. Он хранит таблицу с использованием сжатого колоночного хранилища, которое может использоваться для доступа только к необходимым столбцам и, как указано в руководстве Spark SQL and DataFrame "автоматически настроит сжатие, чтобы минимизировать использование памяти и давление GC ".

    myDfWithBuckets.registerTempTable("myDfWithBuckets")
    sqlContext.cacheTable("myDfWithBuckets")
    
  3. Если вы ... может, кэшировать только те столбцы, которые вам действительно нужны, а не проецировать каждый раз.

  4. Мне не ясно, что является источником метода histogram (вы преобразуете в RDD[Double] и используете DoubleRDDFunctions.histogram?) и каков аргумент, но если вы хотите вычислить все гистограммы одновременно, вы можете попробовать groupBy ведро и применить гистограмму один раз, например, используя histogram_numeric UDF:

    import org.apache.spark.sql.functions.callUDF
    
    val n: Int = ???
    
    myDfWithBuckets
      .groupBy($"bucket")
      .agg(callUDF("histogram_numeric", $"col1", lit(n)))
    

    Если вы используете предопределенные диапазоны, вы можете получить аналогичный эффект, используя пользовательский ФУНКЦИЯ.

Примечания

  • Как извлечь значения, вычисленные по histogram_numeric? Сначала давайте создадим небольшой помощник

    import org.apache.spark.sql.Row
    
    def extractBuckets(xs: Seq[Row]): Seq[(Double, Double)] =
      xs.map(x => (x.getDouble(0), x.getDouble(1)))
    

    Теперь мы можем построить карту, используя сопоставление шаблонов следующим образом:

    import org.apache.spark.rdd.RDD
    
    val histogramsRDD: RDD[(Int, Seq[(Double, Double)])] = histograms.map{
      case Row(k: Int, hs: Seq[Row @unchecked]) => (k, extractBuckets(hs)) }
    

Comments

    Ничего не найдено.