Показать ход выполнения вызова карты многопроцессорного пула Python?



у меня есть скрипт, который успешно выполняет набор задач многопроцессорного пула с imap_unordered() звоните:



p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
p.join() # Wait for completion


мой num_tasks составляет около 250 000, и поэтому join() блокирует основной поток в течение 10 секунд или около того, и я хотел бы иметь возможность эхо в командной строке постепенно, чтобы показать, что основной процесс не заблокирован. Что-то вроде:

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
remaining = rs.tasks_remaining() # How many of the map call haven't been done yet?
if (remaining == 0): break # Jump out of while loop
print "Waiting for", remaining, "tasks to complete..."
time.sleep(2)


есть ли метод для объекта результата или самого пула, который указывает количество задач остальные? Я пробовал использовать multiprocessing.Value объект как счетчик (do_work называет counter.value += 1 действие после выполнения своей задачи), но счетчик получает только ~85% от общего значения, прежде чем остановить приращение.

462   6  

6 ответов:

нет необходимости обращаться к частным атрибутам результирующего набора:

from __future__ import division
import sys

for i, _ in enumerate(p.imap_unordered(do_work, xrange(num_tasks)), 1):
    sys.stderr.write('\rdone {0:%}'.format(i/num_tasks))

мой личный фаворит -- дает вам хороший небольшой индикатор выполнения и завершения ETA, пока все работает и фиксируется параллельно.

from multiprocessing import Pool
import tqdm

pool = Pool(processes=8)
for _ in tqdm.tqdm(pool.imap_unordered(do_work, tasks), total=len(tasks)):
    pass

нашел ответ сам с некоторыми больше копать: взглянув на __dict__ на imap_unordered результат объекта, я обнаружил, что он имеет _index атрибут, который увеличивается с каждым завершением задачи. Так что это работает для ведения журнала, завернутый в while петли:

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  completed = rs._index
  if (completed == num_tasks): break
  print "Waiting for", num_tasks-completed, "tasks to complete..."
  time.sleep(2)

однако, я нашел, что замена imap_unordered на map_async привело к гораздо более быстрому выполнению, хотя объект результата немного отличается. Вместо этого объект результата из map_async есть и ready() способ:

p = multiprocessing.Pool()
rs = p.map_async(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  if (rs.ready()): break
  remaining = rs._number_left
  print "Waiting for", remaining, "tasks to complete..."
  time.sleep(0.5)

я обнаружил, что работа уже была сделана к тому времени, когда я попытался проверить ее прогресс. Это то, что сработало для меня с помощью tqdm.

pip install tqdm

from multiprocessing import Pool
from tqdm import tqdm

tasks = range(5)
pool = Pool()
pbar = tqdm(total=len(tasks))

def do_work(x):
    # do something with x
    pbar.update(1)

pool.imap_unordered(do_work, tasks)
pool.close()
pool.join()
pbar.close()

Это должно работать со всеми вариантами многопроцессорной обработки, независимо от того, блокируются они или нет.

Я знаю, что это довольно старый вопрос, но вот что я делаю, когда хочу отслеживать прогрессию пула задач в python.

from progressbar import ProgressBar, SimpleProgress
import multiprocessing as mp
from time import sleep

def my_function(letter):
    sleep(2)
    return letter+letter

dummy_args = ["A", "B", "C", "D"]
pool = mp.Pool(processes=2)

results = []

pbar = ProgressBar(widgets=[SimpleProgress()], maxval=len(dummy_args)).start()

r = [pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args]

while len(results) != len(dummy_args):
    pbar.update(len(results))
    sleep(0.5)
pbar.finish()

print results

в основном, вы используете apply_async с callbak (в этом случае он должен добавить возвращаемое значение в список), поэтому вам не нужно ждать, чтобы сделать что-то еще. Затем, в течение некоторого времени цикла, вы проверяете ход работы. В этом случае я добавил виджет, чтобы он выглядел лучше.

в вывод:

4 of 4                                                                         
['AA', 'BB', 'CC', 'DD']

надеюсь, что это помогает.

Я создал пользовательский класс для создания распечатки прогресс. Мне это помогает:

from multiprocessing import Pool, cpu_count


class ParallelSim(object):
    def __init__(self, processes=cpu_count()):
        self.pool = Pool(processes=processes)
        self.total_processes = 0
        self.completed_processes = 0
        self.results = []

    def add(self, func, args):
        self.pool.apply_async(func=func, args=args, callback=self.complete)
        self.total_processes += 1

    def complete(self, result):
        self.results.extend(result)
        self.completed_processes += 1
        print('Progress: {:.2f}%'.format((self.completed_processes/self.total_processes)*100))

    def run(self):
        self.pool.close()
        self.pool.join()

    def get_results(self):
        return self.results

Comments

    Ничего не найдено.