Вложенный параллелизм в многопроцессорной системе Python



Я знаю, что это звучит как что-то, о чем спрашивали раньше, но подождите, я объясню, почему другие варианты не работают.



В настоящее время я использую multiprocessing.Pool для реализации параллелизма в приложении и хотел бы расширить его, чтобы иметь возможность использовать вложенный параллелизм. Наивный подход простой передачи Объекта Pool в качестве аргумента apply_async не работает , Как отмечалось в других ответах, потому что Pool нельзя замариновать.

Вот мои требования:





  1. Мне нужен какой-то пул, чтобы ограничить количество одновременно выполняемых задач. Например, multiprocess.Pool служит этим целям, за исключением того, что он не может быть передан другим процессам.



  2. Мне нужен вложенный параллелизм. В моем приложении мне нужно выполнить ввод-вывод, чтобы определить, что такое вложенная работа, поэтому я абсолютно не хочу делать это из одного потока. Я думаю, что это исключает все ответы на этот вопрос.



  3. Он нуждается чтобы быть в стандартной библиотеке; я не могу добавлять зависимости. Это исключает этот ответ.



  4. Я бы очень хотел, чтобы он работал как с Python 2, так и с Python 3. Однако, если бы можно было показать, что переход на Python 3 решит мою проблему, я бы рассмотрел ее.



Мне не нужно это специально для использования нескольких процессов, было бы нормально использовать потоки, потому что большая часть работы-это ввод-вывод или ожидание завершения подпроцессов.



Я пробовал использовать multiprocessing.dummy, это тот же интерфейс, но реализованный поверх threading. Однако, когда я пытаюсь вызвать get() для получения результатов моих тестов, я получаю следующую ошибку, поэтому я думаю, что это не так.



  File "/usr/lib/python2.7/multiprocessing/pool.py", line 567, in get
raise self._value
ValueError: signal only works in main thread


Я знаю о библиотеке concurrent.futures в Python 3, но это, кажется, имеет некоторые серьезные ограничения. Например, второй пример в этом разделе, по-видимому, является пробкой шоу в моем дело:



Https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor



Я не вижу, как вы могли бы избежать этого с помощью любого прямо написанного вложенного параллельного алгоритма. Поэтому, даже если бы я был готов использовать Python 3, я думаю, что это не стартер.



Я не знаю о каких-либо других вариантах, доступных в стандартной библиотеке, без написания моей собственной реализации.

573   1  

1 ответ:

Вы, кажется, исключили это, но я подозреваю, что https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor , или https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor будет работать, если вы можете перейти на Python 3 или добавить зависимость для Python 2.

Если дополнительная работа из каждого файла не должна запускаться до тех пор, пока этот файл не будет обработан, у вас может быть один координирующий поток, который запускает все остальные, и так далее. взаимоблокировка может быть предотвращена, как в приведенном ниже примере.

from concurrent.futures import ThreadPoolExecutor
import time

pool = ThreadPoolExecutor(max_workers=3)

def find_work_inputs(dummy_file):
    print("{}: Finding work...".format(dummy_file))
    time.sleep(1)
    work = range(0, dummy_file)
    print("{}: Work is {}".format(dummy_file, work))
    return work

def do_work(dummy_file, work_input):
    print("{}: {}".format(dummy_file, work_input))
    print("{}: Doing work {}...".format(dummy_file, work_input))
    time.sleep(1)
    return work_input * work_input

dummy_files = [1,2,3,4,5]

futures = []
for dummy_file in dummy_files:
    work_inputs = pool.submit(find_work_inputs, dummy_file)
    for work_input in work_inputs.result():
        result = work_input
        futures.append((dummy_file, result, pool.submit(do_work, dummy_file, result)))

for dummy_file, work_input, future in futures:
    print("Result from file:{} input:{} is {}".format(dummy_file, work_input, future.result()))
В качестве альтернативы, если каждый поток на первом уровне должен инициировать работу самостоятельно, дополнительная работа может потребоваться в другом пуле, чтобы предотвратить взаимоблокировку (в зависимости от того, когда result() вызывается в каждом будущем), как показано ниже.
from concurrent.futures import ThreadPoolExecutor
import time

find_work_pool = ThreadPoolExecutor(max_workers=3)
do_work_pool = ThreadPoolExecutor(max_workers=3)

def find_work_inputs(dummy_file):
    print("{}: Finding work...".format(dummy_file))
    time.sleep(1)
    work = range(0, dummy_file)
    print("{}: Work is {}".format(dummy_file, work))

    futures = []
    for work_input in work:
        futures.append((dummy_file, work_input, do_work_pool.submit(do_work, dummy_file, work_input)))
    return futures

def do_work(dummy_file, work_input):
    print("{}: {}".format(dummy_file, work_input))
    print("{}: Doing work {}...".format(dummy_file, work_input))
    time.sleep(1)
    return work_input * work_input

dummy_files = [1,2,3,4,5]

futures = []
for dummy_file in dummy_files:
    futures.extend(find_work_pool.submit(find_work_inputs, dummy_file).result())

for dummy_file, work_input, future in futures:
    print("Result from file:{} input:{} is {}".format(dummy_file, work_input, future.result()))

Comments

    Ничего не найдено.