Питон многопроцессорной обработки очереди трубы против
каковы фундаментальные различия между очередями и трубами в многопроцессорный пакет Python?
в каких сценариях следует выбирать один над другим? Когда это выгодно использовать Pipe()? Когда это выгодно использовать Queue()?
1 ответ:
A
Pipe()может иметь только две конечные точки.A
Queue()может иметь несколько производителей и потребителей.когда их использовать
Если вам нужно более двух точек для связи, используйте
Queue().Если вам нужна абсолютная производительность, a
Pipe()гораздо быстрее, потому чтоQueue()построен на сверхуPipe().Оценка Результатов
предположим, вы хотите создать два процесса и отправлять сообщения между ними как можно быстрее. Это временные результаты гонки сопротивления между аналогичными тестами с использованием
Pipe()иQueue()... Это на ThinkpadT61 под управлением Ubuntu 11.10, и Python 2.7.2.FYI, я бросил в результаты для
JoinableQueue()в качестве бонуса;JoinableQueue()учетные записи для задач, когдаqueue.task_done()is вызывается (он даже не знает о конкретной задаче, он просто подсчитывает незавершенные задачи в очереди), так чтоqueue.join()знает, что работа закончена.код для каждого в нижней части этого ответа...
mpenning@mpenning-T61:~$ python multi_pipe.py Sending 10000 numbers to Pipe() took 0.0369849205017 seconds Sending 100000 numbers to Pipe() took 0.328398942947 seconds Sending 1000000 numbers to Pipe() took 3.17266988754 seconds mpenning@mpenning-T61:~$ python multi_queue.py Sending 10000 numbers to Queue() took 0.105256080627 seconds Sending 100000 numbers to Queue() took 0.980564117432 seconds Sending 1000000 numbers to Queue() took 10.1611330509 seconds mpnening@mpenning-T61:~$ python multi_joinablequeue.py Sending 10000 numbers to JoinableQueue() took 0.172781944275 seconds Sending 100000 numbers to JoinableQueue() took 1.5714070797 seconds Sending 1000000 numbers to JoinableQueue() took 15.8527247906 seconds mpenning@mpenning-T61:~$в резюме
Pipe()примерно в три раза быстрее, чемQueue(). Даже не думай оJoinableQueue()если вы действительно должны иметь преимущества.БОНУСНЫЙ МАТЕРИАЛ 2
многопроцессорная обработка вводит тонкие изменения в потоке информации, которые затрудняют отладку, если вы не знаете некоторые ярлыки. Например, у вас может быть скрипт, который отлично работает при индексации через словарь во многих условиях, но редко терпит неудачу с определенными входными данными.
обычно мы получаем подсказки к сбою, когда весь процесс python выходит из строя; однако вы не получаете незапрашиваемые трассировки сбоев, напечатанные на консоли, если функция многопроцессорной обработки выходит из строя. Отслеживание неизвестной многопроцессорной обработки аварии трудно без ключа к тому, что разбился процесс.
самый простой способ, который я нашел для отслеживания многопроцессорной информации о сбоях, - это обернуть всю многопроцессорную функцию в
try/exceptи использоватьtraceback.print_exc():import traceback def reader(args): try: # Insert stuff to be multiprocessed here return args[0]['that'] except: print "FATAL: reader({0}) exited while multiprocessing".format(args) traceback.print_exc()теперь, когда вы находите аварии вы видите что-то вроде:
FATAL: reader([{'crash', 'this'}]) exited while multiprocessing Traceback (most recent call last): File "foo.py", line 19, in __init__ self.run(task_q, result_q) File "foo.py", line 46, in run raise ValueError ValueErrorИсходный Код:
""" multi_pipe.py """ from multiprocessing import Process, Pipe import time def reader_proc(pipe): ## Read from the pipe; this will be spawned as a separate Process p_output, p_input = pipe p_input.close() # We are only reading while True: msg = p_output.recv() # Read from the output pipe and do nothing if msg=='DONE': break def writer(count, p_input): for ii in xrange(0, count): p_input.send(ii) # Write 'count' numbers into the input pipe p_input.send('DONE') if __name__=='__main__': for count in [10**4, 10**5, 10**6]: # Pipes are unidirectional with two endpoints: p_input ------> p_output p_output, p_input = Pipe() # writer() writes to p_input from _this_ process reader_p = Process(target=reader_proc, args=((p_output, p_input),)) reader_p.daemon = True reader_p.start() # Launch the reader process p_output.close() # We no longer need this part of the Pipe() _start = time.time() writer(count, p_input) # Send a lot of stuff to reader_proc() p_input.close() reader_p.join() print("Sending {0} numbers to Pipe() took {1} seconds".format(count, (time.time() - _start)))
""" multi_queue.py """ from multiprocessing import Process, Queue import time import sys def reader_proc(queue): ## Read from the queue; this will be spawned as a separate Process while True: msg = queue.get() # Read from the queue and do nothing if (msg == 'DONE'): break def writer(count, queue): ## Write to the queue for ii in range(0, count): queue.put(ii) # Write 'count' numbers into the queue queue.put('DONE') if __name__=='__main__': pqueue = Queue() # writer() writes to pqueue from _this_ process for count in [10**4, 10**5, 10**6]: ### reader_proc() reads from pqueue as a separate process reader_p = Process(target=reader_proc, args=((pqueue),)) reader_p.daemon = True reader_p.start() # Launch reader_proc() as a separate python process _start = time.time() writer(count, pqueue) # Send a lot of stuff to reader() reader_p.join() # Wait for the reader to finish print("Sending {0} numbers to Queue() took {1} seconds".format(count, (time.time() - _start)))
""" multi_joinablequeue.py """ from multiprocessing import Process, JoinableQueue import time def reader_proc(queue): ## Read from the queue; this will be spawned as a separate Process while True: msg = queue.get() # Read from the queue and do nothing queue.task_done() def writer(count, queue): for ii in xrange(0, count): queue.put(ii) # Write 'count' numbers into the queue if __name__=='__main__': for count in [10**4, 10**5, 10**6]: jqueue = JoinableQueue() # writer() writes to jqueue from _this_ process # reader_proc() reads from jqueue as a different process... reader_p = Process(target=reader_proc, args=((jqueue),)) reader_p.daemon = True reader_p.start() # Launch the reader process _start = time.time() writer(count, jqueue) # Send a lot of stuff to reader_proc() (in different process) jqueue.join() # Wait for the reader to finish print("Sending {0} numbers to JoinableQueue() took {1} seconds".format(count, (time.time() - _start)))
Comments