Использование pydispatch между потоками



Я столкнулся с проблемой использования модуля pydispatch для связи между потоками. Я использовал пример, приведенный здесь: https://sites.google.com/site/hardwaremonkey/blog/python-howtocommunicatebetweenthreadsusingpydispatch



Я немного изменил его, чтобы предоставить немного более подробную информацию в журнале. В частности, я заставил его также отображать фактическое имя потока:



from pydispatch import dispatcher
import threading
import time
import logging

log_formatter = logging.Formatter('%(asctime)s %(levelname)s [%(name)s] [%(threadName)s] %(message)s', '%H:%M:%S')
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
log_handler = logging.StreamHandler()
log_handler.setLevel(logging.DEBUG)
log_handler.setFormatter(log_formatter)
logger.addHandler(log_handler)


ALICE_SIGNAL='alice_signal'
ALICE_SENDER='alice_sender'
BOB_SIGNAL='bob_signal'
BOB_SENDER='bob_sender'

class Alice():
''' alice procrastinates and replies to bob'''
def __init__(self):
logger.debug('Alice instantiated')
dispatcher.connect(self.alice_dispatcher_receive, signal=BOB_SIGNAL, sender=BOB_SENDER)
self.alice()

def alice_dispatcher_receive(self, message):
''' handle dispatcher'''
logger.debug('Alice has received message: {}'.format(message))
dispatcher.send(message='thank you from Alice', signal=ALICE_SIGNAL, sender=ALICE_SENDER)

def alice(self):
''' loop and wait '''
while True:
logger.debug('Alice is procrastinating')
time.sleep(1)

class Bob():
''' bob contacts alice periodically '''
def __init__(self):
logger.debug('Bob instantiated')
dispatcher.connect(self.bob_dispatcher_receive, signal=ALICE_SIGNAL, sender=ALICE_SENDER)
self.bob()

def bob_dispatcher_receive(self, message):
''' handle dispatcher '''
logger.debug('Bob has received message: {}'.format(message))

def bob(self):
''' loop and send messages using a dispatcher '''
while True:
dispatcher.send(message='message from Bob', signal=BOB_SIGNAL, sender=BOB_SENDER)
time.sleep(3)


if __name__ == '__main__':
logger.debug('Starting...')
alice_thread = threading.Thread(target=Alice, name='Thread-Alice')
alice_thread.start()
bob_thread = threading.Thread(target=Bob, name='Thread-Bob')
bob_thread.start()


Вот что я нашел:



08:10:43 DEBUG [root] [MainThread] Starting...
08:10:43 DEBUG [root] [Thread-Alice] Alice instantiated
08:10:43 DEBUG [root] [Thread-Alice] Alice is procrastinating
08:10:43 DEBUG [root] [Thread-Bob] Bob instantiated
08:10:43 DEBUG [root] [Thread-Bob] Alice has received message: message from Bob
08:10:43 DEBUG [root] [Thread-Bob] Bob has received message: thank you from Alice
08:10:44 DEBUG [root] [Thread-Alice] Alice is procrastinating
08:10:45 DEBUG [root] [Thread-Alice] Alice is procrastinating
08:10:46 DEBUG [root] [Thread-Bob] Alice has received message: message from Bob
08:10:46 DEBUG [root] [Thread-Bob] Bob has received message: thank you from Alice
08:10:46 DEBUG [root] [Thread-Alice] Alice is procrastinating
08:10:47 DEBUG [root] [Thread-Alice] Alice is procrastinating
08:10:48 DEBUG [root] [Thread-Alice] Alice is procrastinating
08:10:49 DEBUG [root] [Thread-Bob] Alice has received message: message from Bob
08:10:49 DEBUG [root] [Thread-Bob] Bob has received message: thank you from Alice
08:10:49 DEBUG [root] [Thread-Alice] Alice is procrastinating


Смотрите это:



[Thread-Bob] Alice has received message: message from Bob


" Алиса получила сообщение " было выполнено в потоке Боба. В то время как я ожидал, что он будет выполнен в потоке Алисы. Из того, что я понял, диспетчер получает сигнал от Боба и идет прямо к вызову обработчиков, в том же потоке. Таким образом, он фактически вызывает код Алисы из потока Боба, что приводит к неожиданным нюансам.



Задача №1. Исполнение Боба фактически блокируется, пока сигнал обрабатывается Алисой.



Задача №2. В более крупном приложении Алиса может неожиданно получить свой код выполняется в нескольких параллельных потоках.



Задача №3. Плохая инкапсуляция в целом. Мы ожидаем, что Алиса и Боб будут запущены в своих собственных потоках в одном единственном экземпляре каждый, независимо друг от друга, только обмениваясь сообщениями. Но это не так, поскольку они на самом деле называют код друг друга.



Есть ли способ решить эту проблему для pydispatcher? Или вы можете порекомендовать другую библиотеку для связи между потоками, которая свободна от этих проблем?

540   1  

1 ответ:

Нашел решение с помощью event_loop.call_soon_threadsafe ().

Теперь вот код:

def register_signal_handler(loop, handler, signal, sender):
    def dispatcher_receive(message):
        loop.call_soon_threadsafe(handler, message)
    dispatcher.connect(dispatcher_receive, signal=signal, sender=sender, weak=False)


class Alice():
    def __init__(self):
        logger.debug('Alice instantiated')
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        register_signal_handler(self.loop, self.alice_receive, signal=BOB_SIGNAL, sender=BOB_SENDER)
        self.alice()

    def alice_receive(self, message):
        logger.debug('Alice has received message: {}'.format(message))
        dispatcher.send(message='thank you from Alice', signal=ALICE_SIGNAL, sender=ALICE_SENDER)

    def alice(self):
        def procrastinate():
            logger.debug('Alice is procrastinating')
            self.loop.call_later(1, procrastinate)
        procrastinate()
        self.loop.run_forever()

class Bob():
    def __init__(self):
        logger.debug('Bob instantiated')
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        register_signal_handler(self.loop, self.bob_receive, signal=ALICE_SIGNAL, sender=ALICE_SENDER)
        self.bob()

    def bob_receive(self, message):
        logger.debug('Bob has received message: {}'.format(message))

    def bob(self):
        def poke_alice():
            dispatcher.send(message='message from Bob', signal=BOB_SIGNAL, sender=BOB_SENDER)
            self.loop.call_later(3, poke_alice)
        self.loop.call_later(3, poke_alice)
        self.loop.run_forever()
Таким образом, когда сообщение поступает от Боба к Алисе, обработчик сигнала фактически не выполняет работу по обработке сообщения, а только планирует выполнение фактического обработчика, который будет выполнять эту работу. И фактический обработчик будет вызван в потоке Алисы.

В этом случае:

  1. сигнал всегда обрабатывается почти мгновенно и никогда не блокирует сигнал отправителя. нитка.
  2. Код Алисы всегда выполняется в потоке Алисы. Код Боба всегда выполняется в потоке Боба.
Итак, мои цели здесь достигнуты. Ребята, как вы думаете, это хорошее решение? Хотел бы услышать комментарий по этому поводу.

Comments

    Ничего не найдено.