Пул потоков похож на Многопроцессорный пул?
есть класс, бассейн для рабочего threads, аналогично многопроцессорному модулю класс, бассейн?
Я, например, простой способ распараллелить функцию map
def long_running_func(p):
c_func_no_gil(p)
p = multiprocessing.Pool(4)
xs = p.map(long_running_func, range(100))
однако я хотел бы сделать это без накладных расходов на создание новых процессов.
Я знаю о Гиле. Однако в моем usecase функция будет связана с IO функцией C, для которой оболочка python выпустит GIL до фактического вызов функции.
должен ли я написать свой собственный пул потоков?
9 ответов:
Я только что узнал, что есть на самом деле и интерфейс пула на основе потоков в
multiprocessingмодуль, однако он скрыт несколько и не должным образом документирован.его можно импортировать через
from multiprocessing.pool import ThreadPoolон реализован с использованием фиктивного класса процесса, обертывающего поток python. Этот потоковый класс процесса можно найти в
multiprocessing.dummyкоторый кратко упоминается в docs. Этот фиктивный модуль предположительно обеспечивает весь многопроцессорный интерфейс на основе потоков.
в Python 3 Вы можете использовать
concurrent.futures.ThreadPoolExecutor, например:executor = ThreadPoolExecutor(max_workers=10) a = executor.submit(my_function)посмотреть docs для получения дополнительной информации и примеров.
да, и он, кажется, имеет (более или менее) тот же API.
import multiprocessing def worker(lnk): .... def start_process(): ..... .... if(PROCESS): pool = multiprocessing.Pool(processes=POOL_SIZE, initializer=start_process) else: pool = multiprocessing.pool.ThreadPool(processes=POOL_SIZE, initializer=start_process) pool.map(worker, inputs) ....
для чего-то очень простого и легкого (немного изменено от здесь):
from Queue import Queue from threading import Thread class Worker(Thread): """Thread executing tasks from a given tasks queue""" def __init__(self, tasks): Thread.__init__(self) self.tasks = tasks self.daemon = True self.start() def run(self): while True: func, args, kargs = self.tasks.get() try: func(*args, **kargs) except Exception, e: print e finally: self.tasks.task_done() class ThreadPool: """Pool of threads consuming tasks from a queue""" def __init__(self, num_threads): self.tasks = Queue(num_threads) for _ in range(num_threads): Worker(self.tasks) def add_task(self, func, *args, **kargs): """Add a task to the queue""" self.tasks.put((func, args, kargs)) def wait_completion(self): """Wait for completion of all the tasks in the queue""" self.tasks.join() if __name__ == '__main__': from random import randrange from time import sleep delays = [randrange(1, 10) for i in range(100)] def wait_delay(d): print 'sleeping for (%d)sec' % d sleep(d) pool = ThreadPool(20) for i, d in enumerate(delays): pool.add_task(wait_delay, d) pool.wait_completion()для поддержки обратных вызовов при завершении задачи вы можете просто добавить обратный вызов в кортеж задач.
вот что выглядит многообещающим в Поваренной книге Python:
рецепт 576519: пул потоков с тем же API, что и (multi)обработка.Бассейн (В Python)
Hi, чтобы использовать пул потоков в Python вы можете использовать эту библиотеку :
from multiprocessing.dummy import Pool as ThreadPoolи затем для использования, эта библиотека сделать так:
pool = ThreadPool(threads) results = pool.map(service, tasks) pool.close() pool.join() return resultsпотоки-это количество потоков, которые вы хотите, а задачи-это список задач, которые больше всего сопоставляются с сервисом.
накладные расходы на создание новых процессов минимальны, особенно когда их всего 4. Я сомневаюсь, что это горячая точка производительности вашего приложения. Держите его простым, оптимизируйте, где вы должны и где результаты профилирования указывают.
нет встроенного пула на основе потоков. Тем не менее, это может быть очень быстро реализовать очередь производителя/потребителя с
Queueкласса.от: https://docs.python.org/2/library/queue.html
from threading import Thread from Queue import Queue def worker(): while True: item = q.get() do_work(item) q.task_done() q = Queue() for i in range(num_worker_threads): t = Thread(target=worker) t.daemon = True t.start() for item in source(): q.put(item) q.join() # block until all tasks are done
вот результат, который я наконец - то использовал. Это модифицированная версия классов от dgorissen выше.
File:
threadpool.pyfrom queue import Queue, Empty import threading from threading import Thread class Worker(Thread): _TIMEOUT = 2 """ Thread executing tasks from a given tasks queue. Thread is signalable, to exit """ def __init__(self, tasks, th_num): Thread.__init__(self) self.tasks = tasks self.daemon, self.th_num = True, th_num self.done = threading.Event() self.start() def run(self): while not self.done.is_set(): try: func, args, kwargs = self.tasks.get(block=True, timeout=self._TIMEOUT) try: func(*args, **kwargs) except Exception as e: print(e) finally: self.tasks.task_done() except Empty as e: pass return def signal_exit(self): """ Signal to thread to exit """ self.done.set() class ThreadPool: """Pool of threads consuming tasks from a queue""" def __init__(self, num_threads, tasks=[]): self.tasks = Queue(num_threads) self.workers = [] self.done = False self._init_workers(num_threads) for task in tasks: self.tasks.put(task) def _init_workers(self, num_threads): for i in range(num_threads): self.workers.append(Worker(self.tasks, i)) def add_task(self, func, *args, **kwargs): """Add a task to the queue""" self.tasks.put((func, args, kwargs)) def _close_all_threads(self): """ Signal all threads to exit and lose the references to them """ for workr in self.workers: workr.signal_exit() self.workers = [] def wait_completion(self): """Wait for completion of all the tasks in the queue""" self.tasks.join() def __del__(self): self._close_all_threads() def create_task(func, *args, **kwargs): return (func, args, kwargs)бассейн
from random import randrange from time import sleep delays = [randrange(1, 10) for i in range(30)] def wait_delay(d): print('sleeping for (%d)sec' % d) sleep(d) pool = ThreadPool(20) for i, d in enumerate(delays): pool.add_task(wait_delay, d) pool.wait_completion()
Comments