Загрузите CSV-файл с помощью Spark
Я новичок в Spark, и я пытаюсь прочитать данные CSV из файла с Spark.
Вот что я делаю :
sc.textFile('file.csv')
.map(lambda line: (line.split(',')[0], line.split(',')[1]))
.collect()
Я ожидал бы, что этот вызов даст мне список из двух первых столбцов моего файла, но я получаю эту ошибку:
File "<ipython-input-60-73ea98550983>", line 1, in <lambda>
IndexError: list index out of range
хотя мой CSV-файл в более чем один столбец.
10 ответов:
вы уверены, что все строки имеют не менее 2 столбцов? Вы можете попробовать что-то вроде, просто проверить?:
sc.textFile("file.csv") \ .map(lambda line: line.split(",")) \ .filter(lambda line: len(line)>1) \ .map(lambda line: (line[0],line[1])) \ .collect()кроме того, вы можете распечатать виновника (если таковые имеются):
sc.textFile("file.csv") \ .map(lambda line: line.split(",")) \ .filter(lambda line: len(line)<=1) \ .collect()
Искра 2.0.0+
вы можете использовать встроенный источник данных csv напрямую:
spark.read.csv( "some_input_file.csv", header=True, mode="DROPMALFORMED", schema=schema )или
(spark.read .schema(schema) .option("header", "true") .option("mode", "DROPMALFORMED") .csv("some_input_file.csv"))без каких-либо внешних зависимостей.
Искра :
вместо ручного разбора, который далек от тривиального в общем случае, я бы рекомендовал
spark-csv:убедитесь, что Spark CSV включен в путь (
--packages,--jars,--driver-class-path)и загрузите ваши данные следующим образом:
(df = sqlContext .read.format("com.databricks.spark.csv") .option("header", "true") .option("inferschema", "true") .option("mode", "DROPMALFORMED") .load("some_input_file.csv"))он может обрабатывать загрузку, вывод схемы, удаление искаженных строк и не требует передачи данных из Python в JVM.
Примечание:
если вы знаете схему, лучше избежать вывода схемы и передать его в
DataFrameReader. Предполагая, что у вас есть три столбца-integer, double и string:from pyspark.sql.types import StructType, StructField from pyspark.sql.types import DoubleType, IntegerType, StringType schema = StructType([ StructField("A", IntegerType()), StructField("B", DoubleType()), StructField("C", StringType()) ]) (sqlContext .read .format("com.databricks.spark.csv") .schema(schema) .option("header", "true") .option("mode", "DROPMALFORMED") .load("some_input_file.csv"))
простое разбиение на запятые также разделит запятые, которые находятся в полях (например,
a,b,"1,2,3",c), Так что это не рекомендуется. ответ zero323 хорошо, если вы хотите использовать API DataFrames, но если вы хотите придерживаться базовой Spark, вы можете анализировать CSV в базовом Python с помощью csv модуль:# works for both python 2 and 3 import csv rdd = sc.textFile("file.csv") rdd = rdd.mapPartitions(lambda x: csv.reader(x))EDIT: Как упоминалось в комментариях @muon, это будет относиться к заголовку, как и к любой другой строке, поэтому вам нужно будет извлечь его вручную. Например,
header = rdd.first(); rdd = rdd.filter(lambda x: x != header)(сделать конечно, не изменятьheaderпрежде чем фильтр оценит). Но на данный момент вам, вероятно, лучше использовать встроенный CSV-парсер.
и еще один вариант, который состоит в чтении CSV-файла с помощью Pandas, а затем импортирует фрейм данных Pandas в Spark.
например:
from pyspark import SparkContext from pyspark.sql import SQLContext import pandas as pd sc = SparkContext('local','example') # if using locally sql_sc = SQLContext(sc) pandas_df = pd.read_csv('file.csv') # assuming the file contains a header # pandas_df = pd.read_csv('file.csv', names = ['column 1','column 2']) # if no header s_df = sql_sc.createDataFrame(pandas_df)
from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("Python Spark SQL basic example") \ .config("spark.some.config.option", "some-value") \ .getOrCreate() df = spark.read.csv("/home/stp/test1.csv",header=True,separator="|"); print(df.collect())
Это соответствует тому, что JP Mercier первоначально предложил об использовании панд, но с серьезной модификацией: если Вы читаете данные в панд кусками, он должен быть более податливым. Это означает, что вы можете разобрать гораздо больший файл, чем панды могут на самом деле обрабатывать как один кусок и передать его в Spark меньших размеров. (Это также отвечает на комментарий о том, почему нужно использовать Spark, если они могут загружать все в панды в любом случае.)
from pyspark import SparkContext from pyspark.sql import SQLContext import pandas as pd sc = SparkContext('local','example') # if using locally sql_sc = SQLContext(sc) Spark_Full = sc.emptyRDD() chunk_100k = pd.read_csv("Your_Data_File.csv", chunksize=100000) # if you have headers in your csv file: headers = list(pd.read_csv("Your_Data_File.csv", nrows=0).columns) for chunky in chunk_100k: Spark_Full += sc.parallelize(chunky.values.tolist()) YourSparkDataFrame = Spark_Full.toDF(headers) # if you do not have headers, leave empty instead: # YourSparkDataFrame = Spark_Full.toDF() YourSparkDataFrame.show()
Теперь есть еще один вариант для любого общего файла csv:https://github.com/seahboonsiew/pyspark-csv следующим образом:
предположим, что у нас есть следующий контекст
sc = SparkContext sqlCtx = SQLContext or HiveContextво-первых, распределить pyspark-csv.py исполнителям, использующим SparkContext
import pyspark_csv as pycsv sc.addPyFile('pyspark_csv.py')чтение данных в формате CSV через SparkContext и преобразовать его в таблицу данных
plaintext_rdd = sc.textFile('hdfs://x.x.x.x/blah.csv') dataframe = pycsv.csvToDataFrame(sqlCtx, plaintext_rdd)
Если ваши данные csv не содержат новых строк ни в одном из полей, вы можете загрузить свои данные с помощью
textFile()и разобрать егоimport csv import StringIO def loadRecord(line): input = StringIO.StringIO(line) reader = csv.DictReader(input, fieldnames=["name1", "name2"]) return reader.next() input = sc.textFile(inputFile).map(loadRecord)
Если вы хотите загрузить CSV в таблицы данных, то вы можете сделать следующее:
from pyspark.sql import SQLContext sqlContext = SQLContext(sc) df = sqlContext.read.format('com.databricks.spark.csv') \ .options(header='true', inferschema='true') \ .load('sampleFile.csv') # this is your csv fileон отлично работал для меня.
Comments