Springboot Kafka Producer
2022. 1. 23. 23:52ㆍSpringBoot Kafka
반응형
10초마다 메시지를 Consumer로 전송하는 Producer 예제를 만들도록 하겠습니다.
1. 프로젝트 설정
앞선 예제와 동일하게 https://start.spring.io/ 사이트에서 springboot-kafka-producer 프로젝트를 생성해줍니다.
프로젝트 생성 후 프로젝트 구조는 아래와 같습니다.
2. 소스 코드
application.properties
server.port=10001
spring.output.ansi.enabled: always
bootstrap-servers=localhost:9092,localhost:9093,localhost:9094
KafkaProperties.java
package com.roopy.config;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
@ConfigurationProperties("kafka")
@Data
public class KafkaProperties {
@Value("${bootstrap-servers}")
private String bootStrapServers;
}
KafkaTemplateConfiguration.java
package com.roopy.config;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
@RequiredArgsConstructor
public class KafkaTemplateConfiguration {
private final KafkaProperties kafkaProperties;
@Bean
public KafkaTemplate<String,Object> kafkaTemplate() {
return new KafkaTemplate<String,Object>(producerFactory());
}
@Bean
public ProducerFactory<String,Object> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootStrapServers());
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<String,Object>(config);
}
}
KafaTemplate은 producer를 wraps 하고 Topic에 데이터를 보내는 편리한 방법을 제공합니다.
KafkaProducerService.java
package com.roopy.service;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import java.time.LocalDateTime;
@Service
@RequiredArgsConstructor
@Slf4j
public class KafkaProducerService {
private static int runningId = 0;
private final KafkaTemplate kafkaTemplate;
@Scheduled(fixedRate = 1000*10, initialDelay = 5*1000)
public void produceMessage() {
log.info("Produce Message - BEGIN");
String message = String.format("%d 번째 메세지를 %s 에 전송 하였습니다.", runningId++, LocalDateTime.now().toString());
ListenableFuture<SendResult<String, String>> listenableFuture = kafkaTemplate.send("testTopic", message);
listenableFuture.addCallback(new ListenableFutureCallback<Object>() {
@Override
public void onFailure(Throwable ex) {
log.error("ERROR Kafka error happend", ex);
}
@Override
public void onSuccess(Object result) {
log.info("SUCCESS!! This is the reulst: {}", result);
}
});
log.info("Produce Message - END {}", message);
}
}
10초마다 비동기 방식으로 Consumer에 메시지를 전달합니다.
SpringbootKafkaProducerApplication.java
package com.roopy;
import com.roopy.config.KafkaProperties;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableConfigurationProperties(value={KafkaProperties.class})
@EnableScheduling
public class SpringbootKafkaProducerApplication {
public static void main(String[] args) {
SpringApplication.run(SpringbootKafkaProducerApplication.class, args);
}
}
3. 테스트
테스트를 위해서 zookeeper와 broker는 실행 되어 있어야 합니다.
Consumer 실행
Producer 실행
위의 이미지를 확인해 보면 Producer에서는 10초 간격으로 메시지를 로 전송하며 Consumer는 Round-Robin 방식으로 메시지를 수신하는 것을 확인할 수 있다.
예제 소스는 아래 사이트에서 다운받으실 수 있습니다.
https://github.com/roopy1210/springboot-kafka-producer
반응형
'SpringBoot Kafka' 카테고리의 다른 글
Springboot Kafka Consumer (0) | 2022.01.19 |
---|---|
Kafka Topic 생성 및 Producer, Consumer 테스트 (0) | 2022.01.16 |
Kafka(Broker) 설치 및 설정 (0) | 2022.01.15 |
Kafka 개념 (0) | 2022.01.13 |