2022. 11. 26. 00:21ㆍReact/React websocket with spring kafka
라이더들은 배달 정보를 앱을 통해 전송한다. 앞에서 보았지만 앱 대신에 Swagger를
이용하여서 메세지를 전송한다.
Zookeeper 설치 및 설정
Zookeeper 설치 및 설정은 아래 글을 참고 하시면 됩니다.
https://jydlove.tistory.com/82
이번 프로젝트에서는 zNode1 하나만 설정 하여서 프로젝트를 진행하도록 하겠습니다.
Kafka 설치 및 설정
Kafka 설치 및 설정은 아래 글을 참고 하시면 됩니다.
https://jydlove.tistory.com/83
이번 프로젝트에서는 kafkaNode1 하나만 설정 하여서 프로젝트를 진행하도록 하겠습니다.
Spring Kafka Producer Project
서버 설정을 위한 application.yml
server:
port: 9000
spring:
output:
# Console Color 표시
ansi:
enabled: ALWAYS
kafka:
producer:
bootstrap-servers: localhost:9092
mvc:
pathmatch:
matching-strategy: ant_path_matcher
Kafka producer configuration
package com.roopy.delivery.producer.config;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
@RequiredArgsConstructor
public class KafkaProducerConfiguration {
private final KafkaProperties kafkaProperties;
@Bean
public KafkaTemplate<String,Object> kafkaTemplate() {
return new KafkaTemplate(producerFactory());
}
@Bean
public ProducerFactory<String,Object> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootStrapServers());
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); // ①
return new DefaultKafkaProducerFactory(config);
}
}
소스에서 주의 해야 할 부부은 ① 메세지 전송은 JSON 객체로 전송이 되므로 VALUE_SERIALIZER_CLASS_CONFIG 설정은 JsonSerializer.class 로 해준다. 참고 URL 소스에서는 단순히 텍스트로 테스트 하였기 때문에 StringSerializer.class로 지정 해줬다.
사용자 메시시지를 받을 DeliveryController
package com.roopy.delivery.producer.controller;
import com.roopy.delivery.producer.dto.DeliveryDTO;
import com.roopy.delivery.producer.dto.DeliveryStatus;
import com.roopy.delivery.producer.dto.DeliveryStatusColor;
import com.roopy.delivery.producer.service.DeliveryService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping(value = "/v1/delivery")
public class DeliveryController {
@Autowired
private DeliveryService deliveryService;
@PostMapping("/status")
public String sendDeliveryStatus (@RequestBody DeliveryDTO deliveryDTO) {
// 배달 상태에 따라 Color 설정
if (deliveryDTO.getStatus().equals(DeliveryStatus.DELIVERING)) {
deliveryDTO.setStatusColor(DeliveryStatusColor.ROSE);
}
else if (deliveryDTO.getStatus().equals(DeliveryStatus.COMPLETE)) {
deliveryDTO.setStatusColor(DeliveryStatusColor.GREEN);
}
deliveryService.sendDeliveryStatus(deliveryDTO);
return "Success";
}
}
라이더의 배송상태에 따라서 UI에서 색상변경을 위해서 Color 설정 및 Service에 메세지를 전달한다.
Consumer 에 메시지 전송을 하기 위한 DeliveryServiceImpl
package com.roopy.delivery.producer.service;
import com.roopy.delivery.producer.dto.DeliveryDTO;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
@Service
@RequiredArgsConstructor
@Slf4j
public class DeliveryServiceImpl implements DeliveryService {
private final KafkaTemplate<String,Object> kafkaTemplate;
@Override
public void sendDeliveryStatus(DeliveryDTO deliveryDTO) {
ListenableFuture<SendResult<String,Object>> listenableFuture = kafkaTemplate.send("gdStoreTopic", deliveryDTO);
listenableFuture.addCallback(new ListenableFutureCallback<>() {
@Override
public void onFailure(Throwable e) {
log.error("Send delivery status error occured...");
}
@Override
public void onSuccess(SendResult<String, Object> result) {
log.info("[{}] {} Delivery status changed to {}", deliveryDTO.getId(), deliveryDTO.getStatus());
}
});
}
}
KafkaTemplate을 이용하여서 Consumer에게 메세지를 전달한다.
ListenableFutuer를 이용하여 비동기 처리 한다.
callback 으로 전송 실패 또는 성공시 로직을 추가 하면 된다.
테스트 준비
① Zooker 실행
./bin/zkServer.cmd
② Kafka Server 실행
./bin/windows/kafka-server-start.bat ./config/server.properties
③ Kafka Topic 생성 및 확인
gdStoreTopic 생성
./bin/windows/kafka-topics.bat --bootstrap-server localhost:9092 --create --topic gdStoreTopic --partitions 3 --replication-factor 1
④ gdStoreTopic 생성 확인
./bin/windows/kafka-topics.bat --bootstrap-server localhost:9092 --describe --topic gdStoreTopic
Topic: gdStoreTopic TopicId: ey_unVENRsy_M2SCiMuaGw PartitionCount: 3 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: gdStoreTopic Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: gdStoreTopic Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: gdStoreTopic Partition: 2 Leader: 1 Replicas: 1 Isr: 1
⑤ Kafka Consumer 실행
./bin/windows/kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic gdStoreTopic --group storeGroup
테스트
① Spring Kafka Producer 실행
KafkaDeliveryProducerApplication 실행
② Swagger UI 실행 후 전송 테스트 수행
Swagger 버전에 따라 아래 접속 URL이 틀릴 수 있습니다. 본 프로젝트 버전은 3.0 입니다.
http://http://localhost:9000/swagger-ui/index.html
아래 그림을 보면 메세지가 전송 되면 consumer 콘솔에 메세지가 정상적으로 나오는 것을
확인 할 수 있다.
소스다운로드
https://github.com/roopy1210/react-websocket-with-spring-kafka/tree/main/kafka-delivery-producer
'React > React websocket with spring kafka' 카테고리의 다른 글
React WebSocket 연동 (0) | 2022.11.26 |
---|---|
Kafka consumer를 이용한 배달 정보 수신 및 WebSocket 배달정보 전송 (0) | 2022.11.26 |
배달 정보 모니터링 시스템 만들기 (0) | 2022.11.26 |