Как я должен войти при использовании многопроцессорной обработки в Python?
прямо сейчас у меня есть центральный модуль в рамках, который порождает несколько процессов с помощью Python 2.6 multiprocessing модуль. Потому что он использует multiprocessing, там на уровне модуля многопроцессорной обработки-известны журнала LOG = multiprocessing.get_logger(). Пер документы, этот регистратор имеет процесс-общие блокировки, так что вы не искажаете вещи в sys.stderr (или любой другой файловый хэндл), имея несколько процессов, записывающих его одновременно.
проблема у меня сейчас заключается в том, что другие модули в фреймворк не поддерживает многопроцессорность. Как я вижу, мне нужно, чтобы все зависимости от этого центрального модуля использовали многопроцессорное ведение журнала. Это раздражает внутри фреймворк, не говоря уже о всех клиентах фреймворка. Есть ли альтернативы, о которых я не думаю?
19 ответов:
единственный способ справиться с этим ненавязчиво - это:
- порождать каждый рабочий процесс таким образом, что его журнал переходит к другой файловый дескриптор (к диску или к трубе.) В идеале, все записи журнала должны быть помечены временем.
- ваш процесс контроллера может сделать один из следующих:
- при использовании диска: объединение файлов журнала в конце выполнения, отсортированных по метка времени
- при использовании трубы (рекомендуемый): объединение записей журнала на лету из всех каналов в центральный файл журнала. (Например, периодически
selectиз файловых дескрипторов каналов выполните сортировку слиянием доступных записей журнала и сбросьте их в централизованный журнал. Повторять.)
Я только что написал собственный обработчик журнала, который просто передает все родительскому процессу через канал. Я только тестировал его в течение десяти минут, но, кажется, работает довольно хорошо.
(Примечание: это жестко закодировано в
RotatingFileHandler, который является моим собственным прецедентом.)
обновление: @javier теперь поддерживает этот подход как пакет, доступный на Pypi-см. многопроцессорная обработка-ведение журнала на Pypi, github at https://github.com/jruere/multiprocessing-logging
Обновление: Осуществление!
это теперь использует очередь для правильной обработки параллелизма, а также исправляет ошибки правильно. Я теперь использую это в течение нескольких месяцев, и текущая версия ниже работает без проблем.
from logging.handlers import RotatingFileHandler import multiprocessing, threading, logging, sys, traceback class MultiProcessingLog(logging.Handler): def __init__(self, name, mode, maxsize, rotate): logging.Handler.__init__(self) self._handler = RotatingFileHandler(name, mode, maxsize, rotate) self.queue = multiprocessing.Queue(-1) t = threading.Thread(target=self.receive) t.daemon = True t.start() def setFormatter(self, fmt): logging.Handler.setFormatter(self, fmt) self._handler.setFormatter(fmt) def receive(self): while True: try: record = self.queue.get() self._handler.emit(record) except (KeyboardInterrupt, SystemExit): raise except EOFError: break except: traceback.print_exc(file=sys.stderr) def send(self, s): self.queue.put_nowait(s) def _format_record(self, record): # ensure that exc_info and args # have been stringified. Removes any chance of # unpickleable things inside and possibly reduces # message size sent over the pipe if record.args: record.msg = record.msg % record.args record.args = None if record.exc_info: dummy = self.format(record) record.exc_info = None return record def emit(self, record): try: s = self._format_record(record) self.send(s) except (KeyboardInterrupt, SystemExit): raise except: self.handleError(record) def close(self): self._handler.close() logging.Handler.close(self)
еще одной альтернативой могут быть различные обработчики журналов без файлов в
loggingпакета:
SocketHandlerDatagramHandlerSyslogHandler(и других)
таким образом, вы можете легко иметь демон ведения журнала где-то, что вы могли бы написать безопасно и правильно обрабатывать результаты. (Например, простой сервер сокетов, который просто распаковывает сообщение и выдает его на свой собственный вращающийся обработчик файлов.)
The
SyslogHandlerбудет заботиться об этом для вас тоже. Конечно, вы можете использовать свой собственный экземплярsyslog, не системный.
вариант других, который сохраняет ведение журнала и поток очереди отдельно.
"""sample code for logging in subprocesses using multiprocessing * Little handler magic - The main process uses loggers and handlers as normal. * Only a simple handler is needed in the subprocess that feeds the queue. * Original logger name from subprocess is preserved when logged in main process. * As in the other implementations, a thread reads the queue and calls the handlers. Except in this implementation, the thread is defined outside of a handler, which makes the logger definitions simpler. * Works with multiple handlers. If the logger in the main process defines multiple handlers, they will all be fed records generated by the subprocesses loggers. tested with Python 2.5 and 2.6 on Linux and Windows """ import os import sys import time import traceback import multiprocessing, threading, logging, sys DEFAULT_LEVEL = logging.DEBUG formatter = logging.Formatter("%(levelname)s: %(asctime)s - %(name)s - %(process)s - %(message)s") class SubProcessLogHandler(logging.Handler): """handler used by subprocesses It simply puts items on a Queue for the main process to log. """ def __init__(self, queue): logging.Handler.__init__(self) self.queue = queue def emit(self, record): self.queue.put(record) class LogQueueReader(threading.Thread): """thread to write subprocesses log records to main process log This thread reads the records written by subprocesses and writes them to the handlers defined in the main process's handlers. """ def __init__(self, queue): threading.Thread.__init__(self) self.queue = queue self.daemon = True def run(self): """read from the queue and write to the log handlers The logging documentation says logging is thread safe, so there shouldn't be contention between normal logging (from the main process) and this thread. Note that we're using the name of the original logger. """ # Thanks Mike for the error checking code. while True: try: record = self.queue.get() # get the logger for this record logger = logging.getLogger(record.name) logger.callHandlers(record) except (KeyboardInterrupt, SystemExit): raise except EOFError: break except: traceback.print_exc(file=sys.stderr) class LoggingProcess(multiprocessing.Process): def __init__(self, queue): multiprocessing.Process.__init__(self) self.queue = queue def _setupLogger(self): # create the logger to use. logger = logging.getLogger('test.subprocess') # The only handler desired is the SubProcessLogHandler. If any others # exist, remove them. In this case, on Unix and Linux the StreamHandler # will be inherited. for handler in logger.handlers: # just a check for my sanity assert not isinstance(handler, SubProcessLogHandler) logger.removeHandler(handler) # add the handler handler = SubProcessLogHandler(self.queue) handler.setFormatter(formatter) logger.addHandler(handler) # On Windows, the level will not be inherited. Also, we could just # set the level to log everything here and filter it in the main # process handlers. For now, just set it from the global default. logger.setLevel(DEFAULT_LEVEL) self.logger = logger def run(self): self._setupLogger() logger = self.logger # and here goes the logging p = multiprocessing.current_process() logger.info('hello from process %s with pid %s' % (p.name, p.pid)) if __name__ == '__main__': # queue used by the subprocess loggers queue = multiprocessing.Queue() # Just a normal logger logger = logging.getLogger('test') handler = logging.StreamHandler() handler.setFormatter(formatter) logger.addHandler(handler) logger.setLevel(DEFAULT_LEVEL) logger.info('hello from the main process') # This thread will read from the subprocesses and write to the main log's # handlers. log_queue_reader = LogQueueReader(queue) log_queue_reader.start() # create the processes. for i in range(10): p = LoggingProcess(queue) p.start() # The way I read the multiprocessing warning about Queue, joining a # process before it has finished feeding the Queue can cause a deadlock. # Also, Queue.empty() is not realiable, so just make sure all processes # are finished. # active_children joins subprocesses when they're finished. while multiprocessing.active_children(): time.sleep(.1)
в поваренной книге журнала python есть два полных примера здесь: https://docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes
Он использует
QueueHandler, который является новым в python 3.2, но легко скопировать в свой собственный код (как я сделал сам в python 2.7) из:https://gist.github.com/vsajip/591589каждый процесс помещает свой журнал на
Queue, а потомlistenerпоток или процесс (например предусмотрено для каждого) подбирает их и записывает их все в файл - без риска повреждения или искажения.
все текущие решения слишком связаны с конфигурацией ведения журнала с помощью обработчика. Мое решение имеет следующую архитектуру и функции:
- можно использовать любой Регистрация конфигурации вы хотите
- ведение журнала выполняется в потоке демона
- безопасное завершение работы демона с помощью контекстного менеджера
- связь с потоком ведения журнала осуществляется с помощью
multiprocessing.Queue- в подпроцессы,
logging.Logger(и уже определенные экземпляры) исправлены для отправки все записи в очередь- новая: форматирование трассировки и сообщения перед отправкой в очередь для предотвращения ошибок маринования
код с примером использования и выводом можно найти по следующему адресу:https://gist.github.com/schlamar/7003737
Ниже Еще одно решение с акцентом на простоту для всех остальных (как я), которые получают здесь от Google. Регистрация должна быть легкой! Только для 3.2 или выше.
import multiprocessing import logging from logging.handlers import QueueHandler, QueueListener import time import random def f(i): time.sleep(random.uniform(.01, .05)) logging.info('function called with {} in worker thread.'.format(i)) time.sleep(random.uniform(.01, .05)) return i def worker_init(q): # all records from worker processes go to qh and then into q qh = QueueHandler(q) logger = logging.getLogger() logger.setLevel(logging.DEBUG) logger.addHandler(qh) def logger_init(): q = multiprocessing.Queue() # this is the handler for all log records handler = logging.StreamHandler() handler.setFormatter(logging.Formatter("%(levelname)s: %(asctime)s - %(process)s - %(message)s")) # ql gets records from the queue and sends them to the handler ql = QueueListener(q, handler) ql.start() logger = logging.getLogger() logger.setLevel(logging.DEBUG) # add the handler to the logger so records from this process are handled logger.addHandler(handler) return ql, q def main(): q_listener, q = logger_init() logging.info('hello from main thread') pool = multiprocessing.Pool(4, worker_init, [q]) for result in pool.map(f, range(10)): pass pool.close() pool.join() q_listener.stop() if __name__ == '__main__': main()
мне также нравится ответ zzzeek, но Андре прав, что для предотвращения искажения требуется очередь. Мне немного повезло с трубой, но я видел искажение, которое несколько ожидалось. Реализация этого оказалась сложнее, чем я думал, особенно из-за работы в Windows, где есть некоторые дополнительные ограничения на глобальные переменные и прочее (см.: как реализована многопроцессорная обработка Python в Windows?)
но, я, наконец, получил это работает. Этот пример вероятно, это не идеально, поэтому комментарии и предложения приветствуются. Он также не поддерживает настройку модуля форматирования или чего-либо другого, кроме корневого регистратора. В принципе, вы должны повторно запустить регистратор в каждом из процессов пула с очередью и настроить другие атрибуты на регистраторе.
опять же, любые предложения о том, как сделать код лучше приветствуются. Я, конечно, еще не знаю всех трюков Python: -)
import multiprocessing, logging, sys, re, os, StringIO, threading, time, Queue class MultiProcessingLogHandler(logging.Handler): def __init__(self, handler, queue, child=False): logging.Handler.__init__(self) self._handler = handler self.queue = queue # we only want one of the loggers to be pulling from the queue. # If there is a way to do this without needing to be passed this # information, that would be great! if child == False: self.shutdown = False self.polltime = 1 t = threading.Thread(target=self.receive) t.daemon = True t.start() def setFormatter(self, fmt): logging.Handler.setFormatter(self, fmt) self._handler.setFormatter(fmt) def receive(self): #print "receive on" while (self.shutdown == False) or (self.queue.empty() == False): # so we block for a short period of time so that we can # check for the shutdown cases. try: record = self.queue.get(True, self.polltime) self._handler.emit(record) except Queue.Empty, e: pass def send(self, s): # send just puts it in the queue for the server to retrieve self.queue.put(s) def _format_record(self, record): ei = record.exc_info if ei: dummy = self.format(record) # just to get traceback text into record.exc_text record.exc_info = None # to avoid Unpickleable error return record def emit(self, record): try: s = self._format_record(record) self.send(s) except (KeyboardInterrupt, SystemExit): raise except: self.handleError(record) def close(self): time.sleep(self.polltime+1) # give some time for messages to enter the queue. self.shutdown = True time.sleep(self.polltime+1) # give some time for the server to time out and see the shutdown def __del__(self): self.close() # hopefully this aids in orderly shutdown when things are going poorly. def f(x): # just a logging command... logging.critical('function number: ' + str(x)) # to make some calls take longer than others, so the output is "jumbled" as real MP programs are. time.sleep(x % 3) def initPool(queue, level): """ This causes the logging module to be initialized with the necessary info in pool threads to work correctly. """ logging.getLogger('').addHandler(MultiProcessingLogHandler(logging.StreamHandler(), queue, child=True)) logging.getLogger('').setLevel(level) if __name__ == '__main__': stream = StringIO.StringIO() logQueue = multiprocessing.Queue(100) handler= MultiProcessingLogHandler(logging.StreamHandler(stream), logQueue) logging.getLogger('').addHandler(handler) logging.getLogger('').setLevel(logging.DEBUG) logging.debug('starting main') # when bulding the pool on a Windows machine we also have to init the logger in all the instances with the queue and the level of logging. pool = multiprocessing.Pool(processes=10, initializer=initPool, initargs=[logQueue, logging.getLogger('').getEffectiveLevel()] ) # start worker processes pool.map(f, range(0,50)) pool.close() logging.debug('done') logging.shutdown() print "stream output is:" print stream.getvalue()
поскольку мы можем представлять многопроцессное ведение журнала как многие издатели, так и один подписчик (слушатель), используя ZeroMQ реализовать pub-SUB messaging-это действительно вариант.
кроме того, PyZMQ модуль, привязки Python для ZMQ, реализует PUBHandler, который является объектом для публикации сообщений через zmq.Паб сокет.
здесь решение в интернете, для централизованного ведения журнала из распределенного приложение использует PyZMQ и PUBHandler, которые могут быть легко приняты для работы локально с несколькими процессами публикации.
formatters = { logging.DEBUG: logging.Formatter("[%(name)s] %(message)s"), logging.INFO: logging.Formatter("[%(name)s] %(message)s"), logging.WARN: logging.Formatter("[%(name)s] %(message)s"), logging.ERROR: logging.Formatter("[%(name)s] %(message)s"), logging.CRITICAL: logging.Formatter("[%(name)s] %(message)s") } # This one will be used by publishing processes class PUBLogger: def __init__(self, host, port=config.PUBSUB_LOGGER_PORT): self._logger = logging.getLogger(__name__) self._logger.setLevel(logging.DEBUG) self.ctx = zmq.Context() self.pub = self.ctx.socket(zmq.PUB) self.pub.connect('tcp://{0}:{1}'.format(socket.gethostbyname(host), port)) self._handler = PUBHandler(self.pub) self._handler.formatters = formatters self._logger.addHandler(self._handler) @property def logger(self): return self._logger # This one will be used by listener process class SUBLogger: def __init__(self, ip, output_dir="", port=config.PUBSUB_LOGGER_PORT): self.output_dir = output_dir self._logger = logging.getLogger() self._logger.setLevel(logging.DEBUG) self.ctx = zmq.Context() self._sub = self.ctx.socket(zmq.SUB) self._sub.bind('tcp://*:{1}'.format(ip, port)) self._sub.setsockopt(zmq.SUBSCRIBE, "") handler = handlers.RotatingFileHandler(os.path.join(output_dir, "client_debug.log"), "w", 100 * 1024 * 1024, 10) handler.setLevel(logging.DEBUG) formatter = logging.Formatter("%(asctime)s;%(levelname)s - %(message)s") handler.setFormatter(formatter) self._logger.addHandler(handler) @property def sub(self): return self._sub @property def logger(self): return self._logger # And that's the way we actually run things: # Listener process will forever listen on SUB socket for incoming messages def run_sub_logger(ip, event): sub_logger = SUBLogger(ip) while not event.is_set(): try: topic, message = sub_logger.sub.recv_multipart(flags=zmq.NOBLOCK) log_msg = getattr(logging, topic.lower()) log_msg(message) except zmq.ZMQError as zmq_error: if zmq_error.errno == zmq.EAGAIN: pass # Publisher processes loggers should be initialized as follows: class Publisher: def __init__(self, stop_event, proc_id): self.stop_event = stop_event self.proc_id = proc_id self._logger = pub_logger.PUBLogger('127.0.0.1').logger def run(self): self._logger.info("{0} - Sending message".format(proc_id)) def run_worker(event, proc_id): worker = Publisher(event, proc_id) worker.run() # Starting subscriber process so we won't loose publisher's messages sub_logger_process = Process(target=run_sub_logger, args=('127.0.0.1'), stop_event,)) sub_logger_process.start() #Starting publisher processes for i in range(MAX_WORKERS_PER_CLIENT): processes.append(Process(target=run_worker, args=(stop_event, i,))) for p in processes: p.start()
просто опубликуйте где-нибудь свой экземпляр регистратора. таким образом, другие модули и клиенты могут использовать ваш API для получения регистратора без необходимости
import multiprocessing.
Мне понравился ответ zzzeek. Я бы просто заменил трубу для очереди, так как если несколько потоков/процессов используют один и тот же конец трубы для создания сообщений журнала, они будут искажены.
Как насчет делегирования всего ведения журнала другому процессу, который считывает все записи журнала из очереди?
LOG_QUEUE = multiprocessing.JoinableQueue() class CentralLogger(multiprocessing.Process): def __init__(self, queue): multiprocessing.Process.__init__(self) self.queue = queue self.log = logger.getLogger('some_config') self.log.info("Started Central Logging process") def run(self): while True: log_level, message = self.queue.get() if log_level is None: self.log.info("Shutting down Central Logging process") break else: self.log.log(log_level, message) central_logger_process = CentralLogger(LOG_QUEUE) central_logger_process.start()просто поделитесь LOG_QUEUE с помощью любого из многопроцессорных механизмов или даже наследования, и все это отлично работает!
У меня есть решение, похожее на ironhacker, за исключением того, что я использую ведение журнала.исключение в некоторых из моего кода и обнаружил, что мне нужно отформатировать исключение, прежде чем передавать его обратно в очередь, так как трассировки не pickle'table:
class QueueHandler(logging.Handler): def __init__(self, queue): logging.Handler.__init__(self) self.queue = queue def emit(self, record): if record.exc_info: # can't pass exc_info across processes so just format now record.exc_text = self.formatException(record.exc_info) record.exc_info = None self.queue.put(record) def formatException(self, ei): sio = cStringIO.StringIO() traceback.print_exception(ei[0], ei[1], ei[2], None, sio) s = sio.getvalue() sio.close() if s[-1] == "\n": s = s[:-1] return s
Ниже приведен класс, который может быть использован в среде Windows, требует ActivePython. Вы также можете наследовать для других обработчиков журналов (StreamHandler и т. д.)
class SyncronizedFileHandler(logging.FileHandler): MUTEX_NAME = 'logging_mutex' def __init__(self , *args , **kwargs): self.mutex = win32event.CreateMutex(None , False , self.MUTEX_NAME) return super(SyncronizedFileHandler , self ).__init__(*args , **kwargs) def emit(self, *args , **kwargs): try: win32event.WaitForSingleObject(self.mutex , win32event.INFINITE) ret = super(SyncronizedFileHandler , self ).emit(*args , **kwargs) finally: win32event.ReleaseMutex(self.mutex) return retи вот пример, который демонстрирует использование:
import logging import random , time , os , sys , datetime from string import letters import win32api , win32event from multiprocessing import Pool def f(i): time.sleep(random.randint(0,10) * 0.1) ch = random.choice(letters) logging.info( ch * 30) def init_logging(): ''' initilize the loggers ''' formatter = logging.Formatter("%(levelname)s - %(process)d - %(asctime)s - %(filename)s - %(lineno)d - %(message)s") logger = logging.getLogger() logger.setLevel(logging.INFO) file_handler = SyncronizedFileHandler(sys.argv[1]) file_handler.setLevel(logging.INFO) file_handler.setFormatter(formatter) logger.addHandler(file_handler) #must be called in the parent and in every worker process init_logging() if __name__ == '__main__': #multiprocessing stuff pool = Pool(processes=10) imap_result = pool.imap(f , range(30)) for i , _ in enumerate(imap_result): pass
вот мой простой хак / обходной путь... не самый полный, но легко модифицируемый и более простой для чтения и понимания я думаю, чем любые другие ответы, которые я нашел перед написанием этого:
import logging import multiprocessing class FakeLogger(object): def __init__(self, q): self.q = q def info(self, item): self.q.put('INFO - {}'.format(item)) def debug(self, item): self.q.put('DEBUG - {}'.format(item)) def critical(self, item): self.q.put('CRITICAL - {}'.format(item)) def warning(self, item): self.q.put('WARNING - {}'.format(item)) def some_other_func_that_gets_logger_and_logs(num): # notice the name get's discarded # of course you can easily add this to your FakeLogger class local_logger = logging.getLogger('local') local_logger.info('Hey I am logging this: {} and working on it to make this {}!'.format(num, num*2)) local_logger.debug('hmm, something may need debugging here') return num*2 def func_to_parallelize(data_chunk): # unpack our args the_num, logger_q = data_chunk # since we're now in a new process, let's monkeypatch the logging module logging.getLogger = lambda name=None: FakeLogger(logger_q) # now do the actual work that happens to log stuff too new_num = some_other_func_that_gets_logger_and_logs(the_num) return (the_num, new_num) if __name__ == '__main__': multiprocessing.freeze_support() m = multiprocessing.Manager() logger_q = m.Queue() # we have to pass our data to be parallel-processed # we also need to pass the Queue object so we can retrieve the logs parallelable_data = [(1, logger_q), (2, logger_q)] # set up a pool of processes so we can take advantage of multiple CPU cores pool_size = multiprocessing.cpu_count() * 2 pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=4) worker_output = pool.map(func_to_parallelize, parallelable_data) pool.close() # no more tasks pool.join() # wrap up current tasks # get the contents of our FakeLogger object while not logger_q.empty(): print logger_q.get() print 'worker output contained: {}'.format(worker_output)
одной из альтернатив является запись журнала mutliprocessing в известный файл и регистрация
atexitобработчик для присоединения к этим процессам считывает его обратно на stderr; однако вы не получите поток в реальном времени для выходных сообщений на stderr таким образом.
Если у вас есть взаимоблокировки, происходящие в комбинации замков, потоков и вилок в
loggingмодуль, который сообщил в сообщение об ошибке 6721 (см. Также связанные так вопрос).есть небольшое исправление решение опубликовано здесь.
однако, это просто исправит любые потенциальные тупики в
logging. Это не исправит, что вещи, возможно, искажены. См. другие ответы, представленные здесь.
есть этот большой пакет
пакет: https://pypi.python.org/pypi/multiprocessing-logging/
код: https://github.com/jruere/multiprocessing-logging
установка:
pip install multiprocessing-loggingпотом добавил:
import multiprocessing_logging # This enables logs inside process multiprocessing_logging.install_mp_handler()
для моих детей, которые встречаются с той же проблемой в течение десятилетий и нашли этот вопрос на этом сайте, я оставляю этот ответ.
простота против чрезмерного усложнения. Просто используйте другие инструменты. Python является удивительным, но он не был разработан, чтобы сделать некоторые вещи.
следующий фрагмент для logrotate демон работает для меня и не усложнять вещи. Запланируйте его для запуска ежечасно и
/var/log/mylogfile.log { size 1 copytruncate create rotate 10 missingok postrotate timeext=`date -d '1 hour ago' "+%Y-%m-%d_%H"` mv /var/log/mylogfile.log.1 /var/log/mylogfile-$timeext.log endscript }вот как я его устанавливаю (символические ссылки не работают logrotate):
sudo cp /directpath/config/logrotate/myconfigname /etc/logrotate.d/myconfigname sudo cp /etc/cron.daily/logrotate /etc/cron.hourly/logrotate
Comments