Kafka consumer를 이용한 배달 정보 수신 및 WebSocket 배달정보 전송

2022. 11. 26. 00:29React/React websocket with spring kafka

반응형

Producer에서 전송한 배달정보를 수신하여서 WebSocket으로 배달정보를 전송한다.

 

Spring Kafka Consumer Project

서버 설정을 위한 application.yml

server:
  port: 9001

spring:
  output:
    # Console Color 표시
    ansi:
      enabled: ALWAYS

  kafka:
    consumer:
      bootstrap-servers: localhost:9092

 

Kafka consumer configuration

package com.roopy.delivery.consumer.config;

import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
@RequiredArgsConstructor
public class KafkaConsumerConfiguration {
    private final KafkaProperties kafkaProperties;

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String,Object>> kafkaListerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String,Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }

    @Bean
    public ConsumerFactory<String,Object> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootStrapServers());
        config.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaConstants.CONSUMER_GROUP_ID);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(Object.class));
    }

}

 

Client 와 Socket 연동을 WebSocketConfiguration 설정

package com.roopy.delivery.consumer.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfiguration implements WebSocketMessageBrokerConfigurer  {
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/delivery")                      // ①      
                .setAllowedOrigins("http://localhost:3000/")   // ②
                .withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // enableSimpleBroker 종류에는 topic, queue 있다.
        // topic: 한명이 message 를 발행했을 때 해당 토픽을 구독하고 있는 n명에게 메세지를 전달하는 경우
        // queue: 한명이 message 를 발행했을 때 발핸한 한 명에게 다시 정보를 보내는 경우
        registry.enableSimpleBroker("/topic");                 // ③
        registry.setApplicationDestinationPrefixes("/");      
    }
}

① addEndpoint : Client 에서 Socket Connction시 http://{서버주소}:{port}/delivery 호출시 설정

    const sockJS = new SockJS('http://localhost:9001/delivery');

② setAllowedOrigins : CORS 문제 발생을 막기 위해 Client 호출 서버 주소 설정

③ enableSimpleBroker: Client 에서 메세지 수신시 /enableSimpleBroker/addEndPoint 로 설정

    stompClient.subscribe('/topic/delivery’ …

 

Producer 에서 전송 메세지 수신을 위한 DeliveryListener

package com.roopy.delivery.consumer.listener;

import com.roopy.delivery.consumer.config.KafkaConstants;
import com.roopy.delivery.consumer.dto.DeliveryDTO;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class DeliveryListener {

    @Autowired
    SimpMessagingTemplate template;

    @KafkaListener(topics = KafkaConstants.KAFKA_TOPIC, groupId = KafkaConstants.CONSUMER_GROUP_ID)
    public void listen(ConsumerRecord<String, DeliveryDTO> record) {
        log.info("Producer message received : {}", record.value());

        template.convertAndSend("/topic/delivery", record.value());
    }
}

 

테스트

① Zooker 실행

    ./bin/zkServer.cmd

 

② Kafka Server 실행

     ./bin/windows/kafka-server-start.bat ./config/server.properties

 

③ Spring Kafka Producer 실행

     KafkaDeliveryProducerApplication 실행

 

④ Spring Kafka Consumer 실행

     KafkaDeliveryProducerApplication 실행

 

⑤ Swagger UI 실행 후 전송 테스트 수행

배달정보 수신 및 웹소켓 전송 데모

왼쪽 화면에서 첫번째줄은 Kafka Producer 에서 전송한 메세지를 출력한 결과

두번째 줄은 정상적으로 수신된메세지를 WebsSoket으로 전송한 결과 이다.

 

위의 테스트가 정상적으로 수행이 된 것을 확인 했다면 마지막으로 간단한 React App을

만들어서 화면에서 실제로 적용 해보도록 하자.

 

소스다운로드

https://github.com/roopy1210/react-websocket-with-spring-kafka/tree/main/kafka-delivery-consumer

 

GitHub - roopy1210/react-websocket-with-spring-kafka

Contribute to roopy1210/react-websocket-with-spring-kafka development by creating an account on GitHub.

github.com

 

반응형