RabbitMQ / AMQP: одна очередь, несколько потребителей для одного сообщения?



Я только начинаю использовать RabbitMQ и AMQP в целом.




  • у меня есть очередь сообщений

  • у меня есть несколько потребителей, которые я хотел бы сделать разные вещи с тот же.


большая часть документации RabbitMQ, по-видимому, ориентирована на циклический перебор, т. е. когда одно сообщение потребляется одним потребителем, а нагрузка распределяется между каждым потребителем. Это действительно то поведение, которое я наблюдаю.



пример: производитель имеет одну очередь, и отправлять сообщения каждые 2 секунды:



var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;

connection.on('ready', function () {
var sendMessage = function(connection, queue_name, payload) {
var encoded_payload = JSON.stringify(payload);
connection.publish(queue_name, encoded_payload);
}

setInterval( function() {
var test_message = 'TEST '+count
sendMessage(connection, "my_queue_name", test_message)
count += 1;
}, 2000)


})


а вот и потребитель:



var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
connection.on('ready', function () {
connection.queue("my_queue_name", function(queue){
queue.bind('#');
queue.subscribe(function (message) {
var encoded_payload = unescape(message.data)
var payload = JSON.parse(encoded_payload)
console.log('Recieved a message:')
console.log(payload)
})
})
})


Если я запускаю потребителя дважды,Я вижу, что каждый потребитель потребляет альтернативные сообщения в циклическом поведении. Например, я буду видеть сообщения 1, 3, 5 в одном терминале, 2, 4, 6 в другом.



мой вопрос:




  • могу ли я заставить каждого потребителя получать одни и те же сообщения? То есть, как потребители получают сообщение 1, 2, 3, 4, 5, 6? Что это называется в AMQP / RabbitMQ говорить? Как это обычно настраивается?


  • это обычно делается? Должен ли я просто маршрутизировать сообщение exchange в две отдельные очереди с одним потребителем?


2413   10  

10 ответов:

могу ли я заставить каждого потребителя получать одни и те же сообщения? Т. е., как потребители получают сообщение 1, 2, 3, 4, 5, 6? Что это называется в AMQP / RabbitMQ говорить? Как это обычно настраивается?

нет, если потребители находятся в одной очереди. От кролика концепции AMQP руководство:

важно понимать, что в AMQP 0-9-1 сообщения сбалансированы по нагрузке между потребителями.

это, кажется, означает, что циклическое поведение в очереди является заданным, и не настраивается. Т. е., отдельные очереди требуются для того, чтобы один и тот же идентификатор сообщения обрабатывался несколькими потребителями.

это обычно делается? Должен ли я просто маршрутизировать сообщение exchange в две отдельные очереди с одним потребителем?

нет, это не так, одна очередь / несколько потребителей с каждым потребителем, обрабатывающим один и тот же идентификатор сообщения, невозможны. Имея exchange маршрутизирует сообщение на две отдельные очереди действительно лучше.

поскольку я не требую слишком сложной маршрутизации, a fanout exchange справится с этим красиво. Я не слишком много внимания уделял обмену ранее, поскольку node-amqp имеет концепцию "обмена по умолчанию", позволяющую публиковать сообщения непосредственно в соединении, однако большинство сообщений AMQP публикуются в определенном обмене.

вот мой обмен fanout, как отправка, так и получение:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;

connection.on('ready', function () {
  connection.exchange("my_exchange", options={type:'fanout'}, function(exchange) {   

    var sendMessage = function(exchange, payload) {
      console.log('about to publish')
      var encoded_payload = JSON.stringify(payload);
      exchange.publish('', encoded_payload, {})
    }

    // Recieve messages
    connection.queue("my_queue_name", function(queue){
      console.log('Created queue')
      queue.bind(exchange, ''); 
      queue.subscribe(function (message) {
        console.log('subscribed to queue')
        var encoded_payload = unescape(message.data)
        var payload = JSON.parse(encoded_payload)
        console.log('Recieved a message:')
        console.log(payload)
      })
    })

    setInterval( function() {    
      var test_message = 'TEST '+count
      sendMessage(exchange, test_message)  
      count += 1;
    }, 2000) 
 })
})

просто читать в RabbitMQ учебник. Вы публикуете сообщение в exchange, а не в очереди; затем оно направляется в соответствующие очереди. В вашем случае вы должны привязать отдельную очередь для каждого потребителя. Таким образом, они могут потреблять сообщения совершенно независимо.

последние пару ответов почти верны - у меня есть тонны приложений, которые генерируют сообщения, которые должны в конечном итоге с различными потребителями, так что процесс очень прост.

Если вы хотите, чтобы несколько потребителей к одному сообщению, выполните следующую процедуру.

Создайте несколько очередей, по одной для каждого приложения, которое должно получить сообщение, в каждом свойстве очереди "свяжите" тег маршрутизации с amq.прямой обмен. Изменить вы публикуете приложение, чтобы отправить его свойства.сразу и польза routing-tag (не очередь). Затем AMQP скопирует сообщение в каждую очередь с той же привязкой. Работает как шарм :)

пример: Допустим, у меня есть строка JSON, которую я генерирую, я публикую ее в "amq.прямой "обмен с использованием тега маршрутизации " new-sales-order", у меня есть очередь для моего приложения order_printer, которое печатает заказ, у меня есть очередь для моей биллинговой системы, которая отправит копию заказа и выставит счет клиенту, и у меня есть система веб-архива, в которой я архивирую заказы для исторические / причины соответствия, и у меня есть клиентский веб-интерфейс, где заказы отслеживаются по мере поступления другой информации о заказе.

Итак, мои очереди: order_printer, order_billing, order_archive и order_tracking У всех есть привязка тега "new-sales-order", привязанная к ним, все 4 получат данные JSON.

Это идеальный способ для отправки данных без публикации приложения, зная или заботясь о получении приложений.

шаблон отправки является отношением один к одному. Если вы хотите "отправить" более чем одному получателю, вы должны использовать шаблон pub/sub. См.http://www.rabbitmq.com/tutorials/tutorial-three-python.html для более подробной информации.

да каждый потребитель может получать одни и те же сообщения. взгляните на http://www.rabbitmq.com/tutorials/tutorial-three-python.html http://www.rabbitmq.com/tutorials/tutorial-four-python.html http://www.rabbitmq.com/tutorials/tutorial-five-python.html

для различных способов маршрутизации сообщений. Я знаю, что они предназначены для python и java, но его хорошо понять принципы, решить, что вы делаете, а затем найти, как это сделать в JS. Его звук как вы хотите сделать простой разветвитель (Урок 3), который отправляет сообщения во все очереди, подключенные к exchange.

разница с тем, что вы делаете и что вы хотите сделать, в основном заключается в том, что вы собираетесь настроить и обменять или ввести fanout. Fanout excahnges отправляет все сообщения во все подключенные очереди. Каждая очередь будет иметь потребителя, который будет иметь доступ ко всем сообщениям отдельно.

Да это обычно делается, это один из особенности AMPQ.

RabbitMQ / AMQP: одна очередь, несколько потребителей для одного сообщения и обновления страницы.

rabbit.on('ready', function () {    });
    sockjs_chat.on('connection', function (conn) {

        conn.on('data', function (message) {
            try {
                var obj = JSON.parse(message.replace(/\r/g, '').replace(/\n/g, ''));

                if (obj.header == "register") {

                    // Connect to RabbitMQ
                    try {
                        conn.exchange = rabbit.exchange(exchange, { type: 'topic',
                            autoDelete: false,
                            durable: false,
                            exclusive: false,
                            confirm: true
                        });

                        conn.q = rabbit.queue('my-queue-'+obj.agentID, {
                            durable: false,
                            autoDelete: false,
                            exclusive: false
                        }, function () {
                            conn.channel = 'my-queue-'+obj.agentID;
                            conn.q.bind(conn.exchange, conn.channel);

                            conn.q.subscribe(function (message) {
                                console.log("[MSG] ---> " + JSON.stringify(message));
                                conn.write(JSON.stringify(message) + "\n");
                            }).addCallback(function(ok) {
                                ctag[conn.channel] = ok.consumerTag; });
                        });
                    } catch (err) {
                        console.log("Could not create connection to RabbitMQ. \nStack trace -->" + err.stack);
                    }

                } else if (obj.header == "typing") {

                    var reply = {
                        type: 'chatMsg',
                        msg: utils.escp(obj.msga),
                        visitorNick: obj.channel,
                        customField1: '',
                        time: utils.getDateTime(),
                        channel: obj.channel
                    };

                    conn.exchange.publish('my-queue-'+obj.agentID, reply);
                }

            } catch (err) {
                console.log("ERROR ----> " + err.stack);
            }
        });

        // When the visitor closes or reloads a page we need to unbind from RabbitMQ?
        conn.on('close', function () {
            try {

                // Close the socket
                conn.close();

                // Close RabbitMQ           
               conn.q.unsubscribe(ctag[conn.channel]);

            } catch (er) {
                console.log(":::::::: EXCEPTION SOCKJS (ON-CLOSE) ::::::::>>>>>>> " + er.stack);
            }
        });
    });

чтобы получить желаемое поведение, просто попросите каждого потребителя потреблять из своей очереди. Вы должны будете использовать не прямой тип обмена (тема, заголовок, разветвление) для того, чтобы получить сообщение для всех очередей сразу.

Как я оцениваю ваш случай:

  • У меня есть очередь сообщений (ваш источник для получения сообщений, давайте назовем его q111)

  • У меня есть несколько потребителей, которые я хотел бы делать разные вещи с одним и тем же сообщением.

ваша проблема здесь заключается в том, что в то время как 3 сообщения принимаются этой очередью, сообщение 1 потребляется потребителем A, другие потребители B и C потребляют сообщение 2 и 3. Где, как вы нуждаетесь в настройка, где rabbitmq передает одинаковые копии всех этих трех сообщений (1,2,3) всем трем подключенным потребителям (A,B,C) одновременно.

в то время как многие конфигурации могут быть сделаны для достижения этой цели, простой способ заключается в использовании следующих двух шагов концепции:

  • используйте динамический rabbitmq-shovel для получения сообщений из нужной очереди (q111) и публикации в fanout exchange (exchange, специально созданный и выделенный для этой цели).
  • Теперь переконфигурируйте своих потребителей A, B & C (которые слушали очередь(q111)) для прослушивания из этого разветвленного обмена непосредственно с помощью эксклюзивной и анонимной очереди для каждого потребителя.

Примечание: при использовании этой концепции не потребляйте непосредственно из исходной очереди(q111), так как уже потребленные сообщения не будут загружены в ваш обмен Fanout.

Если вы думаете, что это не удовлетворяет вашим требованиям... не стесняйтесь размещать свои предложения :-)

Если вы случайно используете amqplib библиотека как я, у них есть удобная пример реализации опубликовать / подписаться RabbitMQ учебник который вы можете найти удобным.

Я думаю, вы должны проверить отправку сообщений с помощью fan-out обменник. Таким образом, вы будете получать одно и то же сообщение для разных потребителей, под таблицей RabbitMQ создает разные очереди для каждого из этих новых потребителей/подписчиков.

Это ссылка для просмотра примера учебника в javascript https://www.rabbitmq.com/tutorials/tutorial-one-javascript.html

Comments

    Ничего не найдено.