4 ответов:
уменьшить против метод использовать-foldleft
большая большая разница, не упомянутая ни в одном другом ответе stackoverflow, относящемся к этой теме, заключается в том, что
reduceдолжно быть дано коммутативным моноидом, т. е. операция является коммутативной и ассоциативной. Это означает, что операция может быть распараллелен.это различие очень важно для больших данных / MPP / распределенных вычислений, и вся причина, почему
reduceвообще существует. Коллекция может быть порубленным и тоreduceможет работать на каждом блоке, тоreduceможет работать на результатах каждого куска-на самом деле уровень chunking не нужно останавливать один уровень глубоко. Мы могли бы порубить каждый кусок тоже. Вот почему суммирование целых чисел в списке равно O (log N), если задано бесконечное число процессоров.если вы просто посмотрите на подписи, нет оснований для
reduceсуществовать, потому что вы можете достичь все, что вы можете сreduceСfoldLeft. ФункциональностьfoldLeft- это больше, чем функциональностьreduce.но вы не можете распараллелить a
foldLeft, поэтому его время выполнения всегда O (N) (даже если вы подаете коммутативный моноид). Это потому, что предполагается, что операция не коммутативный моноид и поэтому кумулированное значение будет вычислено серией последовательных агрегаций.
foldLeftне предполагает коммутативности или ассоциативности. Это ассоциативность, которая дает возможность измельчить коллекцию, и это коммутативность, которая упрощает кумуляцию, потому что порядок не важен (поэтому не имеет значения, какой порядок агрегировать каждый из результатов из каждого куска). Строго говоря, коммутативность не нужна для распараллеливания, например, распределенных алгоритмов сортировки, это просто упрощает логику, потому что вам не нужно упорядочивать свои куски.если вы посмотрите на документацию Spark для
reduceit в частности, говорится:"... коммутативный и ассоциативный двоичный оператор"http://spark.apache.org/docs/1.0.0/api/scala/index.html#org.apache.spark.rdd.RDD
вот доказательство того, что
reduceэто не просто частный случайfoldLeftscala> val intParList: ParSeq[Int] = (1 to 100000).map(_ => scala.util.Random.nextInt()).par scala> timeMany(1000, intParList.reduce(_ + _)) Took 462.395867 milli seconds scala> timeMany(1000, intParList.foldLeft(0)(_ + _)) Took 2589.363031 milli secondsуменьшить фолд против
теперь это то, где он становится немного ближе к FP / математическим корням, и немного сложнее объяснить. Сокращение определяется формально как часть MapReduce парадигма, которая имеет дело с безупорядоченными коллекциями (мультинаборами), Fold формально определяется в терминах рекурсии (см. катаморфизм) и, таким образом, предполагает структуру / последовательность коллекций.
нет
foldметод в ошпаривании, потому что под (строгой) картой уменьшить программную модель мы не можем определитьfoldпотому что куски не имеют порядок иfoldтребуется только ассоциативность, а не коммутативность.просто
reduce стал синонимомreduceработает без приказа кумуляция,foldтребует порядка кумуляции, и именно этот порядок кумуляции требует нулевого значения, а не существования нулевого значения, которое их отличает. Строго говоряreduceдолжны работа над пустой коллекцией, потому что ее нулевое значение может быть выведено путем принятия произвольного значенияxа затем решениемx op y = x, но это не работает с некоммутативной операцией, поскольку может существовать левое и правое нулевое значение, которые различны (т. е.x op y != y op x). Конечно, Scala не утруждает себя выяснением того, что такое это нулевое значение, поскольку для этого потребуется выполнить некоторую математику (которая, вероятно, не поддается вычислению), поэтому просто бросает исключение.fold, вместо того чтобы сохранить его первоначальный смысл из MapReduce. Теперь эти термины часто используются взаимозаменяемо и ведут себя одинаково в большинстве реализаций (игнорируя пустые коллекции). Странность усугубляется особенностями, как в Spark, которые мы сейчас рассмотрим.Так Что Искры тут есть
fold, но порядок, в котором sub результаты (по одному для каждого раздела) объединяются (на момент написания), является тем же порядком, в котором выполняются задачи - и, следовательно, недетерминированным. Спасибо @CafeFeed за указание этоfoldиспользуетrunJob, который после прочтения кода я понял, что это недетерминированный. Дальнейшая путаница создается искрой, имеющейtreeReduceа неtreeFold.вывод
есть разница между
reduceиfoldдаже при применении к непустым последовательностям. Первый определяется как часть парадигмы программирования MapReduce на коллекциях с произвольным порядком (http://theory.stanford.edu/~sergei / papers / soda10-mrc.pdf) и следует предположить, что операторы являются коммутативными в дополнение к ассоциативным, чтобы дать детерминированные результаты. Последний определяется в терминах катоморфизмов и требует, чтобы коллекции имели понятие последовательности (или определялись рекурсивно, как связанные списки), поэтому не требуют коммутативных операторов.на практике из-за нематематической природы программирования,
reduceиfoldкак правило, вести себя одинаково, либо правильно (как в Scala), либо неправильно (как в Spark).дополнительно: мое мнение о Spark API
мое мнение, что путаницы можно было бы избежать, если использовать термин
foldбыл полностью сброшен в Искру. По крайней мере, у spark есть примечание в их документации:это ведет себя несколько иначе, чем операции сгиба, реализованные для не распределенные коллекции на функциональных языках, таких как Скала.
Если я не ошибаюсь, даже если Spark API не требует этого, fold также требует, чтобы f был коммутативным. Потому что порядок, в котором будут агрегироваться разделы, не гарантирован. Например в следующем коде сортируется только первая распечатка:
import org.apache.spark.{SparkConf, SparkContext} object FoldExample extends App{ val conf = new SparkConf() .setMaster("local[*]") .setAppName("Simple Application") implicit val sc = new SparkContext(conf) val range = ('a' to 'z').map(_.toString) val rdd = sc.parallelize(range) println(range.reduce(_ + _)) println(rdd.reduce(_ + _)) println(rdd.fold("")(_ + _)) }Распечатать:
abcdefghijklmnopqrstuvwxyz
abcghituvjklmwxyzqrsdefnop
defghinopjklmqrstuvabcwxyz
еще одно отличие для ошпаривания-это использование комбинаторов в Hadoop.
представьте, что ваша операция является коммутативным моноидом, с уменьшить он будет применен на стороне карты также вместо перетасовки / сортировки всех данных в редукторы. С метод использовать-foldleft это не так.
pipe.groupBy('product) { _.reduce('price -> 'total){ (sum: Double, price: Double) => sum + price } // reduce is .mapReduceMap in disguise } pipe.groupBy('product) { _.foldLeft('price -> 'total)(0.0){ (sum: Double, price: Double) => sum + price } }это всегда хорошая практика, чтобы определить ваши операции как моноид в ошпаривании.
foldв Apache Spark-это не то же самое какfoldна не-распределенных коллекциях. На самом деле для этого требуется коммутативная функция для получения детерминированного результата:это ведет себя несколько иначе, чем операции сгиба, реализованные для не распределенных коллекции на функциональных языках, таких как Scala. Эта операция сгиба может быть применена к разбивает по отдельности, а затем складывает эти результаты в конечный результат, а не применить сложите к каждому элементу последовательно в некотором определенном порядке. Для функций которые не являются коммутативными, результат может отличаться от результата сгиба, примененного к a не-распределенного сбора.
этой показали by Мишель Розенталь и предложил Make42 in комментарии.
было предложено это наблюдаемое поведение связано с
HashPartitionerкогда на самом делеparallelizeне тасует и не используетHashPartitioner.import org.apache.spark.sql.SparkSession /* Note: standalone (non-local) mode */ val master = "spark://...:7077" val spark = SparkSession.builder.master(master).getOrCreate() /* Note: deterministic order */ val rdd = sc.parallelize(Seq("a", "b", "c", "d"), 4).sortBy(identity[String]) require(rdd.collect.sliding(2).forall { case Array(x, y) => x < y }) /* Note: all posible permutations */ require(Seq.fill(1000)(rdd.fold("")(_ + _)).toSet.size == 24)пояснил:
структура
foldдля RDDdef fold(zeroValue: T)(op: (T, T) => T): T = withScope { var jobResult: T val cleanOp: (T, T) => T val foldPartition = Iterator[T] => T val mergeResult: (Int, T) => Unit sc.runJob(this, foldPartition, mergeResult) jobResult }тот же в структуре
reduceдля RDD:def reduce(f: (T, T) => T): T = withScope { val cleanF: (T, T) => T val reducePartition: Iterator[T] => Option[T] var jobResult: Option[T] val mergeResult = (Int, Option[T]) => Unit sc.runJob(this, reducePartition, mergeResult) jobResult.getOrElse(throw new UnsupportedOperationException("empty collection")) }здесь
runJobвыполняется с игнорированием порядка разбиения и приводит к необходимости коммутативной функции.
foldPartitionиreducePartitionэквивалентны с точки зрения порядка обработки и эффективно (по наследование и делегирование) реализованоreduceLeftиfoldLeftonTraversableOnce.вывод:
foldна RDD не может зависеть от порядка кусков и потребностей коммутативность и ассоциативность.
Comments