Как добиться отсроченной очереди с помощью apache kafka?
Как добавить отложенные задания на kafka? Как я понимаю, он имеет дело не с каждым сообщением, а с каждой темой. Мои рабочие места имеют различный график, в котором я хотел бы их потреблять. Скажем, один будет в ближайшие 4 часа, другой будет I Dec. 1 и т. д.
Есть ли у Кафки собственная поддержка для этого или других сторонних способов достижения того же самого?
Вместо этого я думаю использовать Redis для отложенной очереди и отправить задание Кафке, как только его расписание будет получено, но если это возможно, я хотел бы использовать только одну зависимость.
3 ответов:
В Кафке нет понятия работы. Это просто тупая высокопроизводительная служба очереди сообщений. В зависимости от ваших требований вы можете рассмотреть возможность хранения заданий в хранилище, которое поддерживает индексацию по времени выполнения заданий, как некоторые СУБД. Затем в каком-то процессе периодически извлекают задания со временем выполнения в некотором небольшом диапазоне [last_check_time, current_time+lookahead_interval] и помещают их в тему Кафки для последующей обработки.
Здесь немного запоздалый ответ. Теперь в последней версии Kafka 0.10+ можно использовать отложенный поток, используя новую метку времени для каждого сообщения. Я использую это прямо сейчас, чтобы реализовать непрерывный агрегирующий набор данных, не прибегая к внешним зависимостям.
Эти записи проходят, и могут иметь обновления/удаления, проходящие в течение следующих 60 минут после первого события, поэтому я не могу объявить одну из них "окончательной", пока не увижу все новинки.
Итак, чтобы справиться с этим делом, я потребляю тему со всеми созданиями/обновлениями/удалениями дважды, первый в реальном времени (или как можно быстрее), второй задерживается на 90 минут, чтобы убедиться, что я ничего не пропустил. В реальном времени потребитель, я храню локально все необходимые обновления для создания. Затем на отложенном потребителе, когда я получу конкретное "создать", я пойду искать в своем локальном хранилище любые обновления / удаления, обновлю запись, чтобы она знала, что это окончательный статус, и произведите его в заключительную тему снова в Кафку.
Чтобы гарантировать, что у меня не закончится дисковое пространство, я также постоянно урезаю локальное хранилище, чтобы оно хранило не более двух часов обновлений/удалений.
В качестве альтернативы, вы можете использовать RabbitMQ, который поддерживает это с помощью сообщение TTL & обмен мертвыми письмами
Для получения дополнительной информации посетите:
Https://m.alphasights.com/exponential-backoff-with-rabbitmq-78386b9bec81
Comments