2022. 11. 26. 00:29ㆍReact/React websocket with spring kafka
Producer에서 전송한 배달정보를 수신하여서 WebSocket으로 배달정보를 전송한다.
Spring Kafka Consumer Project
서버 설정을 위한 application.yml
server:
port: 9001
spring:
output:
# Console Color 표시
ansi:
enabled: ALWAYS
kafka:
consumer:
bootstrap-servers: localhost:9092
Kafka consumer configuration
package com.roopy.delivery.consumer.config;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
@RequiredArgsConstructor
public class KafkaConsumerConfiguration {
private final KafkaProperties kafkaProperties;
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String,Object>> kafkaListerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String,Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ConsumerFactory<String,Object> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootStrapServers());
config.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaConstants.CONSUMER_GROUP_ID);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(Object.class));
}
}
Client 와 Socket 연동을 WebSocketConfiguration 설정
package com.roopy.delivery.consumer.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfiguration implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/delivery") // ①
.setAllowedOrigins("http://localhost:3000/") // ②
.withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
// enableSimpleBroker 종류에는 topic, queue 있다.
// topic: 한명이 message 를 발행했을 때 해당 토픽을 구독하고 있는 n명에게 메세지를 전달하는 경우
// queue: 한명이 message 를 발행했을 때 발핸한 한 명에게 다시 정보를 보내는 경우
registry.enableSimpleBroker("/topic"); // ③
registry.setApplicationDestinationPrefixes("/");
}
}
① addEndpoint : Client 에서 Socket Connction시 http://{서버주소}:{port}/delivery 호출시 설정
const sockJS = new SockJS('http://localhost:9001/delivery');
② setAllowedOrigins : CORS 문제 발생을 막기 위해 Client 호출 서버 주소 설정
③ enableSimpleBroker: Client 에서 메세지 수신시 /enableSimpleBroker/addEndPoint 로 설정
stompClient.subscribe('/topic/delivery’ …
Producer 에서 전송 메세지 수신을 위한 DeliveryListener
package com.roopy.delivery.consumer.listener;
import com.roopy.delivery.consumer.config.KafkaConstants;
import com.roopy.delivery.consumer.dto.DeliveryDTO;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class DeliveryListener {
@Autowired
SimpMessagingTemplate template;
@KafkaListener(topics = KafkaConstants.KAFKA_TOPIC, groupId = KafkaConstants.CONSUMER_GROUP_ID)
public void listen(ConsumerRecord<String, DeliveryDTO> record) {
log.info("Producer message received : {}", record.value());
template.convertAndSend("/topic/delivery", record.value());
}
}
테스트
① Zooker 실행
./bin/zkServer.cmd
② Kafka Server 실행
./bin/windows/kafka-server-start.bat ./config/server.properties
③ Spring Kafka Producer 실행
KafkaDeliveryProducerApplication 실행
④ Spring Kafka Consumer 실행
KafkaDeliveryProducerApplication 실행
⑤ Swagger UI 실행 후 전송 테스트 수행
왼쪽 화면에서 첫번째줄은 Kafka Producer 에서 전송한 메세지를 출력한 결과
두번째 줄은 정상적으로 수신된메세지를 WebsSoket으로 전송한 결과 이다.
위의 테스트가 정상적으로 수행이 된 것을 확인 했다면 마지막으로 간단한 React App을
만들어서 화면에서 실제로 적용 해보도록 하자.
소스다운로드
https://github.com/roopy1210/react-websocket-with-spring-kafka/tree/main/kafka-delivery-consumer
'React > React websocket with spring kafka' 카테고리의 다른 글
React WebSocket 연동 (0) | 2022.11.26 |
---|---|
Kafka producer를 이용한 배달 정보 전송하기 (0) | 2022.11.26 |
배달 정보 모니터링 시스템 만들기 (0) | 2022.11.26 |