Как пропустить заголовок из CSV-файлов в Spark?



предположим, я даю три пути файлов к контексту Spark для чтения, и каждый файл имеет схему в первой строке. Как мы можем пропустить строки схемы из заголовков?



val rdd=sc.textFile("file1,file2,file3")


теперь, как мы можем пропустить строки заголовка из этого rdd?

668   13  

13 ответов:

Если бы в первой записи была только одна строка заголовка, то наиболее эффективным способом ее фильтрации было бы:

rdd.mapPartitionsWithIndex {
  (idx, iter) => if (idx == 0) iter.drop(1) else iter 
}

Это не поможет, если, конечно, есть много файлов с большим количеством строк заголовка внутри. Вы можете объединить три RDDs, которые вы делаете таким образом, действительно.

вы также можете просто написать filter это соответствует только строке, которая может быть заголовком. Это довольно просто, но менее эффективно.

эквивалент Python:

from itertools import islice

rdd.mapPartitionsWithIndex(
    lambda idx, it: islice(it, 1, None) if idx == 0 else it 
)
data = sc.textFile('path_to_data')
header = data.first() #extract header
data = data.filter(row => row != header)   #filter out header

в Spark 2.0 читатель CSV встроен в Spark, поэтому вы можете легко загрузить файл CSV следующим образом:

spark.read.option("header","true").csv("filePath")

С СПАРК 2.0 далее, что вы можете сделать, это использовать SparkSession чтобы сделать это в один лайнер:

val spark = SparkSession.builder.config(conf).getOrCreate()

а потом как @SandeepPurohit сказал:

val dataFrame = spark.read.format("CSV").option("header","true").load(csvfilePath)

Я надеюсь, что это решить Ваш вопрос !

P. S: SparkSession-это новая точка входа, введенная в СПАРК 2.0 и можно найти под пакета spark_sql

вы можете загрузить каждый файл отдельно, фильтровать их с помощью file.zipWithIndex().filter(_._2 > 0) а затем объединить все файлы RDDs.

Если количество файлов слишком велико, объединение может бросить StackOverflowExeption.

в PySpark вы можете использовать фрейм данных и установить заголовок как True:

df = spark.read.csv(dataPath, header=True)

использовать filter() метод в PySpark путем фильтрации первого имени столбца, чтобы удалить заголовок:

# Read file (change format for other file formats)
contentRDD = sc.textfile(<filepath>)

# Filter out first column of the header
filterDD = contentRDD.filter(lambda l: not l.startswith(<first column name>)

# Check your result
for i in filterDD.take(5) : print (i)

Это вариант, который вы передаете в :

context = new org.apache.spark.sql.SQLContext(sc)

var data = context.read.option("header","true").csv("<path>")

кроме того, вы можете использовать пакет spark-csv (или в Spark 2.0 это более или менее доступно изначально как CSV). Обратите внимание, что это ожидает заголовок на каждом файле (как вы хотите):

schema = StructType([
        StructField('lat',DoubleType(),True),
        StructField('lng',DoubleType(),True)])

df = sqlContext.read.format('com.databricks.spark.csv'). \
     options(header='true',
             delimiter="\t",
             treatEmptyValuesAsNulls=True,
             mode="DROPMALFORMED").load(input_file,schema=schema)

работа в 2018 году (Spark 2.3)

Python

df = spark.read.option("header","true").format("csv").schema(myManualSchema).load("maestraDestacados.csv")

Scala

val myDf = spark.read.option("header","true").format("csv").schema(myManualSchema).load("maestraDestacados.csv")

PD1: myManualSchema-это предопределенная схема, написанная мной, вы можете пропустить эту часть кода

Это должно работать штраф в размере

def dropHeader(data: RDD[String]): RDD[String] = {

     data.filter(r => r!=data.first)
 }
//Find header from the files lying in the directory
val fileNameHeader = sc.binaryFiles("E:\sss\*.txt",1).map{
    case (fileName, stream)=>
        val header = new BufferedReader(new InputStreamReader(stream.open())).readLine()
        (fileName, header)
}.collect().toMap

val fileNameHeaderBr = sc.broadcast(fileNameHeader)

// Now let's skip the header. mapPartition will ensure the header
// can only be the first line of the partition
sc.textFile("E:\sss\*.txt",1).mapPartitions(iter =>
    if(iter.hasNext){
        val firstLine = iter.next()
        println(s"Comparing with firstLine $firstLine")
        if(firstLine == fileNameHeaderBr.value.head._2)
            new WrappedIterator(null, iter)
        else
            new WrappedIterator(firstLine, iter)
    }
    else {
        iter
    }
).collect().foreach(println)

class WrappedIterator(firstLine:String,iter:Iterator[String]) extends Iterator[String]{
    var isFirstIteration = true
    override def hasNext: Boolean = {
        if (isFirstIteration && firstLine != null){
            true
        }
        else{
            iter.hasNext
        }
    }

    override def next(): String = {
        if (isFirstIteration){
            println(s"For the first time $firstLine")
            isFirstIteration = false
            if (firstLine != null){
                firstLine
            }
            else{
                println(s"Every time $firstLine")
                iter.next()
            }
        }
        else {
          iter.next()
        }
    }
}

для разработчиков python. Я проверил с spark2.0. Допустим, вы хотите удалить первые 14 строк.

sc = spark.sparkContext
lines = sc.textFile("s3://folder_location_of_csv/")
parts = lines.map(lambda l: l.split(","))
parts.zipWithIndex().filter(lambda tup: tup[1] > 14).map(lambda x:x[0])

withColumn-это функция df. Поэтому ниже не будет работать в стиле RDD, как указано выше.

parts.withColumn("index",monotonically_increasing_id()).filter(index > 14)

Comments

    Ничего не найдено.