Загрузите CSV-файл с помощью Spark



Я новичок в Spark, и я пытаюсь прочитать данные CSV из файла с Spark.
Вот что я делаю :



sc.textFile('file.csv')
.map(lambda line: (line.split(',')[0], line.split(',')[1]))
.collect()


Я ожидал бы, что этот вызов даст мне список из двух первых столбцов моего файла, но я получаю эту ошибку:



File "<ipython-input-60-73ea98550983>", line 1, in <lambda>
IndexError: list index out of range


хотя мой CSV-файл в более чем один столбец.

695   10  

10 ответов:

вы уверены, что все строки имеют не менее 2 столбцов? Вы можете попробовать что-то вроде, просто проверить?:

sc.textFile("file.csv") \
    .map(lambda line: line.split(",")) \
    .filter(lambda line: len(line)>1) \
    .map(lambda line: (line[0],line[1])) \
    .collect()

кроме того, вы можете распечатать виновника (если таковые имеются):

sc.textFile("file.csv") \
    .map(lambda line: line.split(",")) \
    .filter(lambda line: len(line)<=1) \
    .collect()

Искра 2.0.0+

вы можете использовать встроенный источник данных csv напрямую:

spark.read.csv(
    "some_input_file.csv", header=True, mode="DROPMALFORMED", schema=schema
)

или

(spark.read
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .csv("some_input_file.csv"))

без каких-либо внешних зависимостей.

Искра :

вместо ручного разбора, который далек от тривиального в общем случае, я бы рекомендовал spark-csv:

убедитесь, что Spark CSV включен в путь (--packages,--jars, --driver-class-path)

и загрузите ваши данные следующим образом:

(df = sqlContext
    .read.format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferschema", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv"))

он может обрабатывать загрузку, вывод схемы, удаление искаженных строк и не требует передачи данных из Python в JVM.

Примечание:

если вы знаете схему, лучше избежать вывода схемы и передать его в DataFrameReader. Предполагая, что у вас есть три столбца-integer, double и string:

from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType

schema = StructType([
    StructField("A", IntegerType()),
    StructField("B", DoubleType()),
    StructField("C", StringType())
])

(sqlContext
    .read
    .format("com.databricks.spark.csv")
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv"))

простое разбиение на запятые также разделит запятые, которые находятся в полях (например,a,b,"1,2,3",c), Так что это не рекомендуется. ответ zero323 хорошо, если вы хотите использовать API DataFrames, но если вы хотите придерживаться базовой Spark, вы можете анализировать CSV в базовом Python с помощью csv модуль:

# works for both python 2 and 3
import csv
rdd = sc.textFile("file.csv")
rdd = rdd.mapPartitions(lambda x: csv.reader(x))

EDIT: Как упоминалось в комментариях @muon, это будет относиться к заголовку, как и к любой другой строке, поэтому вам нужно будет извлечь его вручную. Например, header = rdd.first(); rdd = rdd.filter(lambda x: x != header) (сделать конечно, не изменять header прежде чем фильтр оценит). Но на данный момент вам, вероятно, лучше использовать встроенный CSV-парсер.

и еще один вариант, который состоит в чтении CSV-файла с помощью Pandas, а затем импортирует фрейм данных Pandas в Spark.

например:

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

sc = SparkContext('local','example')  # if using locally
sql_sc = SQLContext(sc)

pandas_df = pd.read_csv('file.csv')  # assuming the file contains a header
# pandas_df = pd.read_csv('file.csv', names = ['column 1','column 2']) # if no header
s_df = sql_sc.createDataFrame(pandas_df)
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

df = spark.read.csv("/home/stp/test1.csv",header=True,separator="|");

print(df.collect())

Это соответствует тому, что JP Mercier первоначально предложил об использовании панд, но с серьезной модификацией: если Вы читаете данные в панд кусками, он должен быть более податливым. Это означает, что вы можете разобрать гораздо больший файл, чем панды могут на самом деле обрабатывать как один кусок и передать его в Spark меньших размеров. (Это также отвечает на комментарий о том, почему нужно использовать Spark, если они могут загружать все в панды в любом случае.)

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

sc = SparkContext('local','example')  # if using locally
sql_sc = SQLContext(sc)

Spark_Full = sc.emptyRDD()
chunk_100k = pd.read_csv("Your_Data_File.csv", chunksize=100000)
# if you have headers in your csv file:
headers = list(pd.read_csv("Your_Data_File.csv", nrows=0).columns)

for chunky in chunk_100k:
    Spark_Full +=  sc.parallelize(chunky.values.tolist())

YourSparkDataFrame = Spark_Full.toDF(headers)
# if you do not have headers, leave empty instead:
# YourSparkDataFrame = Spark_Full.toDF()
YourSparkDataFrame.show()

Теперь есть еще один вариант для любого общего файла csv:https://github.com/seahboonsiew/pyspark-csv следующим образом:

предположим, что у нас есть следующий контекст

sc = SparkContext
sqlCtx = SQLContext or HiveContext

во-первых, распределить pyspark-csv.py исполнителям, использующим SparkContext

import pyspark_csv as pycsv
sc.addPyFile('pyspark_csv.py')

чтение данных в формате CSV через SparkContext и преобразовать его в таблицу данных

plaintext_rdd = sc.textFile('hdfs://x.x.x.x/blah.csv')
dataframe = pycsv.csvToDataFrame(sqlCtx, plaintext_rdd)

Если ваши данные csv не содержат новых строк ни в одном из полей, вы можете загрузить свои данные с помощью textFile() и разобрать его

import csv
import StringIO

def loadRecord(line):
    input = StringIO.StringIO(line)
    reader = csv.DictReader(input, fieldnames=["name1", "name2"])
    return reader.next()

input = sc.textFile(inputFile).map(loadRecord)

Если вы хотите загрузить CSV в таблицы данных, то вы можете сделать следующее:

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

df = sqlContext.read.format('com.databricks.spark.csv') \
    .options(header='true', inferschema='true') \
    .load('sampleFile.csv') # this is your csv file

он отлично работал для меня.

import pandas as pd

data1 = pd.read_csv("test1.csv")
data2 = pd.read_csv("train1.csv")

Comments

    Ничего не найдено.