Вложенный параллелизм в многопроцессорной системе Python
Я знаю, что это звучит как что-то, о чем спрашивали раньше, но подождите, я объясню, почему другие варианты не работают.
В настоящее время я использую
multiprocessing.Pool для реализации параллелизма в приложении и хотел бы расширить его, чтобы иметь возможность использовать вложенный параллелизм. Наивный подход простой передачи Объекта Pool в качестве аргумента apply_async не работает , Как отмечалось в других ответах, потому что Pool нельзя замариновать.Вот мои требования:
Мне нужен какой-то пул, чтобы ограничить количество одновременно выполняемых задач. Например,
multiprocess.Poolслужит этим целям, за исключением того, что он не может быть передан другим процессам.
Мне нужен вложенный параллелизм. В моем приложении мне нужно выполнить ввод-вывод, чтобы определить, что такое вложенная работа, поэтому я абсолютно не хочу делать это из одного потока. Я думаю, что это исключает все ответы на этот вопрос.
Он нуждается чтобы быть в стандартной библиотеке; я не могу добавлять зависимости. Это исключает этот ответ.
Я бы очень хотел, чтобы он работал как с Python 2, так и с Python 3. Однако, если бы можно было показать, что переход на Python 3 решит мою проблему, я бы рассмотрел ее.
Мне не нужно это специально для использования нескольких процессов, было бы нормально использовать потоки, потому что большая часть работы-это ввод-вывод или ожидание завершения подпроцессов.
Я пробовал использовать multiprocessing.dummy, это тот же интерфейс, но реализованный поверх threading. Однако, когда я пытаюсь вызвать get() для получения результатов моих тестов, я получаю следующую ошибку, поэтому я думаю, что это не так.
File "/usr/lib/python2.7/multiprocessing/pool.py", line 567, in get
raise self._value
ValueError: signal only works in main thread
Я знаю о библиотеке concurrent.futures в Python 3, но это, кажется, имеет некоторые серьезные ограничения. Например, второй пример в этом разделе, по-видимому, является пробкой шоу в моем дело:
Https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor
Я не вижу, как вы могли бы избежать этого с помощью любого прямо написанного вложенного параллельного алгоритма. Поэтому, даже если бы я был готов использовать Python 3, я думаю, что это не стартер.
Я не знаю о каких-либо других вариантах, доступных в стандартной библиотеке, без написания моей собственной реализации.
1 ответ:
Вы, кажется, исключили это, но я подозреваю, что https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor , или https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor будет работать, если вы можете перейти на Python 3 или добавить зависимость для Python 2.
Если дополнительная работа из каждого файла не должна запускаться до тех пор, пока этот файл не будет обработан, у вас может быть один координирующий поток, который запускает все остальные, и так далее. взаимоблокировка может быть предотвращена, как в приведенном ниже примере.
В качестве альтернативы, если каждый поток на первом уровне должен инициировать работу самостоятельно, дополнительная работа может потребоваться в другом пуле, чтобы предотвратить взаимоблокировку (в зависимости от того, когдаfrom concurrent.futures import ThreadPoolExecutor import time pool = ThreadPoolExecutor(max_workers=3) def find_work_inputs(dummy_file): print("{}: Finding work...".format(dummy_file)) time.sleep(1) work = range(0, dummy_file) print("{}: Work is {}".format(dummy_file, work)) return work def do_work(dummy_file, work_input): print("{}: {}".format(dummy_file, work_input)) print("{}: Doing work {}...".format(dummy_file, work_input)) time.sleep(1) return work_input * work_input dummy_files = [1,2,3,4,5] futures = [] for dummy_file in dummy_files: work_inputs = pool.submit(find_work_inputs, dummy_file) for work_input in work_inputs.result(): result = work_input futures.append((dummy_file, result, pool.submit(do_work, dummy_file, result))) for dummy_file, work_input, future in futures: print("Result from file:{} input:{} is {}".format(dummy_file, work_input, future.result()))result()вызывается в каждом будущем), как показано ниже.from concurrent.futures import ThreadPoolExecutor import time find_work_pool = ThreadPoolExecutor(max_workers=3) do_work_pool = ThreadPoolExecutor(max_workers=3) def find_work_inputs(dummy_file): print("{}: Finding work...".format(dummy_file)) time.sleep(1) work = range(0, dummy_file) print("{}: Work is {}".format(dummy_file, work)) futures = [] for work_input in work: futures.append((dummy_file, work_input, do_work_pool.submit(do_work, dummy_file, work_input))) return futures def do_work(dummy_file, work_input): print("{}: {}".format(dummy_file, work_input)) print("{}: Doing work {}...".format(dummy_file, work_input)) time.sleep(1) return work_input * work_input dummy_files = [1,2,3,4,5] futures = [] for dummy_file in dummy_files: futures.extend(find_work_pool.submit(find_work_inputs, dummy_file).result()) for dummy_file, work_input, future in futures: print("Result from file:{} input:{} is {}".format(dummy_file, work_input, future.result()))
Comments