Показать ход выполнения вызова карты многопроцессорного пула Python?
у меня есть скрипт, который успешно выполняет набор задач многопроцессорного пула с imap_unordered() звоните:
p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
p.join() # Wait for completion
мой
num_tasks составляет около 250 000, и поэтому join() блокирует основной поток в течение 10 секунд или около того, и я хотел бы иметь возможность эхо в командной строке постепенно, чтобы показать, что основной процесс не заблокирован. Что-то вроде:p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
remaining = rs.tasks_remaining() # How many of the map call haven't been done yet?
if (remaining == 0): break # Jump out of while loop
print "Waiting for", remaining, "tasks to complete..."
time.sleep(2)
есть ли метод для объекта результата или самого пула, который указывает количество задач остальные? Я пробовал использовать multiprocessing.Value объект как счетчик (do_work называет counter.value += 1 действие после выполнения своей задачи), но счетчик получает только ~85% от общего значения, прежде чем остановить приращение.
6 ответов:
нет необходимости обращаться к частным атрибутам результирующего набора:
from __future__ import division import sys for i, _ in enumerate(p.imap_unordered(do_work, xrange(num_tasks)), 1): sys.stderr.write('\rdone {0:%}'.format(i/num_tasks))
мой личный фаворит -- дает вам хороший небольшой индикатор выполнения и завершения ETA, пока все работает и фиксируется параллельно.
from multiprocessing import Pool import tqdm pool = Pool(processes=8) for _ in tqdm.tqdm(pool.imap_unordered(do_work, tasks), total=len(tasks)): pass
нашел ответ сам с некоторыми больше копать: взглянув на
__dict__наimap_unorderedрезультат объекта, я обнаружил, что он имеет_indexатрибут, который увеличивается с каждым завершением задачи. Так что это работает для ведения журнала, завернутый вwhileпетли:p = multiprocessing.Pool() rs = p.imap_unordered(do_work, xrange(num_tasks)) p.close() # No more work while (True): completed = rs._index if (completed == num_tasks): break print "Waiting for", num_tasks-completed, "tasks to complete..." time.sleep(2)однако, я нашел, что замена
imap_unorderedнаmap_asyncпривело к гораздо более быстрому выполнению, хотя объект результата немного отличается. Вместо этого объект результата изmap_asyncесть иready()способ:p = multiprocessing.Pool() rs = p.map_async(do_work, xrange(num_tasks)) p.close() # No more work while (True): if (rs.ready()): break remaining = rs._number_left print "Waiting for", remaining, "tasks to complete..." time.sleep(0.5)
я обнаружил, что работа уже была сделана к тому времени, когда я попытался проверить ее прогресс. Это то, что сработало для меня с помощью tqdm.
pip install tqdmfrom multiprocessing import Pool from tqdm import tqdm tasks = range(5) pool = Pool() pbar = tqdm(total=len(tasks)) def do_work(x): # do something with x pbar.update(1) pool.imap_unordered(do_work, tasks) pool.close() pool.join() pbar.close()Это должно работать со всеми вариантами многопроцессорной обработки, независимо от того, блокируются они или нет.
Я знаю, что это довольно старый вопрос, но вот что я делаю, когда хочу отслеживать прогрессию пула задач в python.
from progressbar import ProgressBar, SimpleProgress import multiprocessing as mp from time import sleep def my_function(letter): sleep(2) return letter+letter dummy_args = ["A", "B", "C", "D"] pool = mp.Pool(processes=2) results = [] pbar = ProgressBar(widgets=[SimpleProgress()], maxval=len(dummy_args)).start() r = [pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args] while len(results) != len(dummy_args): pbar.update(len(results)) sleep(0.5) pbar.finish() print resultsв основном, вы используете apply_async с callbak (в этом случае он должен добавить возвращаемое значение в список), поэтому вам не нужно ждать, чтобы сделать что-то еще. Затем, в течение некоторого времени цикла, вы проверяете ход работы. В этом случае я добавил виджет, чтобы он выглядел лучше.
в вывод:
4 of 4 ['AA', 'BB', 'CC', 'DD']надеюсь, что это помогает.
Я создал пользовательский класс для создания распечатки прогресс. Мне это помогает:
from multiprocessing import Pool, cpu_count class ParallelSim(object): def __init__(self, processes=cpu_count()): self.pool = Pool(processes=processes) self.total_processes = 0 self.completed_processes = 0 self.results = [] def add(self, func, args): self.pool.apply_async(func=func, args=args, callback=self.complete) self.total_processes += 1 def complete(self, result): self.results.extend(result) self.completed_processes += 1 print('Progress: {:.2f}%'.format((self.completed_processes/self.total_processes)*100)) def run(self): self.pool.close() self.pool.join() def get_results(self): return self.results
Comments