Подключение Kafka в Spring Boot



Книга Подключение Kafka в Spring Boot

«Apache Kafka  —  это распределенная Open Source платформа потоковой передачи событий, используемая в тысячах компаний для высокопроизводительных конвейеров данных, потоковой аналитики, интеграции данных и критически важных приложений». С этим определением сайта я полностью согласен.


Настройка демонстрационной среды


Чтобы не отвлекаться от установки Kafka, мы уже включили в деморепозиторий файл docker-compose и взятые отсюда образы Docker.


Для этого демопроекта, созданного из Spring Initializr, используется Maven.


Клонируем деморепозиторий.


Запускаем docker-compose:


docker-compose up -d

Используя модуль spring-kafka, который с помощью высокоуровневых, абстрактных классов поддерживает потребление и отправку сообщений в Kafka, добавляем в демо такую зависимость:


<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

Подключаемся к Kafka как потребитель событий


На простейшем уровне сообщения из Kafka потребляются всего в два этапа.


Этап 1. Добавление в application.properties следующей настройки с информацией о сервере начальной загрузки Kafka:


spring.kafka.consumer.bootstrap-servers=localhost:9092

Этап 2. Создание класса потребителя для обработки сообщений:


@Service
public class DemoStringConsumer {
private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(DemoStringConsumer.class);

@KafkaListener(id = "demoGroup", topics = "programmingsharing.topic1")
public void listen(String message) {
log.info("Received: " + message);
}
}

Дополнительные настройки не нужны: приложения Spring Boot готовы к потреблению сообщений из Kafka.


Создаем тему:


docker exec broker \
kafka-topics --bootstrap-server broker:9092 \
--create \
--topic programmingsharing.topic1

Отправляем в нее строковые сообщения:


docker exec --interactive --tty broker \
kafka-console-producer --bootstrap-server broker:9092 \
--topic programmingsharing.topic1

Регистрируем сообщение в журнале:


INFO 26586 --- [demoGroup-0-C-1] c.p.k.consumer.DemoStringConsumer        : Received: Hello world
INFO 26586 --- [demoGroup-0-C-1] c.p.k.consumer.DemoStringConsumer : Received: Hi programming sharing readers

Подключаемся к Kafka как источник событий


Аналогично настройке для потребителя.


Этап 1. Добавление в application.properties настройки с информацией о сервере начальной загрузки источника Kafka:


spring.kafka.producer.bootstrap-servers=localhost:9092

Этап 2. Создание класса для выдачи сообщения с помощью автоматически обнаруживаемого и связываемого компонента KafkaTemplate:


package com.programmingsharing.kafkaspringboot.producer;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.time.Instant;

@Service
public class SimpleScheduledProducer {
private final KafkaTemplate<Object, Object> template;

private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(SimpleScheduledProducer.class);

public SimpleScheduledProducer(KafkaTemplate<Object, Object> template) {
this.template = template;
}

@Scheduled(fixedDelay = 2000)
public void sendFoo() {
log.info("producing message to Kafka, topic=receiving-topic");
this.template.send("receiving-topic", Instant.now().toString());
}
}

Scheduled применяется в демонстрационных целях: каждые две секунды выдается простое значение момента времени в виде строки.


Теперь приложение готово к отправке сообщений на сервер Kafka. Проверяем их, просто добавляя новый KafkaListener:


@KafkaListener(id = "demoGroup2", topics = "receiving-topic")
public void listenFromReceivingTopic(String message) {
log.info("Received: " + message);
}

В консольном журнале теперь регистрируется информация об источнике и потребителе:


023-01-05T21:24:53.822+07:00  INFO 28552 --- [emoGroup2-0-C-1] c.p.k.consumer.DemoStringConsumer        : Received: 2023-01-05T14:24:52.703290Z
2023-01-05T21:24:54.708+07:00 INFO 28552 --- [ scheduling-1] c.p.k.producer.SimpleScheduledProducer : producing message to Kafka, topic=receiving-topic
2023-01-05T21:24:54.735+07:00 INFO 28552 --- [emoGroup2-0-C-1] c.p.k.consumer.DemoStringConsumer : Received: 2023-01-05T14:24:54.709550Z
2023-01-05T21:24:56.713+07:00 INFO 28552 --- [ scheduling-1] c.p.k.producer.SimpleScheduledProducer : producing message to Kafka, topic=receiving-topic
2023-01-05T21:24:56.743+07:00 INFO 28552 --- [emoGroup2-0-C-1] c.p.k.consumer.DemoStringConsumer : Received: 2023-01-05T14:24:56.714425Z
2023-01-05T21:24:58.720+07:00 INFO 28552 --- [ scheduling-1] c.p.k.producer.SimpleScheduledProducer : producing message to Kafka, topic=receiving-topic
2023-01-05T21:24:58.751+07:00 INFO 28552 --- [emoGroup2-0-C-1] c.p.k.consumer.DemoStringConsumer : Received: 2023-01-05T14:24:58.722375Z
2023-01-05T21:25:00.728+07:00 INFO 28552 --- [ scheduling-1] c.p.k.producer.SimpleScheduledProducer : producing message to Kafka, topic=receiving-topic
2023-01-05T21:25:00.750+07:00 INFO 28552 --- [emoGroup2-0-C-1] c.p.k.consumer.DemoStringConsumer : Received: 2023-01-05T14:25:00.729822Z
2023-01-05T21:25:02.736+07:00 INFO 28552 --- [ scheduling-1] c.p.k.producer.SimpleScheduledProducer : producing message to Kafka, topic=receiving-topic
2023-01-05T21:25:02.759+07:00 INFO 28552 --- [emoGroup2-0-C-1] c.p.k.consumer.DemoStringConsumer : Received: 2023-01-05T14:25:02.737265Z
2023-01-05T21:25:04.740+07:00 INFO 28552 --- [ scheduling-1] c.p.k.producer.SimpleScheduledProducer : producing message to Kafka, topic=receiving-topic
2023-01-05T21:25:04.768+07:00 INFO 28552 --- [emoGroup2-0-C-1] c.p.k.consumer.DemoStringConsumer : Received: 2023-01-05T14:25:04.741946Z

Класс KafkaAutoConfiguration


В модуле spring-boot-autoconfigure имеется класс KafkaAutoConfiguration для автоматического создания необходимых компонентов: kafkaTemplate, kafkaProducerListener, kafkaConsumerFactory, kafkaProducerFactory.


KafkaProperties  —  это класс ConfigurationProperties со свойствами spring.kafka, которые мы добавляли выше. Возможности модуля spring-kafka раскрываются доступными свойствами конфигураций этого класса:



  • Потребитель.

  • Источник.

  • Администратор Kafka.

  • Поток Kafka.

  • Сервис аутентификации и авторизации Java.

  • SSL-аутентификация.

  • Повторные попытки.



312   0  

Comments

    Ничего не найдено.