Springboot Kafka Producer

2022. 1. 23. 23:52SpringBoot Kafka

반응형

10초마다 메시지를 Consumer로 전송하는 Producer 예제를 만들도록 하겠습니다.

 

1. 프로젝트 설정

앞선 예제와 동일하게 https://start.spring.io/ 사이트에서 springboot-kafka-producer 프로젝트를 생성해줍니다.

프로젝트 생성 후 프로젝트 구조는 아래와 같습니다.

프로젝트 구조

2. 소스 코드

application.properties

server.port=10001
spring.output.ansi.enabled: always
bootstrap-servers=localhost:9092,localhost:9093,localhost:9094

 

KafkaProperties.java

package com.roopy.config;

import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties("kafka")
@Data
public class KafkaProperties {
    @Value("${bootstrap-servers}")
    private String bootStrapServers;
}

 

KafkaTemplateConfiguration.java

package com.roopy.config;

import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
@RequiredArgsConstructor
public class KafkaTemplateConfiguration {

    private final KafkaProperties kafkaProperties;

    @Bean
    public KafkaTemplate<String,Object> kafkaTemplate() {
        return new KafkaTemplate<String,Object>(producerFactory());
    }

    @Bean
    public ProducerFactory<String,Object> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootStrapServers());
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return new DefaultKafkaProducerFactory<String,Object>(config);
    }

}

KafaTemplate은 producer를 wraps 하고 Topic에 데이터를 보내는 편리한 방법을 제공합니다.

 

KafkaProducerService.java

package com.roopy.service;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.time.LocalDateTime;

@Service
@RequiredArgsConstructor
@Slf4j
public class KafkaProducerService {

    private static int runningId = 0;

    private final KafkaTemplate kafkaTemplate;

    @Scheduled(fixedRate = 1000*10, initialDelay = 5*1000)
    public void produceMessage() {
        log.info("Produce Message - BEGIN");
        String message = String.format("%d 번째 메세지를 %s 에 전송 하였습니다.", runningId++, LocalDateTime.now().toString());
        ListenableFuture<SendResult<String, String>> listenableFuture = kafkaTemplate.send("testTopic", message);
        listenableFuture.addCallback(new ListenableFutureCallback<Object>() {
            @Override
            public void onFailure(Throwable ex) {
                log.error("ERROR Kafka error happend", ex);
            }

            @Override
            public void onSuccess(Object result) {
                log.info("SUCCESS!! This is the reulst: {}", result);
            }
        });

        log.info("Produce Message - END {}", message);
    }

}

10초마다 비동기 방식으로 Consumer에 메시지를 전달합니다.

 

SpringbootKafkaProducerApplication.java

package com.roopy;

import com.roopy.config.KafkaProperties;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableConfigurationProperties(value={KafkaProperties.class})
@EnableScheduling
public class SpringbootKafkaProducerApplication {

	public static void main(String[] args) {
		SpringApplication.run(SpringbootKafkaProducerApplication.class, args);
	}

}

 

3. 테스트

테스트를 위해서 zookeeper와 broker는 실행 되어 있어야 합니다.

 

Consumer 실행

Consumer #1 실행
Consumer #2 실행
Consumer #3 실행

Producer 실행

Producer 실행

위의 이미지를 확인해 보면 Producer에서는 10초 간격으로 메시지를 로 전송하며 Consumer는 Round-Robin 방식으로 메시지를 수신하는 것을 확인할 수 있다.

 

예제 소스는 아래 사이트에서 다운받으실 수 있습니다.

https://github.com/roopy1210/springboot-kafka-producer

 

GitHub - roopy1210/springboot-kafka-producer

Contribute to roopy1210/springboot-kafka-producer development by creating an account on GitHub.

github.com

 

반응형

'SpringBoot Kafka' 카테고리의 다른 글

Springboot Kafka Consumer  (0) 2022.01.19
Kafka Topic 생성 및 Producer, Consumer 테스트  (0) 2022.01.16
Kafka(Broker) 설치 및 설정  (0) 2022.01.15
Kafka 개념  (0) 2022.01.13