СПАРК - передел() против объединиться()



Согласно учебной Искре




Имейте в виду, что перераспределение данных-довольно дорогостоящая операция.
Spark также имеет оптимизированную версию функции repartition () под названием coalesce (), которая позволяет избежать перемещения данных, но только при уменьшении числа разделов RDD.




Одно отличие, которое я получаю, состоит в том, что с помощью repartition () количество разделов может быть увеличено/уменьшено, но с coalesce () количество разделов может быть только уменьшившийся.



Если разделы распределены по нескольким машинам и выполняется функция coalesce (), как она может избежать перемещения данных?

612   7  

7 ответов:

Он избегает полного перемешивания. Если известно, что число секций уменьшается, то исполнитель может безопасно хранить данные на минимальном количестве секций, только перемещая данные с дополнительных узлов на узлы, которые мы сохранили.

Итак, это будет выглядеть примерно так:

Node 1 = 1,2,3
Node 2 = 4,5,6
Node 3 = 7,8,9
Node 4 = 10,11,12

Затем coalesce вплоть до 2 разделов:

Node 1 = 1,2,3 + (10,11,12)
Node 3 = 7,8,9 + (4,5,6)
Обратите внимание, что узлы 1 и 3 не требовали перемещения исходных данных.

Ответ Джастина потрясает, и этот ответ идет в более глубокое русло.

Алгоритм repartition делает полную перетасовку и создает новые разделы с данными, которые распределены равномерно. Давайте создадим фрейм данных с числами от 1 до 12.

val x = (1 to 12).toList
val numbersDf = x.toDF("number")

numbersDf содержит 4 раздела на моей машине.

numbersDf.rdd.partitions.size // => 4

Вот как данные делятся на разделы:

Partition 00000: 1, 2, 3
Partition 00001: 4, 5, 6
Partition 00002: 7, 8, 9
Partition 00003: 10, 11, 12

Давайте сделаем полную перетасовку с помощью метода repartition и получим эти данные на двух узлы.

val numbersDfR = numbersDf.repartition(2)

Вот как разделяются данные numbersDfR на моей машине:

Partition A: 1, 3, 4, 6, 7, 9, 10, 12
Partition B: 2, 5, 8, 11

Метод repartition создает новые разделы и равномерно распределяет данные в новых разделах (распределение данных более равномерное для больших наборов данных).

Разница между coalesce и repartition

coalesce использует существующие разделы, чтобы свести к минимуму объем перетасованных данных. repartition создает новые разделы и делает полную перетасовку. coalesce приводит к разделам с различные объемы данных (иногда разделы, которые имеют очень разные размеры) и repartition приводят к разделам примерно одинакового размера.

Это coalesce или repartition быстрее?

coalesce может работать быстрее, чем repartition, но с разделами неравного размера обычно медленнее работать, чем с разделами равного размера. Обычно после фильтрации большого набора данных требуется перераспределение наборов данных. Я нашел repartition, чтобы быть быстрее в целом, потому что Spark построен для работы с равными размерами перекрытия.

Прочитайте этот пост в блоге , Если вам нужны еще более подробные сведения.

Один дополнительный момент, который следует отметить здесь, заключается в том, что в качестве основного принципа Spark RDD является неизменность. Перераспределение или слияние создаст новый RDD. Базовый RDD будет продолжать существовать со своим исходным числом разделов. В случае, если вариант использования требует сохранения RDD в кэше, то то же самое должно быть сделано для вновь созданного RDD.

scala> pairMrkt.repartition(10)
res16: org.apache.spark.rdd.RDD[(String, Array[String])] =MapPartitionsRDD[11] at repartition at <console>:26

scala> res16.partitions.length
res17: Int = 10

scala>  pairMrkt.partitions.length
res20: Int = 2

Все ответы добавляют некоторые большие знания в этот очень часто задаваемый вопрос.

Итак, следуя традиции временной шкалы этого вопроса, вот мои 2 цента.

Я обнаружил, чтоперераспределение происходит быстрее, чем слияние , в очень специфическом случае.

В моем приложении, когда количество файлов, которое мы оцениваем, меньше определенного порога, перераспределение работает быстрее.

Вот что я имею в виду

if(numFiles > 20)
    df.coalesce(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)
else
    df.repartition(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)

В приведенном выше фрагменте, если мои файлы были меньше чем 20, слияние занимало вечность, чтобы закончить, в то время как перераспределение было намного быстрее, и поэтому приведенный выше код.

Конечно, это число (20) будет зависеть от количества работников и объема данных. Надеюсь, это поможет.

Простым способом COALESCE: - это только для уменьшения количества разделов, без перетасовки данных он просто сжимает разделы

Перераспределение: - это как для увеличения, так и для уменьшения количества разделов, но перетасовка имеет место

Пример:-

val rdd = sc.textFile("path",7)
rdd.repartition(10)
rdd.repartition(2)

Оба прекрасно работают

Но мы обычно идем к этим двум вещам, когда нам нужно увидеть выходные данные в одном кластере, мы идем с этим.

Но также вы должны убедиться, что данные, которые поступают в узлы слияния, должны иметь высокую конфигурацию, если вы имеете дело с огромными данными. Поскольку все данные будут загружены на эти узлы, может возникнуть исключение памяти. Хотя возмещение ущерба стоит дорого, я предпочитаю его использовать. Так как он перемешивает и распределяет данные поровну.

Будьте мудры, чтобы выбрать между слиянием и переделом.

Повторное разбиение-рекомендуется использовать повторное разбиение при увеличении количества секций, так как оно предполагает перетасовку всех данных.

Coalesce-рекомендуется использовать coalesce при уменьшении количества перегородок. Например, если у вас есть 3 раздела и вы хотите уменьшить его до 2 разделов, Coalesce переместит данные 3-го раздела В разделы 1 и 2. Разделы 1 и 2 останутся в одном контейнере.но перераспределение будет перемешивать данные во всех разделах, так что использование сети между исполнитель будет высоким, и это влияет на производительность.

Производительность мудрая коалесцирует производительность лучше, чем перераспределение, уменьшая количество разделов.

Comments

    Ничего не найдено.