Asyncio и rabbitmq (asynqp): как потреблять из нескольких очередей одновременно



Я пытаюсь использовать несколько очередей одновременно, используя python, asyncio и asynqp.



Я не понимаю, почему мой вызов функции asyncio.sleep() не имеет никакого эффекта. Код на этом не останавливается. Честно говоря, я на самом деле не понимаю, в каком контексте выполняется обратный вызов, и могу ли я вообще передать управление bavck циклу событий (так что вызов asyncio.sleep() будет иметь смысл).

Что, если бы мне пришлось использовать вызов функции aiohttp.ClientSession.get() в моей функции обратного вызова process_msg? Я не может сделать это, так как это не корутин. Должен быть способ, который находится за пределами моего нынешнего понимания асинсио.



#!/usr/bin/env python3

import asyncio
import asynqp


USERS = {'betty', 'bob', 'luis', 'tony'}


def process_msg(msg):
asyncio.sleep(10)
print('>> {}'.format(msg.body))
msg.ack()

async def connect():
connection = await asynqp.connect(host='dev_queue', virtual_host='asynqp_test')
channel = await connection.open_channel()
exchange = await channel.declare_exchange('inboxes', 'direct')

# we have 10 users. Set up a queue for each of them
# use different channels to avoid any interference
# during message consumption, just in case.
for username in USERS:
user_channel = await connection.open_channel()
queue = await user_channel.declare_queue('Inbox_{}'.format(username))
await queue.bind(exchange, routing_key=username)
await queue.consume(process_msg)

# deliver 10 messages to each user
for username in USERS:
for msg_idx in range(10):
msg = asynqp.Message('Msg #{} for {}'.format(msg_idx, username))
exchange.publish(msg, routing_key=username)


loop = asyncio.get_event_loop()
loop.run_until_complete(connect())
loop.run_forever()
846   2  

2 ответов:

Я не понимаю, почему мой asyncio.вызов функции sleep() не имеет любой эффект.

Потому что asyncio.sleep() возвращает будущий объект, который должен использоваться в сочетании с циклом событий (или семантикой async/await).

Вы не можете использовать await в простом объявлении def, потому что обратный вызов вызывается вне контекста async/await, который присоединен к некоторому циклу событий под капотом. Другими словами, смешивать стиль обратного вызова со стилем async/await довольно сложно.

В простое решение, однако, состоит в том, чтобы запланировать работу обратно в цикл событий:

async def process_msg(msg):
    await asyncio.sleep(10)
    print('>> {}'.format(msg.body))
    msg.ack()

def _process_msg(msg):
    loop = asyncio.get_event_loop()
    loop.create_task(process_msg(msg))
    # or if loop is always the same one single line is enough
    # asyncio.ensure_future(process_msg(msg))

# some code
await queue.consume(_process_msg)
Обратите внимание, что в функции _process_msg нет рекурсии, т. е. тело process_msg не выполняется в то время как в _process_msg. Внутренняя функция process_msg будет вызвана, как только элемент управления вернется в цикл событий.

Это можно обобщить следующим кодом:

def async_to_callback(coro):
    def callback(*args, **kwargs):
        asyncio.ensure_future(coro(*args, **kwargs))
    return callback

async def process_msg(msg):
    # the body

# some code
await queue.consume(async_to_callback(process_msg))

Смотрите ответ Drizzt1991 на github для решения.

Comments

    Ничего не найдено.