2022. 1. 19. 00:33ㆍSpringBoot Kafka
지금까지 Zookeeper, Broker 설정 및 kafka console을 이용하여서 Producer, Conumer를 테스트해봤습니다.
이번에는 Springboot를 이용하여서 Consumer를 작성해보도록 하겠습니다.
1. 프로젝트 설정
https://start.spring.io/ 사이트에서 아래와 같이 프로젝트를 생성하여 줍니다.
프로젝트 생성 후 프로젝트 구조는 아래와 같습니다.
2. 소스 코드
application.properties
spring.output.ansi.enabled: always
bootstrap-servers=localhost:9092,localhost:9093,localhost:9094
bootstrap-servers는 앞에서 3개의 Broker 주소입니다.
KafkaProperties.java
package com.roopy.config;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
@ConfigurationProperties("kafka")
@Data
public class KafkaProperties {
public static final String CONSUMER_GROUP_ID = "testGroup";
@Value("${bootstrap-servers}")
private String bootStrapServers;
}
CONSUMER_GROUP_ID는 앞에서 kafka-console-consumer.bat을 통해 생성한 group id입니다.
ConsumerConfiguration.java
package com.roopy.config;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
@RequiredArgsConstructor
public class ConsumerConfiguration {
private final KafkaProperties kafkaProperties;
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer,String>> kafkaListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<Integer,String> factory
= new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
return factory;
}
@Bean
public ConsumerFactory<Integer,String> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootStrapServers());
config.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaProperties.CONSUMER_GROUP_ID);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config);
}
}
Consumer에 대한 설정 정보 소스입니다.
아래 사이트를 참고하여서 작성된 소스입니다.
https://docs.spring.io/spring-kafka/reference/html/#with-java-configuration-no-spring-boot
ConsumerService.java
Producer에서 전송한 메시지를 수신하는 소스
package com.roopy.consumer;
import com.roopy.config.KafkaProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class ConsumerService {
@KafkaListener(topics = {"testTopic"}, groupId = KafkaProperties.CONSUMER_GROUP_ID)
public void consumerMessage(String message) {
log.info("Received Message : {}", message);
}
}
SpringbootKafkaConsumerApplication.java
package com.roopy;
import com.roopy.config.KafkaProperties;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
@SpringBootApplication
@EnableConfigurationProperties(value = {KafkaProperties.class})
public class SpringbootKafkaConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(SpringbootKafkaConsumerApplication.class, args);
}
}
3. 테스트
Producer 코드는 다음 예제에서 작성할 것이므로 Consumer 테스트를 위해서 kafka-console-producer를 이용하여서 메시지를 전송하고 위에 만든 예제에서 메세지 수신을 테스트한다.
Consumer Application은 3개를 실행한 후 메시지 수신을 확인한다.
Producer 실행
Consumer#1 실행
Consumer#2 실행
Consumer#3 실행
위의 이미지를 보면 Producer에서 전송한 메시지가 Consumer#1, #2, #3으로 Round-Robin 방식으로 전송된 것을 확인할 수 있다.
예제소스는 아래사이트에서 다운받으실 수 있습니다.
https://github.com/roopy1210/springboot-kafka-consumer
'SpringBoot Kafka' 카테고리의 다른 글
Springboot Kafka Producer (0) | 2022.01.23 |
---|---|
Kafka Topic 생성 및 Producer, Consumer 테스트 (0) | 2022.01.16 |
Kafka(Broker) 설치 및 설정 (0) | 2022.01.15 |
Kafka 개념 (0) | 2022.01.13 |