Python Chunking CSV файл мультипроцессорная обработка
Я использую следующий код для разбиения CSV-файла на несколько блоков (источник: здесь)
def worker(chunk):
print len(chunk)
def keyfunc(row):
return row[0]
def main():
pool = mp.Pool()
largefile = 'Counseling.csv'
num_chunks = 10
start_time = time.time()
results = []
with open(largefile) as f:
reader = csv.reader(f)
reader.next()
chunks = itertools.groupby(reader, keyfunc)
while True:
# make a list of num_chunks chunks
groups = [list(chunk) for key, chunk in
itertools.islice(chunks, num_chunks)]
if groups:
result = pool.map(worker, groups)
results.extend(result)
else:
break
pool.close()
pool.join()
Тем не менее, кажется, что количество блоков всегда остается постоянным независимо от количества блоков, которые я выбираю для использования. Например, независимо от того, выбираю ли я 1 или 10 блоков, я всегда получаю этот вывод при обработке файла образца. В идеале я хотел бы разделить файл так, чтобы он был равномерно распределен.
Обратите внимание, что реальный файл, который я загружаю, составляет более 13 миллионов строки длинные, поэтому я обрабатываю их по частям. Это просто необходимо!
6
7
1
...
1
1
94
--- 0.101687192917 seconds ---
2 ответов:
За Комментарии , мы хотим, чтобы каждый процесс работал на отрезке в 10000 строк. Это не так уж трудно сделать. чтобы сделать; смотрите рецепт
iter/isliceниже. Однако проблема с использованиемpool.map(worker, ten_thousand_row_chunks)Это что
pool.mapпопытается поместить все блоки в очередь задач сразу же . Если для этого требуется больше памяти, чем доступно, то вы получаетеMemoryError. (Примечание:pool.imapстрадает от той же проблемы.)Поэтому вместо этого нам нужно вызвать
pool.mapитеративно, на кусочки каждого куска.import itertools as IT import multiprocessing as mp import csv def worker(chunk): return len(chunk) def main(): # num_procs is the number of workers in the pool num_procs = mp.cpu_count() # chunksize is the number of lines in a chunk chunksize = 10**5 pool = mp.Pool(num_procs) largefile = 'Counseling.csv' results = [] with open(largefile, 'rb') as f: reader = csv.reader(f) for chunk in iter(lambda: list(IT.islice(reader, chunksize*num_procs)), []): chunk = iter(chunk) pieces = list(iter(lambda: list(IT.islice(chunk, chunksize)), [])) result = pool.map(worker, pieces) results.extend(result) print(results) pool.close() pool.join() main()Каждый
chunkбудет состоять из доchunksize*num_procsстрок из файла. Это достаточные данные, чтобы дать всем работникам в пуле что-то для работы, но не слишком большие, чтобы вызвать ошибку памяти-при условии, чтоchunksizeне установлен слишком большой.Каждый
chunkзатем разбивается на части, причем каждая часть состоит из доchunksizeстроки из файла. Эти части затем отправляются вpool.map.
Как работает
iter(lambda: list(IT.islice(iterator, chunksize)), []):Это идиома для группировки итератора в блоки длины chunksize. Давайте посмотрим, как это работает на примере:
Обратите внимание, что каждый раз, когда вызываетсяIn [111]: iterator = iter(range(10))IT.islice(iterator, 3), новый кусок из 3 элементов отсекается от итератора:In [112]: list(IT.islice(iterator, 3)) Out[112]: [0, 1, 2] In [113]: list(IT.islice(iterator, 3)) Out[113]: [3, 4, 5] In [114]: list(IT.islice(iterator, 3)) Out[114]: [6, 7, 8]Когда в итераторе остается менее 3 элементов, возвращается только то, что осталось:
In [115]: list(IT.islice(iterator, 3)) Out[115]: [9]И если вы вызовете его снова, вы получите пустой список:
In [116]: list(IT.islice(iterable, 3)) Out[116]: []
lambda: list(IT.islice(iterator, chunksize))является функцией, которая возвращаетlist(IT.islice(iterator, chunksize))при вызове. Это "ОДН-вкладыш" который эквивалентноdef func(): return list(IT.islice(iterator, chunksize))Наконец,
iter(callable, sentinel)возвращает другой итератор. Значения, полученные этим итератором, являются значениями, возвращаемыми вызываемым объектом. Он продолжает выдавать значения до тех пор, пока вызываемый объект не возвращает значение, равное часовому. Итакiter(lambda: list(IT.islice(iterator, chunksize)), [])Будет продолжать возвращать значения
list(IT.islice(iterator, chunksize)), пока это значение не станет пустым списком:In [121]: iterator = iter(range(10)) In [122]: list(iter(lambda: list(IT.islice(iterator, 3)), [])) Out[122]: [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
Прежде всего itertools.groupby не будет иметь никакого реального смысла, если записи еще не отсортированы по ключевому столбцу. Более того , если вы требуете просто разбить csv-файл на заданное количество строк и передать его работнику, то вам не нужно делать все это.
Простая реализация будет:
import csv from multiprocessing import Pool def worker(chunk): print len(chunk) def emit_chunks(chunk_size, file_path): lines_count = 0 with open(file_path) as f: reader = csv.reader(f) chunk = [] for line in reader: lines_count += 1 chunk.append(line) if lines_count == chunk_size: lines_count = 0 yield chunk chunk = [] else: continue if chunk : yield chunk def main(): chunk_size = 10 gen = emit_chunks(chunk_size, 'c:/Temp/in.csv') p = Pool(5) p.imap(worker, gen) print 'Completed..'*редактирование: изменить в бассейн.IMAP вместо бассейна.карта
Comments