Многопроцессорная обработка: как использовать пул.сопоставление функции, определенной в классе?



когда я запускаю что-то вроде:



from multiprocessing import Pool

p = Pool(5)
def f(x):
return x*x

p.map(f, [1,2,3])


он работает нормально. Однако, поставив это как функцию класса:



class calculate(object):
def run(self):
def f(x):
return x*x

p = Pool()
return p.map(f, [1,2,3])

cl = calculate()
print cl.run()


дает мне следующую ошибку:



Exception in thread Thread-1:
Traceback (most recent call last):
File "/sw/lib/python2.6/threading.py", line 532, in __bootstrap_inner
self.run()
File "/sw/lib/python2.6/threading.py", line 484, in run
self.__target(*self.__args, **self.__kwargs)
File "/sw/lib/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed


Я видел сообщение от Алекса Мартелли, занимающегося той же проблемой, но оно было недостаточно явным.

538   14  

14 ответов:

меня также раздражали ограничения на то, какой пул функций.карту можно было принять. Я написал следующее, чтобы обойти это. Это, кажется, работает, даже для рекурсивного использования parmap.

from multiprocessing import Process, Pipe
from itertools import izip

def spawn(f):
    def fun(pipe,x):
        pipe.send(f(x))
        pipe.close()
    return fun

def parmap(f,X):
    pipe=[Pipe() for x in X]
    proc=[Process(target=spawn(f),args=(c,x)) for x,(p,c) in izip(X,pipe)]
    [p.start() for p in proc]
    [p.join() for p in proc]
    return [p.recv() for (p,c) in pipe]

if __name__ == '__main__':
    print parmap(lambda x:x**x,range(1,5))

Я не мог использовать коды, опубликованные до сих пор, потому что коды, использующие "многопроцессорность.Пул "не работает с лямбда-выражениями и кодами, не использующими" многопроцессорность.Пул " порождает столько процессов, сколько есть рабочих элементов.

я адаптировал код s. t.он порождает предопределенное количество рабочих и только повторяет входной список, если существует простой рабочий. Я также включил режим "демон" для рабочих S.T. ctrl-c работает так, как ожидалось.

import multiprocessing


def fun(f, q_in, q_out):
    while True:
        i, x = q_in.get()
        if i is None:
            break
        q_out.put((i, f(x)))


def parmap(f, X, nprocs=multiprocessing.cpu_count()):
    q_in = multiprocessing.Queue(1)
    q_out = multiprocessing.Queue()

    proc = [multiprocessing.Process(target=fun, args=(f, q_in, q_out))
            for _ in range(nprocs)]
    for p in proc:
        p.daemon = True
        p.start()

    sent = [q_in.put((i, x)) for i, x in enumerate(X)]
    [q_in.put((None, None)) for _ in range(nprocs)]
    res = [q_out.get() for _ in range(len(sent))]

    [p.join() for p in proc]

    return [x for i, x in sorted(res)]


if __name__ == '__main__':
    print(parmap(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8]))

в настоящее время нет решения вашей проблемы, насколько я знаю: функция, которую вы даете map() должен быть доступен через импорт свой модуль. Вот почему код Роберта работает: функция f() можно получить, импортировав следующий код:

def f(x):
    return x*x

class Calculate(object):
    def run(self):
        p = Pool()
        return p.map(f, [1,2,3])

if __name__ == '__main__':
    cl = Calculate()
    print cl.run()

Я на самом деле добавил "основной" раздел, потому что это следует за рекомендации для платформы Windows ("убедитесь, что основной модуль можно безопасно импортировать с помощью нового Python переводчик не вызывая побочные эффекты").

Я также добавил заглавную букву перед Calculate, чтобы следовать PEP 8. :)

многопроцессорная обработка и травление нарушены и ограничены, если вы не выходите за пределы стандартной библиотеки.

если вы используете вилку multiprocessing под названием pathos.multiprocesssing, вы можете напрямую использовать классы и методы класса в многопроцессорной map функции. Это потому что dill вместо pickle или cPickle и dill может сериализовать почти все, что в python.

pathos.multiprocessing также обеспечивает асинхронную функцию map... и он может map функции с несколькими аргументы (например,map(math.pow, [1,2,3], [4,5,6]))

посмотреть обсуждения: что могут сделать мультипроцессор и укроп вместе?

и: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization

он даже обрабатывает код, который вы написали изначально, без изменений и от интерпретатора. зачем делать что-то еще, что более хрупко и специфично для одного случая?

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> class calculate(object):
...  def run(self):
...   def f(x):
...    return x*x
...   p = Pool()
...   return p.map(f, [1,2,3])
... 
>>> cl = calculate()
>>> print cl.run()
[1, 4, 9]

получить код здесь: https://github.com/uqfoundation/pathos

и, просто, чтобы показать немного больше того, что он может сделать:

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> 
>>> p = Pool(4)
>>> 
>>> def add(x,y):
...   return x+y
... 
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>> 
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>> 
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> 
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>> 
>>> res = p.amap(t.plus, x, y)
>>> res.get()
[4, 6, 8, 10]

решение mrule является правильным, но имеет ошибку: если ребенок отправляет обратно большой объем данных, он может заполнить буфер трубы, блокируя на ребенка pipe.send(), в то время как родитель ждет ребенка, чтобы выйти на pipe.join(). Решение состоит в том, чтобы прочитать данные ребенка до join()ing ребенка. Кроме того, ребенок должен закрыть Родительский конец трубы, чтобы предотвратить взаимоблокировку. Приведенный ниже код исправляет это. Также имейте в виду, что это parmap создает один процесс на элемент в X. Более продвинутым решением является использование multiprocessing.cpu_count() разделить X в несколько кусков, а затем объединить результаты перед возвращением. Я оставляю это как упражнение для читателя, чтобы не испортить краткость приятного ответа мрула. ;)

from multiprocessing import Process, Pipe
from itertools import izip

def spawn(f):
    def fun(ppipe, cpipe,x):
        ppipe.close()
        cpipe.send(f(x))
        cpipe.close()
    return fun

def parmap(f,X):
    pipe=[Pipe() for x in X]
    proc=[Process(target=spawn(f),args=(p,c,x)) for x,(p,c) in izip(X,pipe)]
    [p.start() for p in proc]
    ret = [p.recv() for (p,c) in pipe]
    [p.join() for p in proc]
    return ret

if __name__ == '__main__':
    print parmap(lambda x:x**x,range(1,5))

Я тоже боролся с этим. У меня были функции как члены данных класса, как упрощенный пример:

from multiprocessing import Pool
import itertools
pool = Pool()
class Example(object):
    def __init__(self, my_add): 
        self.f = my_add  
    def add_lists(self, list1, list2):
        # Needed to do something like this (the following line won't work)
        return pool.map(self.f,list1,list2)  

мне нужно было использовать функцию self.f в бассейне.map () вызов из того же класса и себя.Ф не принимает Кортеж в качестве аргумента. Поскольку эта функция была встроена в класс, мне было неясно, как написать тип оболочки, предложенный другими ответами.

Я решил эту проблему с помощью другой обертке, которая принимает кортеж/список, где первый элемент-это функция, а остальные элементы-аргументы этой функции, называемые eval_func_tuple(f_args). Используя это, проблемная линия может быть заменена обратным пулом.map (eval_func_tuple, itertools.izip (itertools.повторяю(самовыдвижение.Ф), список1, список2)). Вот полный код:

файл: util.py

def add(a, b): return a+b

def eval_func_tuple(f_args):
    """Takes a tuple of a function and args, evaluates and returns result"""
    return f_args[0](*f_args[1:])  

файл: main.py

from multiprocessing import Pool
import itertools
import util  

pool = Pool()
class Example(object):
    def __init__(self, my_add): 
        self.f = my_add  
    def add_lists(self, list1, list2):
        # The following line will now work
        return pool.map(util.eval_func_tuple, 
            itertools.izip(itertools.repeat(self.f), list1, list2)) 

if __name__ == '__main__':
    myExample = Example(util.add)
    list1 = [1, 2, 3]
    list2 = [10, 20, 30]
    print myExample.add_lists(list1, list2)  

работает main.py приведу [11, 22, 33]. Не стесняйтесь, чтобы улучшить это, например eval_func_tuple может также быть изменены, чтобы принять ключевые слова аргументы.

С другой стороны, в других ответах функция "parmap" может быть более эффективной для случая большего количества процессов, чем количество доступных процессоров. Я копирую отредактированную версию ниже. Это мой первый пост, и я не был уверен, что должен напрямую редактировать исходный ответ. Я также переименовал некоторые переменные.

from multiprocessing import Process, Pipe  
from itertools import izip  

def spawn(f):  
    def fun(pipe,x):  
        pipe.send(f(x))  
        pipe.close()  
    return fun  

def parmap(f,X):  
    pipe=[Pipe() for x in X]  
    processes=[Process(target=spawn(f),args=(c,x)) for x,(p,c) in izip(X,pipe)]  
    numProcesses = len(processes)  
    processNum = 0  
    outputList = []  
    while processNum < numProcesses:  
        endProcessNum = min(processNum+multiprocessing.cpu_count(), numProcesses)  
        for proc in processes[processNum:endProcessNum]:  
            proc.start()  
        for proc in processes[processNum:endProcessNum]:  
            proc.join()  
        for proc,c in pipe[processNum:endProcessNum]:  
            outputList.append(proc.recv())  
        processNum = endProcessNum  
    return outputList    

if __name__ == '__main__':  
    print parmap(lambda x:x**x,range(1,5))         

функции, определенные в классах (даже внутри функций внутри классов), на самом деле не маринуются. Однако, это работает:

def f(x):
    return x*x

class calculate(object):
    def run(self):
        p = Pool()
    return p.map(f, [1,2,3])

cl = calculate()
print cl.run()

Я взял ответ Клауса se и aganders3 и сделал документированный модуль, который более читаем и хранится в одном файле. Вы можете просто добавить его в свой проект. Он даже имеет дополнительный индикатор выполнения !

"""
The ``processes`` module provides some convenience functions
for using parallel processes in python.

Adapted from http://stackoverflow.com/a/16071616/287297

Example usage:

    print prll_map(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8], 32, verbose=True)

Comments:

"It spawns a predefined amount of workers and only iterates through the input list
 if there exists an idle worker. I also enabled the "daemon" mode for the workers so
 that KeyboardInterupt works as expected."

Pitfalls: all the stdouts are sent back to the parent stdout, intertwined.

Alternatively, use this fork of multiprocessing: 
https://github.com/uqfoundation/multiprocess
"""

# Modules #
import multiprocessing
from tqdm import tqdm

################################################################################
def apply_function(func_to_apply, queue_in, queue_out):
    while not queue_in.empty():
        num, obj = queue_in.get()
        queue_out.put((num, func_to_apply(obj)))

################################################################################
def prll_map(func_to_apply, items, cpus=None, verbose=False):
    # Number of processes to use #
    if cpus is None: cpus = min(multiprocessing.cpu_count(), 32)
    # Create queues #
    q_in  = multiprocessing.Queue()
    q_out = multiprocessing.Queue()
    # Process list #
    new_proc  = lambda t,a: multiprocessing.Process(target=t, args=a)
    processes = [new_proc(apply_function, (func_to_apply, q_in, q_out)) for x in range(cpus)]
    # Put all the items (objects) in the queue #
    sent = [q_in.put((i, x)) for i, x in enumerate(items)]
    # Start them all #
    for proc in processes:
        proc.daemon = True
        proc.start()
    # Display progress bar or not #
    if verbose:
        results = [q_out.get() for x in tqdm(range(len(sent)))]
    else:
        results = [q_out.get() for x in range(len(sent))]
    # Wait for them to finish #
    for proc in processes: proc.join()
    # Return results #
    return [x for i, x in sorted(results)]

################################################################################
def test():
    def slow_square(x):
        import time
        time.sleep(2)
        return x**2
    objs    = range(20)
    squares = prll_map(slow_square, objs, 4, verbose=True)
    print "Result: %s" % squares

EDIT: добавлено предложение @ alexander-mcfarlane и тестовая функция

Я знаю, что это было задано более 6 лет назад, но просто хотел добавить свое решение, так как некоторые из приведенных выше предложений кажутся ужасно сложными, но мое решение было на самом деле очень простым.

все, что мне нужно было сделать, это обернуть бассейн.map () вызов вспомогательной функции. Передача объекта класса вместе с args для метода в виде кортежа, который выглядел немного так.

def run_in_parallel(args):
    return args[0].method(args[1])

myclass = MyClass()
method_args = [1,2,3,4,5,6]
args_map = [ (myclass, arg) for arg in method_args ]
pool = Pool()
pool.map(run_in_parallel, args_map)

Я изменил метод Клауса СЕ, потому что, пока он работал для меня с небольшими списками, он зависал, когда количество элементов было ~1000 или больше. Вместо того, чтобы толкать задания по одному с None условие остановки, я загружаю входную очередь сразу и просто позволяю процессам жевать ее, пока она не опустеет.

from multiprocessing import cpu_count, Queue, Process

def apply_func(f, q_in, q_out):
    while not q_in.empty():
        i, x = q_in.get()
        q_out.put((i, f(x)))

# map a function using a pool of processes
def parmap(f, X, nprocs = cpu_count()):
    q_in, q_out   = Queue(), Queue()
    proc = [Process(target=apply_func, args=(f, q_in, q_out)) for _ in range(nprocs)]
    sent = [q_in.put((i, x)) for i, x in enumerate(X)]
    [p.start() for p in proc]
    res = [q_out.get() for _ in sent]
    [p.join() for p in proc]

    return [x for i,x in sorted(res)]

Edit: к сожалению, теперь я сталкиваюсь с этой ошибкой в моей системе:Многопроцессорность ограничить максимальный размер очереди 32767, надеюсь обходные пути там помогут.

Я не уверен, если этот подход был принят, но работа вокруг я использую это:

from multiprocessing import Pool

t = None

def run(n):
    return t.f(n)

class Test(object):
    def __init__(self, number):
        self.number = number

    def f(self, x):
        print x * self.number

    def pool(self):
        pool = Pool(2)
        pool.map(run, range(10))

if __name__ == '__main__':
    t = Test(9)
    t.pool()
    pool = Pool(2)
    pool.map(run, range(10))

вывод должен быть:

0
9
18
27
36
45
54
63
72
81
0
9
18
27
36
45
54
63
72
81
class Calculate(object):
  # Your instance method to be executed
  def f(self, x, y):
    return x*y

if __name__ == '__main__':
  inp_list = [1,2,3]
  y = 2
  cal_obj = Calculate()
  pool = Pool(2)
  results = pool.map(lambda x: cal_obj.f(x, y), inp_list)

существует возможность, что вы хотите применить эту функцию для каждого отдельного экземпляра класса. Тогда вот решение для этого также

class Calculate(object):
  # Your instance method to be executed
  def __init__(self, x):
    self.x = x

  def f(self, y):
    return self.x*y

if __name__ == '__main__':
  inp_list = [Calculate(i) for i in range(3)]
  y = 2
  pool = Pool(2)
  results = pool.map(lambda x: x.f(y), inp_list)

вот мое решение, которое, я думаю, немного менее хакерское, чем большинство других здесь. Это похоже на ответ nightowl по.

someclasses = [MyClass(), MyClass(), MyClass()]

def method_caller(some_object, some_method='the method'):
    return getattr(some_object, some_method)()

othermethod = partial(method_caller, some_method='othermethod')

with Pool(6) as pool:
    result = pool.map(othermethod, someclasses)

от http://www.rueckstiess.net/research/snippets/show/ca1d7d90 и http://qingkaikong.blogspot.com/2016/12/python-parallel-method-in-class.html

мы можем сделать внешнюю функцию и посеять ее с помощью класса self object:

from joblib import Parallel, delayed
def unwrap_self(arg, **kwarg):
    return square_class.square_int(*arg, **kwarg)

class square_class:
    def square_int(self, i):
        return i * i

    def run(self, num):
        results = []
        results = Parallel(n_jobs= -1, backend="threading")\
            (delayed(unwrap_self)(i) for i in zip([self]*len(num), num))
        print(results)

или без joblib:

from multiprocessing import Pool
import time

def unwrap_self_f(arg, **kwarg):
    return C.f(*arg, **kwarg)

class C:
    def f(self, name):
        print 'hello %s,'%name
        time.sleep(5)
        print 'nice to meet you.'

    def run(self):
        pool = Pool(processes=2)
        names = ('frank', 'justin', 'osi', 'thomas')
        pool.map(unwrap_self_f, zip([self]*len(names), names))

if __name__ == '__main__':
    c = C()
    c.run()

Comments

    Ничего не найдено.