Python Chunking CSV файл мультипроцессорная обработка



Я использую следующий код для разбиения CSV-файла на несколько блоков (источник: здесь)



def worker(chunk):
print len(chunk)

def keyfunc(row):
return row[0]

def main():
pool = mp.Pool()
largefile = 'Counseling.csv'
num_chunks = 10
start_time = time.time()
results = []
with open(largefile) as f:
reader = csv.reader(f)
reader.next()
chunks = itertools.groupby(reader, keyfunc)
while True:
# make a list of num_chunks chunks
groups = [list(chunk) for key, chunk in
itertools.islice(chunks, num_chunks)]
if groups:
result = pool.map(worker, groups)
results.extend(result)
else:
break
pool.close()
pool.join()


Тем не менее, кажется, что количество блоков всегда остается постоянным независимо от количества блоков, которые я выбираю для использования. Например, независимо от того, выбираю ли я 1 или 10 блоков, я всегда получаю этот вывод при обработке файла образца. В идеале я хотел бы разделить файл так, чтобы он был равномерно распределен.

Обратите внимание, что реальный файл, который я загружаю, составляет более 13 миллионов строки длинные, поэтому я обрабатываю их по частям. Это просто необходимо!



6
7
1
...
1
1
94
--- 0.101687192917 seconds ---
668   2  

2 ответов:

За Комментарии , мы хотим, чтобы каждый процесс работал на отрезке в 10000 строк. Это не так уж трудно сделать. чтобы сделать; смотрите рецепт iter/islice ниже. Однако проблема с использованием

pool.map(worker, ten_thousand_row_chunks)

Это что pool.map попытается поместить все блоки в очередь задач сразу же . Если для этого требуется больше памяти, чем доступно, то вы получаете MemoryError. (Примечание: pool.imap страдает от той же проблемы.)

Поэтому вместо этого нам нужно вызвать pool.map итеративно, на кусочки каждого куска.

import itertools as IT
import multiprocessing as mp
import csv

def worker(chunk):
    return len(chunk)

def main():
    # num_procs is the number of workers in the pool
    num_procs = mp.cpu_count()
    # chunksize is the number of lines in a chunk
    chunksize = 10**5

    pool = mp.Pool(num_procs)
    largefile = 'Counseling.csv'
    results = []
    with open(largefile, 'rb') as f:
        reader = csv.reader(f)
        for chunk in iter(lambda: list(IT.islice(reader, chunksize*num_procs)), []):
            chunk = iter(chunk)
            pieces = list(iter(lambda: list(IT.islice(chunk, chunksize)), []))
            result = pool.map(worker, pieces)
            results.extend(result)
    print(results)
    pool.close()
    pool.join()

main()

Каждый chunk будет состоять из до chunksize*num_procs строк из файла. Это достаточные данные, чтобы дать всем работникам в пуле что-то для работы, но не слишком большие, чтобы вызвать ошибку памяти-при условии, что chunksize не установлен слишком большой.

Каждый chunk затем разбивается на части, причем каждая часть состоит из до chunksize строки из файла. Эти части затем отправляются в pool.map.


Как работает iter(lambda: list(IT.islice(iterator, chunksize)), []) :

Это идиома для группировки итератора в блоки длины chunksize. Давайте посмотрим, как это работает на примере:

In [111]: iterator = iter(range(10))
Обратите внимание, что каждый раз, когда вызывается IT.islice(iterator, 3), новый кусок из 3 элементов отсекается от итератора:
In [112]: list(IT.islice(iterator, 3))
Out[112]: [0, 1, 2]

In [113]: list(IT.islice(iterator, 3))
Out[113]: [3, 4, 5]

In [114]: list(IT.islice(iterator, 3))
Out[114]: [6, 7, 8]

Когда в итераторе остается менее 3 элементов, возвращается только то, что осталось:

In [115]: list(IT.islice(iterator, 3))
Out[115]: [9]

И если вы вызовете его снова, вы получите пустой список:

In [116]: list(IT.islice(iterable, 3))
Out[116]: []

lambda: list(IT.islice(iterator, chunksize)) является функцией, которая возвращает list(IT.islice(iterator, chunksize)) при вызове. Это "ОДН-вкладыш" который эквивалентно

def func():
    return  list(IT.islice(iterator, chunksize))

Наконец, iter(callable, sentinel) возвращает другой итератор. Значения, полученные этим итератором, являются значениями, возвращаемыми вызываемым объектом. Он продолжает выдавать значения до тех пор, пока вызываемый объект не возвращает значение, равное часовому. Итак

iter(lambda: list(IT.islice(iterator, chunksize)), [])

Будет продолжать возвращать значения list(IT.islice(iterator, chunksize)), пока это значение не станет пустым списком:

In [121]: iterator = iter(range(10))

In [122]: list(iter(lambda: list(IT.islice(iterator, 3)), []))
Out[122]: [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]

Прежде всего itertools.groupby не будет иметь никакого реального смысла, если записи еще не отсортированы по ключевому столбцу. Более того , если вы требуете просто разбить csv-файл на заданное количество строк и передать его работнику, то вам не нужно делать все это.

Простая реализация будет:

import csv
from multiprocessing import Pool


def worker(chunk):
    print len(chunk)

def emit_chunks(chunk_size, file_path):
    lines_count = 0
    with open(file_path) as f:
        reader = csv.reader(f)
        chunk = []
        for line in reader:
            lines_count += 1
            chunk.append(line)
            if lines_count == chunk_size:
                lines_count = 0
                yield chunk
                chunk = []
            else:
                continue
        if chunk : yield chunk

def main():
    chunk_size = 10
    gen = emit_chunks(chunk_size, 'c:/Temp/in.csv')
    p = Pool(5)
    p.imap(worker, gen)
    print 'Completed..'

*редактирование: изменить в бассейн.IMAP вместо бассейна.карта

Comments

    Ничего не найдено.