Как работает HashPartitioner?



Я прочитал в документации HashPartitioner. К сожалению, ничего особенного не было объяснено, кроме вызовов API. Я исхожу из предположения, что HashPartitioner разбивает распределенный набор на основе хэша ключей. Например, если мои данные как



(1,1), (1,2), (1,3), (2,1), (2,2), (2,3)


таким образом, разделитель поместил бы это в разные разделы с одинаковыми ключами, попадающими в один и тот же раздел. Однако я не понимаю значение аргумента конструктора



new HashPartitoner(numPartitions) //What does numPartitions do?


для приведенного выше набора данных, как бы результаты отличались, если бы я сделал



new HashPartitoner(1)
new HashPartitoner(2)
new HashPartitoner(10)


так как HashPartitioner работать на самом деле?

625   3  

3 ответов:

ну, давайте сделаем ваш набор данных немного более интересным:

val rdd = sc.parallelize(for {
    x <- 1 to 3
    y <- 1 to 2
} yield (x, None), 8)

у нас есть шесть элементов:

rdd.count
Long = 6

нет разметки:

rdd.partitioner
Option[org.apache.spark.Partitioner] = None

и восемь разделов:

rdd.partitions.length
Int = 8

теперь давайте определим небольшой помощник для подсчета количества элементов в разделе:

import org.apache.spark.rdd.RDD

def countByPartition(rdd: RDD[(Int, None.type)]) = {
    rdd.mapPartitions(iter => Iterator(iter.length))
}

поскольку у нас нет разделителя, наш набор данных равномерно распределяется между разделами (Разделение По Умолчанию Схема на Искра):

countByPartition(rdd).collect()
Array[Int] = Array(0, 1, 1, 1, 0, 1, 1, 1)

inital-distribution

теперь давайте переделаем наш набор данных:

import org.apache.spark.HashPartitioner
val rddOneP = rdd.partitionBy(new HashPartitioner(1))

так как параметр передан в HashPartitioner определяет количество разделов, которые мы ожидаем один раздел:

rddOneP.partitions.length
Int = 1

так как у нас есть только один раздел, он содержит все элементы:

countByPartition(rddOneP).collect
Array[Int] = Array(6)

hash-partitioner-1

обратите внимание, что порядок значений после перетасовка является недетерминированной.

то же самое, если мы используем HashPartitioner(2)

val rddTwoP = rdd.partitionBy(new HashPartitioner(2))

мы получим 2 раздела:

rddTwoP.partitions.length
Int = 2

С rdd разделяется по ключевым данным больше не будет распределяться равномерно:

countByPartition(rddTwoP).collect()
Array[Int] = Array(2, 4)

потому что с имеют три ключа и только два разных значения hashCode mod numPartitions здесь нет ничего неожиданного:

(1 to 3).map((k: Int) => (k, k.hashCode, k.hashCode % 2))
scala.collection.immutable.IndexedSeq[(Int, Int, Int)] = Vector((1,1,1), (2,2,0), (3,3,1))

просто для подтверждения выше:

rddTwoP.mapPartitions(iter => Iterator(iter.map(_._1).toSet)).collect()
Array[scala.collection.immutable.Set[Int]] = Array(Set(2), Set(1, 3))

hash-partitioner-2

наконец-то с HashPartitioner(7) мы получаем семь разделов, три непустых с 2 элементами каждый:

val rddSevenP = rdd.partitionBy(new HashPartitioner(7))
rddSevenP.partitions.length
Int = 7
countByPartition(rddTenP).collect()
Array[Int] = Array(0, 2, 2, 2, 0, 0, 0)

hash-partitioner-7

резюме и примечания

  • HashPartitioner принимает один аргумент, который определяет количество разделов
  • значения присваиваются разделам с помощью hash of ключи. hash функция может отличаться в зависимости от языка (Scala RDD может использовать hashCode,DataSets используйте MurmurHash 3, PySpark,portable_hash).

    в простом случае, как это, где ключ является малым целым числом, вы можете предположить, что hash - это идентификатор (i = hash(i)).

    Scala API использует nonNegativeMod чтобы определить раздел на основе вычисленного хэша,

  • если распределение ключей не является однородным, вы можете в конечном итоге в ситуации, когда часть вашего кластера простаивает

  • ключи должны быть hashable. Вы можете проверить мой ответ для список в качестве ключа для reduceByKey PySpark чтобы прочитать о конкретных проблемах PySpark. Еще одна возможная проблема выделена документация HashPartitioner:

    массивы Java имеют хэш-коды, основанные на идентификаторах массивов, а не на их содержимом, поэтому попытка разбить RDD [Array[]] или RDD [(Array[],_)] использование HashPartitioner приведет к неожиданному или неправильному результату.

  • в Python 3, Вы должны убедиться, что хеширование-это последовательное. Смотрите что означает исключение: случайность хэша строки должна быть отключена через PYTHONHASHSEED в pyspark?

  • hash partitioner не является ни инъективным, ни сюръективным. Различные ключи могут быть назначены один раздел и некоторые разделы могут оставаться пустыми.

  • обратите внимание, что в настоящее время методы на основе хэша не работают в Scala в сочетании с REPL определенными классами case (равенство классов в Apache Spark).

  • HashPartitioner (или любой другой Partitioner) тасует данные. Если секционирование не используется повторно между несколькими операциями, это не уменьшает количество данных, которые нужно перетасовать.

RDD распределяется это означает, что он разделен на некоторое количество частей. Каждый из этих разделов потенциально находится на разных машинах. Хэш-разделения с арумент numPartitions выбирает на каком разделе разместить пару (key, value) в следующую сторону:

  1. создает numPartitions разделы.
  2. мест (key, value) в разделе с номером Hash(key) % numPartitions

The HashPartitioner.getPartition метод принимает ключ как его аргумент и возвращает индекс раздела, к которому принадлежит ключ. Разделитель должен знать, что такое допустимые индексы, поэтому он возвращает числа в правильном диапазоне. Количество разделов указывается через numPartitions аргумент конструктора.

реализация возвращает примерно key.hashCode() % numPartitions. Смотрите разметки.скала для более подробной информации.

Comments

    Ничего не найдено.