В Python многопроцессорных бассейн.карта для нескольких аргументов
в многопроцессорной библиотеке Python есть вариант пула.карта, которая поддерживает несколько аргументов?
text = "test"
def harvester(text, case):
X = case[0]
text+ str(X)
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=6)
case = RAW_DATASET
pool.map(harvester(text,case),case, 1)
pool.close()
pool.join()
15 ответов:
ответ на это зависит от версии и ситуации. Самый общий ответ для последних версий Python (начиная с 3.3) был впервые описан ниже J. F. Sebastian.1 использует
Pool.starmapметод, который принимает последовательность кортежей аргументов. Затем он автоматически распаковывает аргументы из каждого кортежа и передает их в заданную функцию:import multiprocessing from itertools import product def merge_names(a, b): return '{} & {}'.format(a, b) if __name__ == '__main__': names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie'] with multiprocessing.Pool(processes=3) as pool: results = pool.starmap(merge_names, product(names, repeat=2)) print(results) # Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...для более ранних версий Python вам нужно будет написать помощника функция для распаковки аргументов явно. Если вы хотите использовать
with, вам также нужно написать обертку, чтобы повернутьPoolв контексте менеджера. (Спасибо мюонных для указания на это.)import multiprocessing from itertools import product from contextlib import contextmanager def merge_names(a, b): return '{} & {}'.format(a, b) def merge_names_unpack(args): return merge_names(*args) @contextmanager def poolcontext(*args, **kwargs): pool = multiprocessing.Pool(*args, **kwargs) yield pool pool.terminate() if __name__ == '__main__': names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie'] with poolcontext(processes=3) as pool: results = pool.map(merge_names_unpack, product(names, repeat=2)) print(results) # Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...в более простых случаях, с фиксированным вторым аргументом, вы также можете использовать
partial, но только в Python 2.7+.import multiprocessing from functools import partial from contextlib import contextmanager @contextmanager def poolcontext(*args, **kwargs): pool = multiprocessing.Pool(*args, **kwargs) yield pool pool.terminate() def merge_names(a, b): return '{} & {}'.format(a, b) if __name__ == '__main__': names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie'] with poolcontext(processes=3) as pool: results = pool.map(partial(merge_names, b='Sons'), names) print(results) # Output: ['Brown & Sons', 'Wilson & Sons', 'Bartlett & Sons', ...1. Многое из этого было вдохновлено его ответом, который, вероятно, должен был быть принят вместо этого. Но так как этот застрял на в верхней части, казалось, лучше всего улучшить его для будущих читателей.
есть ли вариант пула.карта, которая поддерживает несколько аргументов?
в Python 3.3 включает в себя
pool.starmap()метод:#!/usr/bin/env python3 from functools import partial from itertools import repeat from multiprocessing import Pool, freeze_support def func(a, b): return a + b def main(): a_args = [1,2,3] second_arg = 1 with Pool() as pool: L = pool.starmap(func, [(1, 1), (2, 1), (3, 1)]) M = pool.starmap(func, zip(a_args, repeat(second_arg))) N = pool.map(partial(func, b=second_arg), a_args) assert L == M == N if __name__=="__main__": freeze_support() main()для более старых версий:
#!/usr/bin/env python2 import itertools from multiprocessing import Pool, freeze_support def func(a, b): print a, b def func_star(a_b): """Convert `f([1,2])` to `f(1,2)` call.""" return func(*a_b) def main(): pool = Pool() a_args = [1,2,3] second_arg = 1 pool.map(func_star, itertools.izip(a_args, itertools.repeat(second_arg))) if __name__=="__main__": freeze_support() main()выход
1 1 2 1 3 1обратите внимание, как
itertools.izip()иitertools.repeat()используются здесь.из-за ошибка, упомянутая @unutbu вы не можете использовать
functools.partial()или похожие возможности на Python 2.6, поэтому простая функция-оболочкаfunc_star()должно быть определено явно. Смотрите также решениепредложилuptimebox.
Я думаю, что ниже будет лучше
def multi_run_wrapper(args): return add(*args) def add(x,y): return x+y if __name__ == "__main__": from multiprocessing import Pool pool = Pool(4) results = pool.map(multi_run_wrapper,[(1,2),(2,3),(3,4)]) print resultsвыход
[3, 5, 7]
используя Python 3.3+ С
pool.starmap():from multiprocessing.dummy import Pool as ThreadPool def write(i, x): print(i, "---", x) a = ["1","2","3"] b = ["4","5","6"] pool = ThreadPool(2) pool.starmap(write, zip(a,b)) pool.close() pool.join()результат:
1 --- 4 2 --- 5 3 --- 6вы также можете zip() больше аргументов, если вам нравится:
zip(a,b,c,d,e)если вы хотите, чтобы постоянное значение передавалось в качестве аргумента, вы должны использовать
import itertoolsа тоzip(itertools.repeat(constant), a)например.
узнав об itertools в J. F. Sebastian ответ я решил сделать еще один шаг и написать
parmapпакет, который заботится о распараллеливании, предлагаяmapиstarmapфункции на python-2.7 и python-3.2 (и позже также), которые могут принимать любое число позиционных аргументов.установка
pip install parmapкак распараллелить:
import parmap # If you want to do: y = [myfunction(x, argument1, argument2) for x in mylist] # In parallel: y = parmap.map(myfunction, mylist, argument1, argument2) # If you want to do: z = [myfunction(x, y, argument1, argument2) for (x,y) in mylist] # In parallel: z = parmap.starmap(myfunction, mylist, argument1, argument2) # If you want to do: listx = [1, 2, 3, 4, 5, 6] listy = [2, 3, 4, 5, 6, 7] param = 3.14 param2 = 42 listz = [] for (x, y) in zip(listx, listy): listz.append(myfunction(x, y, param1, param2)) # In parallel: listz = parmap.starmap(myfunction, zip(listx, listy), param1, param2)Я загрузил parmap в PyPI и в a репозиторий github.
в качестве примера на вопрос можно ответить следующим образом:
import parmap def harvester(case, text): X = case[0] text+ str(X) if __name__ == "__main__": case = RAW_DATASET # assuming this is an iterable parmap.map(harvester, case, "test", chunksize=1)
там вилка
multiprocessingпод названием пафос (Примечание: используйте версию на GitHub), что не нужноstarmap-- функции map отражают API для карты python, поэтому map может принимать несколько аргументов. Сpathos, вы также можете сделать многопроцессорную обработку в интерпретаторе, вместо того, чтобы застрять в__main__блок. Пафос должен быть выпущен после некоторого мягкого обновления - в основном преобразования в python 3.x.Python 2.7.5 (default, Sep 30 2013, 20:15:49) [GCC 4.2.1 (Apple Inc. build 5566)] on darwin Type "help", "copyright", "credits" or "license" for more information. >>> def func(a,b): ... print a,b ... >>> >>> from pathos.multiprocessing import ProcessingPool >>> pool = ProcessingPool(nodes=4) >>> pool.map(func, [1,2,3], [1,1,1]) 1 1 2 1 3 1 [None, None, None] >>> >>> # also can pickle stuff like lambdas >>> result = pool.map(lambda x: x**2, range(10)) >>> result [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] >>> >>> # also does asynchronous map >>> result = pool.amap(pow, [1,2,3], [4,5,6]) >>> result.get() [1, 32, 729] >>> >>> # or can return a map iterator >>> result = pool.imap(pow, [1,2,3], [4,5,6]) >>> result <processing.pool.IMapIterator object at 0x110c2ffd0> >>> list(result) [1, 32, 729]
вы можете использовать следующие две функции, чтобы избежать написания обертки для каждой новой функции:
import itertools from multiprocessing import Pool def universal_worker(input_pair): function, args = input_pair return function(*args) def pool_args(function, *args): return zip(itertools.repeat(function), zip(*args))использование функции
functionсо списками аргументовarg_0,arg_1иarg_2следующим образом:pool = Pool(n_core) list_model = pool.map(universal_worker, pool_args(function, arg_0, arg_1, arg_2) pool.close() pool.join()
лучший способ-это использование декоратор вместо того, чтобы писать функции-оболочки вручную. Особенно, когда у вас есть много функций для отображения, декоратор сэкономит ваше время, избегая написания обертки для каждой функции. Обычно украшенная функция не является picklable, однако мы можем использовать
functoolsчтобы обойти его. Больше дисскусий можно найти здесь.вот пример
def unpack_args(func): from functools import wraps @wraps(func) def wrapper(args): if isinstance(args, dict): return func(**args) else: return func(*args) return wrapper @unpack_args def func(x, y): return x + yтогда вы можете сопоставить его с молнией аргументы
np, xlist, ylist = 2, range(10), range(10) pool = Pool(np) res = pool.map(func, zip(xlist, ylist)) pool.close() pool.join()конечно, вы всегда можете использовать
Pool.starmapв Python 3 (>=3.3), Как упоминалось в других ответах.
другой простой альтернативой является обернуть параметры функции в кортеж, а затем обернуть параметры, которые должны быть переданы в кортежах, а также. Это, возможно, не идеально при работе с большими кусками данных. Я считаю, что он будет делать копии для каждого кортежа.
from multiprocessing import Pool def f((a,b,c,d)): print a,b,c,d return a + b + c +d if __name__ == '__main__': p = Pool(10) data = [(i+0,i+1,i+2,i+3) for i in xrange(10)] print(p.map(f, data)) p.close() p.join()дает выход в некотором произвольном порядке:
0 1 2 3 1 2 3 4 2 3 4 5 3 4 5 6 4 5 6 7 5 6 7 8 7 8 9 10 6 7 8 9 8 9 10 11 9 10 11 12 [6, 10, 14, 18, 22, 26, 30, 34, 38, 42]
лучшее решение для python2:
from multiprocessing import Pool def func((i, (a, b))): print i, a, b return a + b pool = Pool(3) pool.map(func, [(0,(1,2)), (1,(2,3)), (2,(3, 4))])2 3 4
1 2 3
0 1 2
out []:
[3, 5, 7]
другой способ-передать список списков в процедуру с одним аргументом:
import os from multiprocessing import Pool def task(args): print "PID =", os.getpid(), ", arg1 =", args[0], ", arg2 =", args[1] pool = Pool() pool.map(task, [ [1,2], [3,4], [5,6], [7,8] ])можно построить список списков аргументов с помощью своего любимого метода.
из python 3.4.4, вы можете использовать многопроцессорную обработку.get_context () чтобы получить объект контекста для использования нескольких методов запуска:
import multiprocessing as mp def foo(q, h, w): q.put(h + ' ' + w) print(h + ' ' + w) if __name__ == '__main__': ctx = mp.get_context('spawn') q = ctx.Queue() p = ctx.Process(target=foo, args=(q,'hello', 'world')) p.start() print(q.get()) p.join()или вы просто заменить
pool.map(harvester(text,case),case, 1)by:
pool.apply_async(harvester(text,case),case, 1)
# "Как принять несколько аргументов".
def f1(args): a, b, c = args[0] , args[1] , args[2] return a+b+c if __name__ == "__main__": import multiprocessing pool = multiprocessing.Pool(4) result1 = pool.map(f1, [ [1,2,3] ]) print(result1)
в официальной документации указано, что он поддерживает только один аргумент типа Iterable. Мне нравится использовать apply_async в таких случаях. В вашем случае я бы сделал:
from multiprocessing import Process, Pool, Manager text = "test" def harvester(text, case, q = None): X = case[0] res = text+ str(X) if q: q.put(res) return res def block_until(q, results_queue, until_counter=0): i = 0 while i < until_counter: results_queue.put(q.get()) i+=1 if __name__ == '__main__': pool = multiprocessing.Pool(processes=6) case = RAW_DATASET m = Manager() q = m.Queue() results_queue = m.Queue() # when it completes results will reside in this queue blocking_process = Process(block_until, (q, results_queue, len(case))) blocking_process.start() for c in case: try: res = pool.apply_async(harvester, (text, case, q = None)) res.get(timeout=0.1) except: pass blocking_process.join()
для python2, вы можете использовать этот трюк
def fun(a,b): return a+b pool = multiprocessing.Pool(processes=6) b=233 pool.map(lambda x:fun(x,b),range(1000))
Comments