В Python многопроцессорных бассейн.карта для нескольких аргументов



в многопроцессорной библиотеке Python есть вариант пула.карта, которая поддерживает несколько аргументов?



text = "test"
def harvester(text, case):
X = case[0]
text+ str(X)

if __name__ == '__main__':
pool = multiprocessing.Pool(processes=6)
case = RAW_DATASET
pool.map(harvester(text,case),case, 1)
pool.close()
pool.join()
500   15  

15 ответов:

ответ на это зависит от версии и ситуации. Самый общий ответ для последних версий Python (начиная с 3.3) был впервые описан ниже J. F. Sebastian.1 использует Pool.starmap метод, который принимает последовательность кортежей аргументов. Затем он автоматически распаковывает аргументы из каждого кортежа и передает их в заданную функцию:

import multiprocessing
from itertools import product

def merge_names(a, b):
    return '{} & {}'.format(a, b)

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with multiprocessing.Pool(processes=3) as pool:
        results = pool.starmap(merge_names, product(names, repeat=2))
    print(results)

# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...

для более ранних версий Python вам нужно будет написать помощника функция для распаковки аргументов явно. Если вы хотите использовать with, вам также нужно написать обертку, чтобы повернуть Pool в контексте менеджера. (Спасибо мюонных для указания на это.)

import multiprocessing
from itertools import product
from contextlib import contextmanager

def merge_names(a, b):
    return '{} & {}'.format(a, b)

def merge_names_unpack(args):
    return merge_names(*args)

@contextmanager
def poolcontext(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    yield pool
    pool.terminate()

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with poolcontext(processes=3) as pool:
        results = pool.map(merge_names_unpack, product(names, repeat=2))
    print(results)

# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...

в более простых случаях, с фиксированным вторым аргументом, вы также можете использовать partial, но только в Python 2.7+.

import multiprocessing
from functools import partial
from contextlib import contextmanager

@contextmanager
def poolcontext(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    yield pool
    pool.terminate()

def merge_names(a, b):
    return '{} & {}'.format(a, b)

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with poolcontext(processes=3) as pool:
        results = pool.map(partial(merge_names, b='Sons'), names)
    print(results)

# Output: ['Brown & Sons', 'Wilson & Sons', 'Bartlett & Sons', ...

1. Многое из этого было вдохновлено его ответом, который, вероятно, должен был быть принят вместо этого. Но так как этот застрял на в верхней части, казалось, лучше всего улучшить его для будущих читателей.

есть ли вариант пула.карта, которая поддерживает несколько аргументов?

в Python 3.3 включает в себя pool.starmap() метод:

#!/usr/bin/env python3
from functools import partial
from itertools import repeat
from multiprocessing import Pool, freeze_support

def func(a, b):
    return a + b

def main():
    a_args = [1,2,3]
    second_arg = 1
    with Pool() as pool:
        L = pool.starmap(func, [(1, 1), (2, 1), (3, 1)])
        M = pool.starmap(func, zip(a_args, repeat(second_arg)))
        N = pool.map(partial(func, b=second_arg), a_args)
        assert L == M == N

if __name__=="__main__":
    freeze_support()
    main()

для более старых версий:

#!/usr/bin/env python2
import itertools
from multiprocessing import Pool, freeze_support

def func(a, b):
    print a, b

def func_star(a_b):
    """Convert `f([1,2])` to `f(1,2)` call."""
    return func(*a_b)

def main():
    pool = Pool()
    a_args = [1,2,3]
    second_arg = 1
    pool.map(func_star, itertools.izip(a_args, itertools.repeat(second_arg)))

if __name__=="__main__":
    freeze_support()
    main()

выход

1 1
2 1
3 1

обратите внимание, как itertools.izip() и itertools.repeat() используются здесь.

из-за ошибка, упомянутая @unutbu вы не можете использовать functools.partial() или похожие возможности на Python 2.6, поэтому простая функция-оболочка func_star() должно быть определено явно. Смотрите также решениепредложил uptimebox.

Я думаю, что ниже будет лучше

def multi_run_wrapper(args):
   return add(*args)
def add(x,y):
    return x+y
if __name__ == "__main__":
    from multiprocessing import Pool
    pool = Pool(4)
    results = pool.map(multi_run_wrapper,[(1,2),(2,3),(3,4)])
    print results

выход

[3, 5, 7]

используя Python 3.3+ С pool.starmap():

from multiprocessing.dummy import Pool as ThreadPool 

def write(i, x):
    print(i, "---", x)

a = ["1","2","3"]
b = ["4","5","6"] 

pool = ThreadPool(2)
pool.starmap(write, zip(a,b)) 
pool.close() 
pool.join()

результат:

1 --- 4
2 --- 5
3 --- 6

вы также можете zip() больше аргументов, если вам нравится: zip(a,b,c,d,e)

если вы хотите, чтобы постоянное значение передавалось в качестве аргумента, вы должны использовать import itertools а то zip(itertools.repeat(constant), a) например.

узнав об itertools в J. F. Sebastian ответ я решил сделать еще один шаг и написать parmap пакет, который заботится о распараллеливании, предлагая map и starmap функции на python-2.7 и python-3.2 (и позже также), которые могут принимать любое число позиционных аргументов.

установка

pip install parmap

как распараллелить:

import parmap
# If you want to do:
y = [myfunction(x, argument1, argument2) for x in mylist]
# In parallel:
y = parmap.map(myfunction, mylist, argument1, argument2)

# If you want to do:
z = [myfunction(x, y, argument1, argument2) for (x,y) in mylist]
# In parallel:
z = parmap.starmap(myfunction, mylist, argument1, argument2)

# If you want to do:
listx = [1, 2, 3, 4, 5, 6]
listy = [2, 3, 4, 5, 6, 7]
param = 3.14
param2 = 42
listz = []
for (x, y) in zip(listx, listy):
        listz.append(myfunction(x, y, param1, param2))
# In parallel:
listz = parmap.starmap(myfunction, zip(listx, listy), param1, param2)

Я загрузил parmap в PyPI и в a репозиторий github.

в качестве примера на вопрос можно ответить следующим образом:

import parmap

def harvester(case, text):
    X = case[0]
    text+ str(X)

if __name__ == "__main__":
    case = RAW_DATASET  # assuming this is an iterable
    parmap.map(harvester, case, "test", chunksize=1)

там вилка multiprocessing под названием пафос (Примечание: используйте версию на GitHub), что не нужно starmap -- функции map отражают API для карты python, поэтому map может принимать несколько аргументов. С pathos, вы также можете сделать многопроцессорную обработку в интерпретаторе, вместо того, чтобы застрять в __main__ блок. Пафос должен быть выпущен после некоторого мягкого обновления - в основном преобразования в python 3.x.

  Python 2.7.5 (default, Sep 30 2013, 20:15:49) 
  [GCC 4.2.1 (Apple Inc. build 5566)] on darwin
  Type "help", "copyright", "credits" or "license" for more information.
  >>> def func(a,b):
  ...     print a,b
  ...
  >>>
  >>> from pathos.multiprocessing import ProcessingPool    
  >>> pool = ProcessingPool(nodes=4)
  >>> pool.map(func, [1,2,3], [1,1,1])
  1 1
  2 1
  3 1
  [None, None, None]
  >>>
  >>> # also can pickle stuff like lambdas 
  >>> result = pool.map(lambda x: x**2, range(10))
  >>> result
  [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
  >>>
  >>> # also does asynchronous map
  >>> result = pool.amap(pow, [1,2,3], [4,5,6])
  >>> result.get()
  [1, 32, 729]
  >>>
  >>> # or can return a map iterator
  >>> result = pool.imap(pow, [1,2,3], [4,5,6])
  >>> result
  <processing.pool.IMapIterator object at 0x110c2ffd0>
  >>> list(result)
  [1, 32, 729]

вы можете использовать следующие две функции, чтобы избежать написания обертки для каждой новой функции:

import itertools
from multiprocessing import Pool

def universal_worker(input_pair):
    function, args = input_pair
    return function(*args)

def pool_args(function, *args):
    return zip(itertools.repeat(function), zip(*args))

использование функции function со списками аргументов arg_0,arg_1 и arg_2 следующим образом:

pool = Pool(n_core)
list_model = pool.map(universal_worker, pool_args(function, arg_0, arg_1, arg_2)
pool.close()
pool.join()

лучший способ-это использование декоратор вместо того, чтобы писать функции-оболочки вручную. Особенно, когда у вас есть много функций для отображения, декоратор сэкономит ваше время, избегая написания обертки для каждой функции. Обычно украшенная функция не является picklable, однако мы можем использовать functools чтобы обойти его. Больше дисскусий можно найти здесь.

вот пример

def unpack_args(func):
    from functools import wraps
    @wraps(func)
    def wrapper(args):
        if isinstance(args, dict):
            return func(**args)
        else:
            return func(*args)
    return wrapper

@unpack_args
def func(x, y):
    return x + y

тогда вы можете сопоставить его с молнией аргументы

np, xlist, ylist = 2, range(10), range(10)
pool = Pool(np)
res = pool.map(func, zip(xlist, ylist))
pool.close()
pool.join()

конечно, вы всегда можете использовать Pool.starmap в Python 3 (>=3.3), Как упоминалось в других ответах.

другой простой альтернативой является обернуть параметры функции в кортеж, а затем обернуть параметры, которые должны быть переданы в кортежах, а также. Это, возможно, не идеально при работе с большими кусками данных. Я считаю, что он будет делать копии для каждого кортежа.

from multiprocessing import Pool

def f((a,b,c,d)):
    print a,b,c,d
    return a + b + c +d

if __name__ == '__main__':
    p = Pool(10)
    data = [(i+0,i+1,i+2,i+3) for i in xrange(10)]
    print(p.map(f, data))
    p.close()
    p.join()

дает выход в некотором произвольном порядке:

0 1 2 3
1 2 3 4
2 3 4 5
3 4 5 6
4 5 6 7
5 6 7 8
7 8 9 10
6 7 8 9
8 9 10 11
9 10 11 12
[6, 10, 14, 18, 22, 26, 30, 34, 38, 42]

лучшее решение для python2:

from multiprocessing import Pool
def func((i, (a, b))):
    print i, a, b
    return a + b
pool = Pool(3)
pool.map(func, [(0,(1,2)), (1,(2,3)), (2,(3, 4))])

2 3 4

1 2 3

0 1 2

out []:

[3, 5, 7]

другой способ-передать список списков в процедуру с одним аргументом:

import os
from multiprocessing import Pool

def task(args):
    print "PID =", os.getpid(), ", arg1 =", args[0], ", arg2 =", args[1]

pool = Pool()

pool.map(task, [
        [1,2],
        [3,4],
        [5,6],
        [7,8]
    ])

можно построить список списков аргументов с помощью своего любимого метода.

из python 3.4.4, вы можете использовать многопроцессорную обработку.get_context () чтобы получить объект контекста для использования нескольких методов запуска:

import multiprocessing as mp

def foo(q, h, w):
    q.put(h + ' ' + w)
    print(h + ' ' + w)

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,'hello', 'world'))
    p.start()
    print(q.get())
    p.join()

или вы просто заменить

pool.map(harvester(text,case),case, 1)

by:

pool.apply_async(harvester(text,case),case, 1)

# "Как принять несколько аргументов".

def f1(args):
    a, b, c = args[0] , args[1] , args[2]
    return a+b+c

if __name__ == "__main__":
    import multiprocessing
    pool = multiprocessing.Pool(4) 

    result1 = pool.map(f1, [ [1,2,3] ])
    print(result1)

в официальной документации указано, что он поддерживает только один аргумент типа Iterable. Мне нравится использовать apply_async в таких случаях. В вашем случае я бы сделал:

from multiprocessing import Process, Pool, Manager

text = "test"
def harvester(text, case, q = None):
 X = case[0]
 res = text+ str(X)
 if q:
  q.put(res)
 return res


def block_until(q, results_queue, until_counter=0):
 i = 0
 while i < until_counter:
  results_queue.put(q.get())
  i+=1

if __name__ == '__main__':
 pool = multiprocessing.Pool(processes=6)
 case = RAW_DATASET
 m = Manager()
 q = m.Queue()
 results_queue = m.Queue() # when it completes results will reside in this queue
 blocking_process = Process(block_until, (q, results_queue, len(case)))
 blocking_process.start()
 for c in case:
  try:
   res = pool.apply_async(harvester, (text, case, q = None))
   res.get(timeout=0.1)
  except:
   pass
 blocking_process.join()

для python2, вы можете использовать этот трюк

def fun(a,b):
    return a+b

pool = multiprocessing.Pool(processes=6)
b=233
pool.map(lambda x:fun(x,b),range(1000))

Comments

    Ничего не найдено.