14 ответов:
вот пример разницы, как
spark-shellсеанс:во-первых, некоторые данные - две строки текста:
val rdd = sc.parallelize(Seq("Roses are red", "Violets are blue")) // lines rdd.collect res0: Array[String] = Array("Roses are red", "Violets are blue")теперь
mapпреобразует RDD длины N в другой RDD длины N.например, он отображает из двух строк в две длины строк:
rdd.map(_.length).collect res1: Array[Int] = Array(13, 16)но
flatMap(грубо говоря) преобразует RDD длины N в коллекцию из N коллекций, а затем сглаживает их в один RDD результатов.rdd.flatMap(_.split(" ")).collect res2: Array[String] = Array("Roses", "are", "red", "Violets", "are", "blue")у нас есть несколько слов в строке, и несколько строк, но мы в конечном итоге с одним выходным массивом слов
просто чтобы проиллюстрировать это, flatMapping из коллекции строк в коллекцию слов выглядит так:
["aa bb cc", "", "dd"] => [["aa","bb","cc"],[],["dd"]] => ["aa","bb","cc","dd"]поэтому входные и выходные RDDs обычно будут иметь разные размеры для
flatMap.если бы мы попытались использовать
mapСsplitфункция, мы бы в конечном итоге с вложенными структурами (RDD массивов слов, с типомRDD[Array[String]]) потому что мы должны иметь ровно один результат на вход:rdd.map(_.split(" ")).collect res3: Array[Array[String]] = Array( Array(Roses, are, red), Array(Violets, are, blue) )наконец, один полезный частный случай-сопоставление с функцией, которая может не возвращать ответ, и поэтому возвращает
Option. Мы можем использоватьflatMapотфильтровать элементы, которые возвращаютNoneи извлеките значения из тех, которые возвращают aSome:val rdd = sc.parallelize(Seq(1,2,3,4)) def myfn(x: Int): Option[Int] = if (x <= 2) Some(x * 10) else None rdd.flatMap(myfn).collect res3: Array[Int] = Array(10,20)(отмечая здесь, что опция ведет себя скорее как список, который имеет либо один элемент, либо ноль элементы)
обычно мы используем пример подсчета слов в hadoop. Я возьму тот же случай использования и буду использовать
mapиflatMapи мы увидим разницу в том, как он обрабатывает данные.Ниже приведен пример файла данных.
hadoop is fast hive is sql on hdfs spark is superfast spark is awesomeприведенный выше файл будет проанализирован с помощью
mapиflatMap.используя
map>>> wc = data.map(lambda line:line.split(" ")); >>> wc.collect() [u'hadoop is fast', u'hive is sql on hdfs', u'spark is superfast', u'spark is awesome']входной сигнал имеет 4 линии и размер выхода 4 также, т. е., n элементов ==> n элементов.
используя
flatMap>>> fm = data.flatMap(lambda line:line.split(" ")); >>> fm.collect() [u'hadoop', u'is', u'fast', u'hive', u'is', u'sql', u'on', u'hdfs', u'spark', u'is', u'superfast', u'spark', u'is', u'awesome']выход отличается от карты.
давайте назначим 1 в качестве значения для каждого ключа, чтобы получить количество слов.
fm: RDD создается с помощьюflatMapwc: RDD создан с помощьюmap>>> fm.map(lambda word : (word,1)).collect() [(u'hadoop', 1), (u'is', 1), (u'fast', 1), (u'hive', 1), (u'is', 1), (u'sql', 1), (u'on', 1), (u'hdfs', 1), (u'spark', 1), (u'is', 1), (u'superfast', 1), (u'spark', 1), (u'is', 1), (u'awesome', 1)], тогда как
mapна RDDwcдаст ниже нежелательный выход:>>> wc.flatMap(lambda word : (word,1)).collect() [[u'hadoop', u'is', u'fast'], 1, [u'hive', u'is', u'sql', u'on', u'hdfs'], 1, [u'spark', u'is', u'superfast'], 1, [u'spark', u'is', u'awesome'], 1]вы не можете получить количество слов, если
mapиспользуется вместоflatMap.согласно определению, разница между
mapиflatMap- это:
map: он возвращает новый RDD, применяя данную функцию к каждому элементу из РДД. Функция вmapвозвращает только один элемент.
flatMap: аналогичноmap, он возвращает новый RDD, применяя функцию к каждому элементу RDD, но выход уплощен.
Если вы спрашиваете разницу между RDD.карта и RDD.flatMap в Spark map преобразует RDD размера N в другой размер N . например.
myRDD.map(x => x*2)например, если myRDD состоит из двойников .
В то время как flatMap может преобразовать RDD в пыльник одного из разных размеров: например.:
myRDD.flatMap(x =>new Seq(2*x,3*x))который вернет RDD размера 2*N или
myRDD.flatMap(x =>if x<10 new Seq(2*x,3*x) else new Seq(x) )
использовать
test.mdнапример:➜ spark-1.6.1 cat test.md This is the first line; This is the second line; This is the last line. scala> val textFile = sc.textFile("test.md") scala> textFile.map(line => line.split(" ")).count() res2: Long = 3 scala> textFile.flatMap(line => line.split(" ")).count() res3: Long = 15 scala> textFile.map(line => line.split(" ")).collect() res0: Array[Array[String]] = Array(Array(This, is, the, first, line;), Array(This, is, the, second, line;), Array(This, is, the, last, line.)) scala> textFile.flatMap(line => line.split(" ")).collect() res1: Array[String] = Array(This, is, the, first, line;, This, is, the, second, line;, This, is, the, last, line.)если вы используете
mapметод, вы получите строкиtest.mdнаflatMapметод, вы получите количество слов.The
mapметод похож наflatMap, они все возвращают новый RDD.mapметод часто использовать возврат нового RDD,flatMapметод часто использовать разделенные слова.
mapвозвращает RDD равного количества элементов в то время какflatMapне может.пример использования
flatMapотфильтровать отсутствующие или неправильные данные.пример использования
mapиспользование в самых разных случаях, когда количество элементов ввода и вывода одинаковы.количество.csv
1 2 3 - 4 - 5map.py добавляет все числа в добавлять.csv.
from operator import * def f(row): try: return float(row) except Exception: return 0 rdd = sc.textFile('a.csv').map(f) print(rdd.count()) # 7 print(rdd.reduce(add)) # 15.0flatMap.py использует
flatMapчтобы отфильтровать недостающие данные перед добавлением. По сравнению с предыдущей версией добавлено меньше номеров.from operator import * def f(row): try: return [float(row)] except Exception: return [] rdd = sc.textFile('a.csv').flatMap(f) print(rdd.count()) # 5 print(rdd.reduce(add)) # 15.0
map и flatMap похожи, в том смысле, что они берут строку из входного RDD и применяют к ней функцию. Они отличаются тем, что функция в map возвращает только один элемент, в то время как функция в flatMap может возвращать список элементов (0 или более) в качестве итератора.
кроме того, Выходные данные flatMap сглажены. Хотя функция в flatMap возвращает список элементов, flatMap возвращает RDD, который имеет все элементы из списка в плоском виде (не a список.)
Flatmap и Map оба преобразуют коллекцию.
отличия:
карта (func)
Возвращает новый распределенный набор данных, сформированный путем передачи каждого элемента источника через функцию func.flatMap (func)
Аналогично map, но каждый входной элемент может быть сопоставлен с 0 или более выходными элементами (поэтому func должен возвращать Seq, а не один элемент).преобразование функция:
карта: один элемент in - > один элемент out.
flatMap: один элемент in - > 0 или более элементов out (коллекция).
разница видна из приведенного ниже примера кода pyspark:
rdd = sc.parallelize([2, 3, 4]) rdd.flatMap(lambda x: range(1, x)).collect() Output: [1, 1, 2, 1, 2, 3] rdd.map(lambda x: range(1, x)).collect() Output: [[1], [1, 2], [1, 2, 3]]
Это сводится к вашему первоначальному вопросу: что вы подразумеваете под уплощение ?
при использовании flatMap, a "многомерный" коллекция становится "одномерные" коллекция.
val array1d = Array ("1,2,3", "4,5,6", "7,8,9") //array1d is an array of strings val array2d = array1d.map(x => x.split(",")) //array2d will be : Array( Array(1,2,3), Array(4,5,6), Array(7,8,9) ) val flatArray = array1d.flatMap(x => x.split(",")) //flatArray will be : Array (1,2,3,4,5,6,7,8,9)вы хотите использовать плоскую карту, когда,
- ваша функция карты приводит к созданию многослойных структур
- но все, что вы хотите, является простым плоским один размерная структура, удаляя все внутренние группировки
для всех тех, кто хотел PySpark связанных:
пример преобразования: flatMap
>>> a="hello what are you doing" >>> a.split()['привет', 'что', 'are', 'you', 'doing']
>>> b=["hello what are you doing","this is rak"] >>> b.split()обратная трассировка (самый недавний призыв последнего): Файл "", строка 1, in AttributeError: объект 'list' не имеет атрибута 'split'
>>> rline=sc.parallelize(b) >>> type(rline)>>> def fwords(x): ... return x.split() >>> rword=rline.map(fwords) >>> rword.collect()[['hello', 'what', 'are',' you',' doing'], ['this',' is','rak']]
>>> rwordflat=rline.flatMap(fwords) >>> rwordflat.collect()['привет', 'что', "есть", "ты", "делание", "это", "есть", "рак"]
надеюсь, что это помогает :)
RDD.карта возвращает все элементы в одном массиве
RDD.flatMap возвращает элементы в массивах array
предположим, у нас есть текст в текст.txt-файл
Spark is an expressive framework This text is to understand map and faltMap functions of Spark RDDиспользуя карту
val text=sc.textFile("text.txt").map(_.split(" ")).collectвыход:
text: **Array[Array[String]]** = Array(Array(Spark, is, an, expressive, framework), Array(This, text, is, to, understand, map, and, faltMap, functions, of, Spark, RDD))С помощью flatMap
val text=sc.textFile("text.txt").flatMap(_.split(" ")).collectвыход:
text: **Array[String]** = Array(Spark, is, an, expressive, framework, This, text, is, to, understand, map, and, faltMap, functions, of, Spark, RDD)
разница в выводе карты и flatMap:
1.
flatMapval a = sc.parallelize(1 to 10, 5) a.flatMap(1 to _).collect()выход:
1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9, 102.
map:val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3) val b = a.map(_.length).collect()выход:
3 6 6 3 8
- map (func) возвращает новый распределенный набор данных, сформированный путем передачи каждого элемента источника через функцию func declared.so map () - это один термин
whiles
- flatMap (func) аналогично map, но каждый входной элемент может быть сопоставлен с 0 или более выходными элементами, поэтому func должен возвращать последовательность, а не один элемент.
map: он возвращает новый RDD, применяя функцию к каждому элементу RDD. Функции .карта * * * * может вернуть только один элемент.****
flatMap: аналогично map, он возвращает новый RDD путем * * * * применения функции к каждому элементу RDD, но вывод сглажен.****
кроме того, функция в flatMap может возвращать список элементов (0 или более)
Например: Южная Каролина.распараллелить([3,4,5]).карта (лямбда x: диапазон (1, x)).собирать() Выход: [[1, 2], [1, 2, 3], [1, 2, 3, 4]]
sc.распараллелить([3,4,5]).flatMap(lambda x: range (1,x)).собирать() Вывод: уведомление o / p сглажено в одном списке [1, 2, 1, 2, 3, 1, 2, 3, 4]
Source:https://www.linkedin.com/pulse/difference-between-map-flatmap-transformations-spark-pyspark-pandey/
Comments