Asyncio и rabbitmq (asynqp): как потреблять из нескольких очередей одновременно
Я пытаюсь использовать несколько очередей одновременно, используя python, asyncio и asynqp.
Я не понимаю, почему мой вызов функции
asyncio.sleep() не имеет никакого эффекта. Код на этом не останавливается. Честно говоря, я на самом деле не понимаю, в каком контексте выполняется обратный вызов, и могу ли я вообще передать управление bavck циклу событий (так что вызов asyncio.sleep() будет иметь смысл).Что, если бы мне пришлось использовать вызов функции aiohttp.ClientSession.get() в моей функции обратного вызова process_msg? Я не может сделать это, так как это не корутин. Должен быть способ, который находится за пределами моего нынешнего понимания асинсио.
#!/usr/bin/env python3
import asyncio
import asynqp
USERS = {'betty', 'bob', 'luis', 'tony'}
def process_msg(msg):
asyncio.sleep(10)
print('>> {}'.format(msg.body))
msg.ack()
async def connect():
connection = await asynqp.connect(host='dev_queue', virtual_host='asynqp_test')
channel = await connection.open_channel()
exchange = await channel.declare_exchange('inboxes', 'direct')
# we have 10 users. Set up a queue for each of them
# use different channels to avoid any interference
# during message consumption, just in case.
for username in USERS:
user_channel = await connection.open_channel()
queue = await user_channel.declare_queue('Inbox_{}'.format(username))
await queue.bind(exchange, routing_key=username)
await queue.consume(process_msg)
# deliver 10 messages to each user
for username in USERS:
for msg_idx in range(10):
msg = asynqp.Message('Msg #{} for {}'.format(msg_idx, username))
exchange.publish(msg, routing_key=username)
loop = asyncio.get_event_loop()
loop.run_until_complete(connect())
loop.run_forever()
2 ответов:
Я не понимаю, почему мой asyncio.вызов функции sleep() не имеет любой эффект.Потому что
asyncio.sleep()возвращает будущий объект, который должен использоваться в сочетании с циклом событий (или семантикойasync/await).Вы не можете использовать
awaitв простом объявленииdef, потому что обратный вызов вызывается вне контекстаasync/await, который присоединен к некоторому циклу событий под капотом. Другими словами, смешивать стиль обратного вызова со стилемasync/awaitдовольно сложно.В простое решение, однако, состоит в том, чтобы запланировать работу обратно в цикл событий:
Обратите внимание, что в функцииasync def process_msg(msg): await asyncio.sleep(10) print('>> {}'.format(msg.body)) msg.ack() def _process_msg(msg): loop = asyncio.get_event_loop() loop.create_task(process_msg(msg)) # or if loop is always the same one single line is enough # asyncio.ensure_future(process_msg(msg)) # some code await queue.consume(_process_msg)_process_msgнет рекурсии, т. е. телоprocess_msgне выполняется в то время как в_process_msg. Внутренняя функцияprocess_msgбудет вызвана, как только элемент управления вернется в цикл событий.Это можно обобщить следующим кодом:
def async_to_callback(coro): def callback(*args, **kwargs): asyncio.ensure_future(coro(*args, **kwargs)) return callback async def process_msg(msg): # the body # some code await queue.consume(async_to_callback(process_msg))
Смотрите ответ Drizzt1991 на github для решения.
Comments