Как повторно использовать пул процессов для параллельного программирования в Python 3



Я новичок в парраллельном программировании. Моя задача-проанализировать сотни файлов данных. Каждая из этих данных составляет почти 300 МБ и может быть разрезана на множество кусочков. Мой компьютер-это 4-ядерный ПК. И я хочу получить результат по каждому из этих данных как можно скорее.

Анализ каждого файла данных состоит из 2 процедур. Во-первых, считывать данные в память, а затем нарезать их на кусочки, что является интенсивной работой io. Затем сделайте много вычислений для срезов этого файла, что является интенсивным процессором.

Так моя стратегия-сгруппировать эти файлы в группу 4. Для каждой группы этих файлов сначала считываются все данные 4 файлов в память с 4 процессами в 4 ядрах. Код такой:



with Pool(processes=4) as pool:
data_list = pool.map(read_and_slice, files) # len(files)==4


Затем для каждого data в data_list Выполните вычислительную работу с 4 процессами.



for data in data_list:  # I want to get the result of each data asap
with Pool(processes=4) as pool:
result_list = pool.map(compute, data.slices) # anaylyze each slice of data
analyze(result_list) # analyze the results of previous procedure, for example, get the average.


А затем переходите к другой группе.

Таким образом, проблема заключается в том, что в течение всего процесса вычисления сотен файлов пул воссоздается много раз. Как я мог избежать накладных расходов на воссоздание пулов и процессов? Является в моем коде есть какие-то существенные накладные расходы памяти? И есть ли лучший способ для меня, чтобы сделать время, необходимое как можно меньше?



Спасибо!

551   1  

1 ответ:

Одним из вариантов является, чтобы переместить with Pool заявление за пределами for петля...

p = Pool()
for data in data_list:
  result_list = pool.map(compute, data.slices)
  analyze(result_list)
p.join()
p.close()

Это работает в python 2 или 3.

Если вы установите (мой модуль) pathos, а затем выполните from pathos.pools import ProcessPool as Pool и сохраните остальную часть кода точно так же, как у вас есть-вы создадите только один Pool. Это происходит потому, что pathos кэширует Pool, и когда создается новый экземпляр Pool, имеющий ту же конфигурацию, он просто повторно использует существующий экземпляр. Вы можете сделать pool.terminate(), чтобы закрыть его.

>>> from pathos.pools import ProcessPool as Pool
>>> pool = Pool()
>>> data_list = [range(4), range(4,8), range(8,12), range(12,16)]
>>> squared = lambda x:x**2
>>> mean = lambda x: sum(x)/len(x)
>>> for data in data_list:
...   result = pool.map(squared, data)
...   print mean(result)
... 
3
31
91
183

На самом деле, pathos позволяет создавать вложенные пулы, поэтому вы также можете преобразовать цикл for в асинхронную карту (amap из pathos)... и поскольку внутренняя карта не нуждается в сохранении порядка, вы можете использовать неупорядоченный итератор карт (imap_unordered в multiprocessing или uimap из pathos). Примеры см. здесь: https://stackoverflow.com/questions/28203774/how-to-do-hierarchical-parallelism-in-ipython-parallel и здесь: https://stackoverflow.com/a/31617653/2379433

Только облом-это pathos есть python2. Но скоро (в ожидании релиза) будет полностью преобразован в python3.

Comments

    Ничего не найдено.