Запись на несколько выходов с помощью ключа Spark-одно задание Spark
как вы можете писать на несколько выходов, зависящих от ключа, используя Spark в одном задании.
связанный: запись на несколько выходов с помощью ключа ошпаривания Hadoop, одно задание MapReduce
например.
sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
.writeAsMultiple(prefix, compressionCodecOption)
обеспечили бы cat prefix/1 - это
a
b
и cat prefix/2 будет
c
ответ
для точного ответа с полным импортом, pimp и кодеком сжатия см. https://stackoverflow.com/a/46118044/1586965
10 ответов:
если вы используете Spark 1.4+, это стало намного, намного проще благодаря DataFrame API. (Фреймы данных были введены в Spark 1.3, но
partitionBy(), который нам нужен, был представил в 1.4.)если вы начинаете с RDD, вам сначала нужно преобразовать его в фрейм данных:
val people_rdd = sc.parallelize(Seq((1, "alice"), (1, "bob"), (2, "charlie"))) val people_df = people_rdd.toDF("number", "name")в Python, этот же код:
people_rdd = sc.parallelize([(1, "alice"), (1, "bob"), (2, "charlie")]) people_df = people_rdd.toDF(["number", "name"])после того, как у вас есть фрейм данных, запись на несколько выходов на основе определенного ключа является простой. Более того-и это красота API DataFrame-код в значительной степени одинаковый для Python, Scala, Java и R:
people_df.write.partitionBy("number").text("people")и вы можете легко использовать другие форматы, если вы хотите:
people_df.write.partitionBy("number").json("people-json") people_df.write.partitionBy("number").parquet("people-parquet")в каждом из этих примеров Spark создаст подкаталог для каждого из ключей, на которые мы разделили фрейм данных:
people/ _SUCCESS number=1/ part-abcd part-efgh number=2/ part-abcd part-efgh
Я бы сделал это так, который является масштабируемым
import org.apache.hadoop.io.NullWritable import org.apache.spark._ import org.apache.spark.SparkContext._ import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] { override def generateActualKey(key: Any, value: Any): Any = NullWritable.get() override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = key.asInstanceOf[String] } object Split { def main(args: Array[String]) { val conf = new SparkConf().setAppName("Split" + args(1)) val sc = new SparkContext(conf) sc.textFile("input/path") .map(a => (k, v)) // Your own implementation .partitionBy(new HashPartitioner(num)) .saveAsHadoopFile("output/path", classOf[String], classOf[String], classOf[RDDMultipleTextOutputFormat]) spark.stop() } }просто видел аналогичный ответ выше, но на самом деле нам не нужны индивидуальные разделы. В MultipleTextOutputFormat будет создавать файл для каждого ключа. Это нормально, что несколько записей с одинаковыми ключами попадают в один и тот же раздел.
new HashPartitioner (num), где num-номер раздела, который вы хотите. В случае, если у вас есть большое количество различных ключей, вы можете установить номер на большой. В этом случае каждый раздел будет не открывайте слишком много обработчиков файлов hdfs.
Если у вас потенциально есть много значений для данного ключа, я думаю, что масштабируемое решение состоит в том, чтобы записать один файл на ключ на раздел. К сожалению, в Spark нет встроенной поддержки для этого, но мы можем что-то придумать.
sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c"))) .mapPartitionsWithIndex { (p, it) => val outputs = new MultiWriter(p.toString) for ((k, v) <- it) { outputs.write(k.toString, v) } outputs.close Nil.iterator } .foreach((x: Nothing) => ()) // To trigger the job. // This one is Local, but you could write one for HDFS class MultiWriter(suffix: String) { private val writers = collection.mutable.Map[String, java.io.PrintWriter]() def write(key: String, value: Any) = { if (!writers.contains(key)) { val f = new java.io.File("output/" + key + "/" + suffix) f.getParentFile.mkdirs writers(key) = new java.io.PrintWriter(f) } writers(key).println(value) } def close = writers.values.foreach(_.close) }(вместо
PrintWriterС вашим выбором распределенной работы файловой системы.)это делает один проход над RDD и не выполняет перетасовки. Это дает вам один каталог на ключ, с несколькими файлами внутри каждого.
это включает в себя кодек по запросу, необходимый импорт и сутенер по запросу.
import org.apache.spark.rdd.RDD import org.apache.spark.sql.SQLContext // TODO Need a macro to generate for each Tuple length, or perhaps can use shapeless implicit class PimpedRDD[T1, T2](rdd: RDD[(T1, T2)]) { def writeAsMultiple(prefix: String, codec: String, keyName: String = "key") (implicit sqlContext: SQLContext): Unit = { import sqlContext.implicits._ rdd.toDF(keyName, "_2").write.partitionBy(keyName) .format("text").option("codec", codec).save(prefix) } } val myRdd = sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c"))) myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec")одно тонкое отличие от OP заключается в том, что он будет префикс
<keyName>=к именам каталогов. Е. Г.myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec")даст:
prefix/key=1/part-00000 prefix/key=2/part-00000здесь
prefix/my_number=1/part-00000будет содержать строкиaиbиprefix/my_number=2/part-00000будет содержать строкуc.и
myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec", "foo")даст:
prefix/foo=1/part-00000 prefix/foo=2/part-00000он должен Будьте понятны, как редактировать для
parquet.наконец, ниже приведен пример
Dataset, что, пожалуй, лучше, чем использование кортежей.implicit class PimpedDataset[T](dataset: Dataset[T]) { def writeAsMultiple(prefix: String, codec: String, field: String): Unit = { dataset.write.partitionBy(field) .format("text").option("codec", codec).save(prefix) } }
у меня аналогичная потребность и нашел способ. Но у него есть один недостаток (который не является проблемой для моего случая): вам нужно повторно разбить данные на один раздел на выходной файл.
для разделения таким образом, как правило, требуется заранее знать, сколько файлов будет выведено заданием, и найти функцию, которая сопоставит каждый ключ с каждым разделом.
сначала давайте создадим наш класс MultipleTextOutputFormat на основе:
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat class KeyBasedOutput[T >: Null, V <: AnyRef] extends MultipleTextOutputFormat[T , V] { override def generateFileNameForKeyValue(key: T, value: V, leaf: String) = { key.toString } override protected def generateActualKey(key: T, value: V) = { null } }С этим классом Искра получит ключ из раздела (первый/последний, я думаю) и назовите файл с этим ключом, поэтому не стоит смешивать несколько ключей на одном разделе.
для вашего примера вам потребуется пользовательский разделитель. Это будет делать работу:
import org.apache.spark.Partitioner class IdentityIntPartitioner(maxKey: Int) extends Partitioner { def numPartitions = maxKey def getPartition(key: Any): Int = key match { case i: Int if i < maxKey => i } }Теперь давайте сложим все вместе:
val rdd = sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c"), (7, "d"), (7, "e"))) // You need to know the max number of partitions (files) beforehand // In this case we want one partition per key and we have 3 keys, // with the biggest key being 7, so 10 will be large enough val partitioner = new IdentityIntPartitioner(10) val prefix = "hdfs://.../prefix" val partitionedRDD = rdd.partitionBy(partitioner) partitionedRDD.saveAsHadoopFile(prefix, classOf[Integer], classOf[String], classOf[KeyBasedOutput[Integer, String]])это создаст 3 файла под префиксом (с именем 1, 2 и 7), обрабатывая все за один проход.
как вы можете видеть, Вам нужно некоторое знание о ваших ключах к будьте в состоянии использовать это решение.
для меня это было проще, потому что мне нужен был один выходной файл для каждого ключевого хэша, и количество файлов было под моим контролем, поэтому я мог использовать фондовый HashPartitioner для выполнения трюка.
saveAsText () и saveAsHadoop(...) реализуются на основе данных RDD, в частности методом:PairRDD.saveAsHadoopDataset, который принимает данные из PairRdd, где он выполняется. Я вижу два возможных варианта: Если ваши данные относительно малы по размеру, вы можете сэкономить некоторое время реализации, группируя по RDD, создавая новый RDD из каждой коллекции и используя этот RDD для записи данных. Что-то вроде этого:
val byKey = dataRDD.groupByKey().collect() val rddByKey = byKey.map{case (k,v) => k->sc.makeRDD(v.toSeq)} val rddByKey.foreach{ case (k,rdd) => rdd.saveAsText(prefix+k}обратите внимание, что это будет не работает для больших наборов данных b / c материализация итератора в
v.toSeqможет не поместиться в памяти.другой вариант, который я вижу, и на самом деле тот, который я бы рекомендовал в этом случае: сверните свой собственный, напрямую вызвав api hadoop/hdfs.
вот дискуссия, которую я начал, исследуя этот вопрос: как создать RDDs из другого RDD?
Мне нужно то же самое в Java. Публикация моего перевода ответ скалы Чжан Чжана чтобы вызвать пользователей Java API:
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import scala.Tuple2; import java.util.Arrays; class RDDMultipleTextOutputFormat<A, B> extends MultipleTextOutputFormat<A, B> { @Override protected String generateFileNameForKeyValue(A key, B value, String name) { return key.toString(); } } public class Main { public static void main(String[] args) { SparkConf conf = new SparkConf() .setAppName("Split Job") .setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); String[] strings = {"Abcd", "Azlksd", "whhd", "wasc", "aDxa"}; sc.parallelize(Arrays.asList(strings)) // The first character of the string is the key .mapToPair(s -> new Tuple2<>(s.substring(0,1).toLowerCase(), s)) .saveAsHadoopFile("output/", String.class, String.class, RDDMultipleTextOutputFormat.class); sc.stop(); } }
У меня был аналогичный случай использования, когда я разделил входной файл на Hadoop HDFS на несколько файлов на основе ключа (1 файл на ключ). Вот мой код scala для spark
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; val hadoopconf = new Configuration(); val fs = FileSystem.get(hadoopconf); @serializable object processGroup { def apply(groupName:String, records:Iterable[String]): Unit = { val outFileStream = fs.create(new Path("/output_dir/"+groupName)) for( line <- records ) { outFileStream.writeUTF(line+"\n") } outFileStream.close() } } val infile = sc.textFile("input_file") val dateGrouped = infile.groupBy( _.split(",")(0)) dateGrouped.foreach( (x) => processGroup(x._1, x._2))я сгруппировал записи по ключу. Значения для каждого ключа записываются в отдельный файл.
хорошая новость для пользователя python в случае, если у вас есть несколько столбцов, и вы хотите сохранить все остальные столбцы, не разделенные в формате csv, который не удастся, если вы используете метод "текст" в качестве предложения Ника Чаммаса .
people_df.write.partitionBy("number").text("people")сообщение об ошибке " AnalysisException: источник данных u'text поддерживает только один столбец, и у вас есть 2 столбца.;'"
в spark 2.0.0 (моя тестовая среда-это spark 2.0.0 hdp) пакет " com.databricks.искра.csv " теперь интегрирован , и это позвольте нам сохранить текстовый файл, разделенный только на один столбец, см. Пример blow:
people_rdd = sc.parallelize([(1,"2016-12-26", "alice"), (1,"2016-12-25", "alice"), (1,"2016-12-25", "tom"), (1, "2016-12-25","bob"), (2,"2016-12-26" ,"charlie")]) df = people_rdd.toDF(["number", "date","name"]) df.coalesce(1).write.partitionBy("number").mode("overwrite").format('com.databricks.spark.csv').options(header='false').save("people") [root@namenode people]# tree . ├── number=1 │?? └── part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv ├── number=2 │?? └── part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv └── _SUCCESS [root@namenode people]# cat number\=1/part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv 2016-12-26,alice 2016-12-25,alice 2016-12-25,tom 2016-12-25,bob [root@namenode people]# cat number\=2/part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv 2016-12-26,charlieв моей среде spark 1.6.1 код не выдал никакой ошибки, однако Ther-это только один файл, сгенерированный. он не разделен на две папки.
надеюсь, что это может помочь .
у меня был аналогичный случай использования. Я решала это в Java, написав два пользовательских классов реализации
MultipleTextOutputFormatиRecordWriter.мой вклад был
JavaPairRDD<String, List<String>>и я хотел сохранить его в файле, названном его ключом, со всеми строками, содержащимися в его значении.вот код для моего
MultipleTextOutputFormatреализацияclass RDDMultipleTextOutputFormat<K, V> extends MultipleTextOutputFormat<K, V> { @Override protected String generateFileNameForKeyValue(K key, V value, String name) { return key.toString(); //The return will be used as file name } /** The following 4 functions are only for visibility purposes (they are used in the class MyRecordWriter) **/ protected String generateLeafFileName(String name) { return super.generateLeafFileName(name); } protected V generateActualValue(K key, V value) { return super.generateActualValue(key, value); } protected String getInputFileBasedOutputFileName(JobConf job, String name) { return super.getInputFileBasedOutputFileName(job, name); } protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs, JobConf job, String name, Progressable arg3) throws IOException { return super.getBaseRecordWriter(fs, job, name, arg3); } /** Use my custom RecordWriter **/ @Override RecordWriter<K, V> getRecordWriter(final FileSystem fs, final JobConf job, String name, final Progressable arg3) throws IOException { final String myName = this.generateLeafFileName(name); return new MyRecordWriter<K, V>(this, fs, job, arg3, myName); } }вот код для моего
RecordWriterреализация.class MyRecordWriter<K, V> implements RecordWriter<K, V> { private RDDMultipleTextOutputFormat<K, V> rddMultipleTextOutputFormat; private final FileSystem fs; private final JobConf job; private final Progressable arg3; private String myName; TreeMap<String, RecordWriter<K, V>> recordWriters = new TreeMap(); MyRecordWriter(RDDMultipleTextOutputFormat<K, V> rddMultipleTextOutputFormat, FileSystem fs, JobConf job, Progressable arg3, String myName) { this.rddMultipleTextOutputFormat = rddMultipleTextOutputFormat; this.fs = fs; this.job = job; this.arg3 = arg3; this.myName = myName; } @Override void write(K key, V value) throws IOException { String keyBasedPath = rddMultipleTextOutputFormat.generateFileNameForKeyValue(key, value, myName); String finalPath = rddMultipleTextOutputFormat.getInputFileBasedOutputFileName(job, keyBasedPath); Object actualValue = rddMultipleTextOutputFormat.generateActualValue(key, value); RecordWriter rw = this.recordWriters.get(finalPath); if(rw == null) { rw = rddMultipleTextOutputFormat.getBaseRecordWriter(fs, job, finalPath, arg3); this.recordWriters.put(finalPath, rw); } List<String> lines = (List<String>) actualValue; for (String line : lines) { rw.write(null, line); } } @Override void close(Reporter reporter) throws IOException { Iterator keys = this.recordWriters.keySet().iterator(); while(keys.hasNext()) { RecordWriter rw = (RecordWriter)this.recordWriters.get(keys.next()); rw.close(reporter); } this.recordWriters.clear(); } }большая часть кода точно такая же, как в
FileOutputFormat. Этот разница только в этих нескольких строкахList<String> lines = (List<String>) actualValue; for (String line : lines) { rw.write(null, line); }эти строки позволили мне написать каждую строку моего ввода
List<String>на файл. Первый аргументwriteфункция имеет значениеnullво избежание писать ключ на каждой линии.чтобы закончить, мне нужно только сделать этот вызов, чтобы написать мои файлы
javaPairRDD.saveAsHadoopFile(path, String.class, List.class, RDDMultipleTextOutputFormat.class);
Comments