СПАРК - передел() против объединиться()
Согласно учебной Искре
Имейте в виду, что перераспределение данных-довольно дорогостоящая операция.
Spark также имеет оптимизированную версию функции repartition () под названием coalesce (), которая позволяет избежать перемещения данных, но только при уменьшении числа разделов RDD.
Одно отличие, которое я получаю, состоит в том, что с помощью repartition () количество разделов может быть увеличено/уменьшено, но с coalesce () количество разделов может быть только уменьшившийся.
Если разделы распределены по нескольким машинам и выполняется функция coalesce (), как она может избежать перемещения данных?
7 ответов:
Он избегает полного перемешивания. Если известно, что число секций уменьшается, то исполнитель может безопасно хранить данные на минимальном количестве секций, только перемещая данные с дополнительных узлов на узлы, которые мы сохранили.
Итак, это будет выглядеть примерно так:
Node 1 = 1,2,3 Node 2 = 4,5,6 Node 3 = 7,8,9 Node 4 = 10,11,12Затем
coalesceвплоть до 2 разделов:Обратите внимание, что узлы 1 и 3 не требовали перемещения исходных данных.Node 1 = 1,2,3 + (10,11,12) Node 3 = 7,8,9 + (4,5,6)
Ответ Джастина потрясает, и этот ответ идет в более глубокое русло.
Алгоритм
repartitionделает полную перетасовку и создает новые разделы с данными, которые распределены равномерно. Давайте создадим фрейм данных с числами от 1 до 12.val x = (1 to 12).toList val numbersDf = x.toDF("number")
numbersDfсодержит 4 раздела на моей машине.numbersDf.rdd.partitions.size // => 4Вот как данные делятся на разделы:
Partition 00000: 1, 2, 3 Partition 00001: 4, 5, 6 Partition 00002: 7, 8, 9 Partition 00003: 10, 11, 12Давайте сделаем полную перетасовку с помощью метода
repartitionи получим эти данные на двух узлы.val numbersDfR = numbersDf.repartition(2)Вот как разделяются данные
numbersDfRна моей машине:Partition A: 1, 3, 4, 6, 7, 9, 10, 12 Partition B: 2, 5, 8, 11Метод
repartitionсоздает новые разделы и равномерно распределяет данные в новых разделах (распределение данных более равномерное для больших наборов данных).Разница между
coalesceиrepartition
coalesceиспользует существующие разделы, чтобы свести к минимуму объем перетасованных данных.repartitionсоздает новые разделы и делает полную перетасовку.coalesceприводит к разделам с различные объемы данных (иногда разделы, которые имеют очень разные размеры) иrepartitionприводят к разделам примерно одинакового размера.Это
coalesceилиrepartitionбыстрее?
coalesceможет работать быстрее, чемrepartition, но с разделами неравного размера обычно медленнее работать, чем с разделами равного размера. Обычно после фильтрации большого набора данных требуется перераспределение наборов данных. Я нашелrepartition, чтобы быть быстрее в целом, потому что Spark построен для работы с равными размерами перекрытия.Прочитайте этот пост в блоге , Если вам нужны еще более подробные сведения.
Один дополнительный момент, который следует отметить здесь, заключается в том, что в качестве основного принципа Spark RDD является неизменность. Перераспределение или слияние создаст новый RDD. Базовый RDD будет продолжать существовать со своим исходным числом разделов. В случае, если вариант использования требует сохранения RDD в кэше, то то же самое должно быть сделано для вновь созданного RDD.
scala> pairMrkt.repartition(10) res16: org.apache.spark.rdd.RDD[(String, Array[String])] =MapPartitionsRDD[11] at repartition at <console>:26 scala> res16.partitions.length res17: Int = 10 scala> pairMrkt.partitions.length res20: Int = 2
Все ответы добавляют некоторые большие знания в этот очень часто задаваемый вопрос.
Итак, следуя традиции временной шкалы этого вопроса, вот мои 2 цента.Я обнаружил, чтоперераспределение происходит быстрее, чем слияние , в очень специфическом случае.
В моем приложении, когда количество файлов, которое мы оцениваем, меньше определенного порога, перераспределение работает быстрее.
Вот что я имею в виду
if(numFiles > 20) df.coalesce(numFiles).write.mode(SaveMode.Overwrite).parquet(dest) else df.repartition(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)В приведенном выше фрагменте, если мои файлы были меньше чем 20, слияние занимало вечность, чтобы закончить, в то время как перераспределение было намного быстрее, и поэтому приведенный выше код.
Конечно, это число (20) будет зависеть от количества работников и объема данных. Надеюсь, это поможет.
Простым способом COALESCE: - это только для уменьшения количества разделов, без перетасовки данных он просто сжимает разделы
Перераспределение: - это как для увеличения, так и для уменьшения количества разделов, но перетасовка имеет место
Пример:-
val rdd = sc.textFile("path",7) rdd.repartition(10) rdd.repartition(2)Оба прекрасно работают
Но мы обычно идем к этим двум вещам, когда нам нужно увидеть выходные данные в одном кластере, мы идем с этим.
Но также вы должны убедиться, что данные, которые поступают в узлы слияния, должны иметь высокую конфигурацию, если вы имеете дело с огромными данными. Поскольку все данные будут загружены на эти узлы, может возникнуть исключение памяти. Хотя возмещение ущерба стоит дорого, я предпочитаю его использовать. Так как он перемешивает и распределяет данные поровну.
Будьте мудры, чтобы выбрать между слиянием и переделом.
Повторное разбиение-рекомендуется использовать повторное разбиение при увеличении количества секций, так как оно предполагает перетасовку всех данных.
Coalesce-рекомендуется использовать coalesce при уменьшении количества перегородок. Например, если у вас есть 3 раздела и вы хотите уменьшить его до 2 разделов, Coalesce переместит данные 3-го раздела В разделы 1 и 2. Разделы 1 и 2 останутся в одном контейнере.но перераспределение будет перемешивать данные во всех разделах, так что использование сети между исполнитель будет высоким, и это влияет на производительность.
Производительность мудрая коалесцирует производительность лучше, чем перераспределение, уменьшая количество разделов.
Comments