Kafka producer를 이용한 배달 정보 전송하기

2022. 11. 26. 00:21React/React websocket with spring kafka

반응형

라이더들은 배달 정보를 앱을 통해 전송한다. 앞에서 보았지만 앱 대신에 Swagger를

이용하여서 메세지를 전송한다.

 

Zookeeper 설치 및 설정

Zookeeper 설치 및 설정은 아래 글을 참고 하시면 됩니다.

https://jydlove.tistory.com/82

이번 프로젝트에서는 zNode1 하나만 설정 하여서 프로젝트를 진행하도록 하겠습니다.

 

Kafka 설치 및 설정

Kafka 설치 및 설정은 아래 글을 참고 하시면 됩니다.

https://jydlove.tistory.com/83

이번 프로젝트에서는 kafkaNode1 하나만 설정 하여서 프로젝트를 진행하도록 하겠습니다.

 

Spring Kafka Producer Project

서버 설정을 위한 application.yml

server:
  port: 9000

spring:
  output:
    # Console Color 표시
    ansi:
      enabled: ALWAYS

  kafka:
    producer:
      bootstrap-servers: localhost:9092

  mvc:
    pathmatch:
      matching-strategy: ant_path_matcher

 

Kafka producer configuration

package com.roopy.delivery.producer.config;

import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@RequiredArgsConstructor
public class KafkaProducerConfiguration {

    private final KafkaProperties kafkaProperties;

    @Bean
    public KafkaTemplate<String,Object> kafkaTemplate() {
        return new KafkaTemplate(producerFactory());
    }

    @Bean
    public ProducerFactory<String,Object> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootStrapServers());
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);  // ①

        return new DefaultKafkaProducerFactory(config);
    }

}

소스에서 주의 해야 할 부부은 ① 메세지 전송은 JSON 객체로 전송이 되므로 VALUE_SERIALIZER_CLASS_CONFIG 설정은 JsonSerializer.class 로 해준다. 참고 URL 소스에서는 단순히 텍스트로 테스트 하였기 때문에 StringSerializer.class로 지정 해줬다.

 

사용자 메시시지를 받을 DeliveryController

package com.roopy.delivery.producer.controller;

import com.roopy.delivery.producer.dto.DeliveryDTO;
import com.roopy.delivery.producer.dto.DeliveryStatus;
import com.roopy.delivery.producer.dto.DeliveryStatusColor;
import com.roopy.delivery.producer.service.DeliveryService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping(value = "/v1/delivery")
public class DeliveryController {

    @Autowired
    private DeliveryService deliveryService;

    @PostMapping("/status")
    public String sendDeliveryStatus (@RequestBody DeliveryDTO deliveryDTO) {

        // 배달 상태에 따라 Color 설정
        if (deliveryDTO.getStatus().equals(DeliveryStatus.DELIVERING)) {
            deliveryDTO.setStatusColor(DeliveryStatusColor.ROSE);
        }
        else if (deliveryDTO.getStatus().equals(DeliveryStatus.COMPLETE)) {
            deliveryDTO.setStatusColor(DeliveryStatusColor.GREEN);
        }

        deliveryService.sendDeliveryStatus(deliveryDTO);

        return "Success";
    }
}

라이더의 배송상태에 따라서 UI에서 색상변경을 위해서 Color 설정 및 Service에 메세지를 전달한다.

 

Consumer 에 메시지 전송을 하기 위한 DeliveryServiceImpl

package com.roopy.delivery.producer.service;

import com.roopy.delivery.producer.dto.DeliveryDTO;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Service
@RequiredArgsConstructor
@Slf4j
public class DeliveryServiceImpl implements DeliveryService {

    private final KafkaTemplate<String,Object> kafkaTemplate;

    @Override
    public void sendDeliveryStatus(DeliveryDTO deliveryDTO) {
        ListenableFuture<SendResult<String,Object>> listenableFuture = kafkaTemplate.send("gdStoreTopic", deliveryDTO);
        listenableFuture.addCallback(new ListenableFutureCallback<>() {
            @Override
            public void onFailure(Throwable e) {
                log.error("Send delivery status error occured...");
            }

            @Override
            public void onSuccess(SendResult<String, Object> result) {
                log.info("[{}] {} Delivery status changed to {}", deliveryDTO.getId(), deliveryDTO.getStatus());
            }
        });
    }
}

KafkaTemplate을 이용하여서 Consumer에게 메세지를 전달한다.

ListenableFutuer를 이용하여 비동기 처리 한다.

callback 으로 전송 실패 또는 성공시 로직을 추가 하면 된다.

 

테스트 준비

① Zooker 실행

    ./bin/zkServer.cmd

 

Kafka Server 실행

    ./bin/windows/kafka-server-start.bat ./config/server.properties

 

Kafka Topic 생성 및 확인

     gdStoreTopic 생성

./bin/windows/kafka-topics.bat --bootstrap-server localhost:9092 --create --topic gdStoreTopic --partitions 3 --replication-factor 1

 

④ gdStoreTopic 생성 확인

./bin/windows/kafka-topics.bat --bootstrap-server localhost:9092 --describe --topic gdStoreTopic
Topic: gdStoreTopic     TopicId: ey_unVENRsy_M2SCiMuaGw PartitionCount: 3       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: gdStoreTopic     Partition: 0    Leader: 1       Replicas: 1     Isr: 1
        Topic: gdStoreTopic     Partition: 1    Leader: 1       Replicas: 1     Isr: 1
        Topic: gdStoreTopic     Partition: 2    Leader: 1       Replicas: 1     Isr: 1

 

⑤ Kafka Consumer 실행

./bin/windows/kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic gdStoreTopic --group storeGroup

 

테스트

Spring Kafka Producer 실행

     KafkaDeliveryProducerApplication 실행

② Swagger UI 실행 후 전송 테스트 수행

     Swagger 버전에 따라 아래 접속 URL이 틀릴 수 있습니다. 본 프로젝트 버전은 3.0 입니다.

     http://http://localhost:9000/swagger-ui/index.html

     아래 그림을 보면 메세지가 전송 되면 consumer 콘솔에 메세지가 정상적으로 나오는 것을

     확인 할 수 있다.

Kafka Producer 메세지 전송 테스트 데모

 

소스다운로드

https://github.com/roopy1210/react-websocket-with-spring-kafka/tree/main/kafka-delivery-producer

 

GitHub - roopy1210/react-websocket-with-spring-kafka

Contribute to roopy1210/react-websocket-with-spring-kafka development by creating an account on GitHub.

github.com

 

반응형