Apache Spark RDD фильтр на два RDDs
Мне нужно разделить RDD на 2 части:
1 часть, которая удовлетворяет условию; другая часть, которая не удовлетворяет. Я могу сделать filter дважды на оригинальном RDD, но это кажется неэффективным. Есть ли способ сделать то, что мне нужно? Я ничего не могу найти ни в API, ни в литературе.
5 ответов:
Spark не поддерживает это по умолчанию. Фильтрация по одним и тем же данным дважды не так уж плоха, если вы кэшируете их заранее, и сама фильтрация выполняется быстро.
Если это действительно просто два разных типа, вы можете использовать вспомогательный метод:
Но как только у вас есть несколько типов данных, просто назначьте отфильтрованные новому val.implicit class RDDOps[T](rdd: RDD[T]) { def partitionBy(f: T => Boolean): (RDD[T], RDD[T]) = { val passes = rdd.filter(f) val fails = rdd.filter(e => !f(e)) // Spark doesn't have filterNot (passes, fails) } } val (matches, matchesNot) = sc.parallelize(1 to 100).cache().partitionBy(_ % 2 == 0)
Суть в том, что вы хотите сделать не фильтр, а карту.
(T) -> (Boolean, T)Извините, я неэффективен в синтаксисе Scala. Но идея заключается в том, что вы разделяете набор ответов, сопоставляя его с парами ключ/значение. Ключ может быть булевым, указывающим на то, что он прошел предикат 'Filter' или нет.
Spark RDD не имеет такого api.
Вот версия, основанная на запросе pull для rdd.span , который должен работать:
import scala.reflect.ClassTag import org.apache.spark.rdd._ def split[T:ClassTag](rdd: RDD[T], p: T => Boolean): (RDD[T], RDD[T]) = { val splits = rdd.mapPartitions { iter => val (left, right) = iter.partition(p) val iterSeq = Seq(left, right) iterSeq.iterator } val left = splits.mapPartitions { iter => iter.next().toIterator} val right = splits.mapPartitions { iter => iter.next() iter.next().toIterator } (left, right) } val rdd = sc.parallelize(0 to 10, 2) val (first, second) = split[Int](rdd, _ % 2 == 0 ) first.collect // Array[Int] = Array(0, 2, 4, 6, 8, 10)
Если вы в порядке с
TвместоRDD[T], то вы можете сделать это. В противном случае, вы могли бы сделать что-то вроде этого:val data = sc.parallelize(1 to 100) val splitData = data.mapPartitions{iter => { val splitList = (iter.toList).partition(_%2 == 0) Tuple1(splitList).productIterator } }.map(_.asInstanceOf[Tuple2[List[Int],List[Int]]])И, вероятно, вам нужно будет уменьшить это, чтобы объединить списки, когда вы идете выполнять действие
Вы можете использовать
subtract function(Если операция фильтра слишком дорогая).Код Пыспарка:
rdd1 = data.filter(filterFunction) rdd2 = data.subtract(rdd1)
Comments