Springboot Kafka Consumer

2022. 1. 19. 00:33SpringBoot Kafka

반응형

지금까지 Zookeeper, Broker 설정 및 kafka console을 이용하여서 Producer, Conumer를 테스트해봤습니다.

이번에는 Springboot를 이용하여서 Consumer를 작성해보도록 하겠습니다.

 

1. 프로젝트 설정

https://start.spring.io/ 사이트에서 아래와 같이 프로젝트를 생성하여 줍니다.

Springboot Consumer 프로젝트 설정

 

프로젝트 생성 후 프로젝트 구조는 아래와 같습니다.

프로젝트 구조

2. 소스 코드

application.properties

spring.output.ansi.enabled: always
bootstrap-servers=localhost:9092,localhost:9093,localhost:9094

bootstrap-servers는 앞에서 3개의 Broker 주소입니다.

 

KafkaProperties.java

package com.roopy.config;

import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties("kafka")
@Data
public class KafkaProperties {
    public static final String CONSUMER_GROUP_ID = "testGroup";

    @Value("${bootstrap-servers}")
    private String bootStrapServers;
}

CONSUMER_GROUP_ID는 앞에서 kafka-console-consumer.bat을 통해 생성한 group id입니다.

 

ConsumerConfiguration.java

package com.roopy.config;

import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
@RequiredArgsConstructor
public class ConsumerConfiguration {

    private final KafkaProperties kafkaProperties;

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer,String>> kafkaListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<Integer,String> factory
                = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer,String> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootStrapServers());
        config.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaProperties.CONSUMER_GROUP_ID);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(config);
    }
}

Consumer에 대한 설정 정보 소스입니다.

아래 사이트를 참고하여서 작성된 소스입니다.

https://docs.spring.io/spring-kafka/reference/html/#with-java-configuration-no-spring-boot

 

Spring for Apache Kafka

When using Spring for Apache Kafka in a Spring Boot application, the Apache Kafka dependency versions are determined by Spring Boot’s dependency management. If you wish to use a different version of kafka-clients or kafka-streams, and use the embedded ka

docs.spring.io

 

ConsumerService.java

Producer에서 전송한 메시지를 수신하는 소스

package com.roopy.consumer;

import com.roopy.config.KafkaProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class ConsumerService {

    @KafkaListener(topics = {"testTopic"}, groupId = KafkaProperties.CONSUMER_GROUP_ID)
    public void consumerMessage(String message) {
        log.info("Received Message : {}", message);
    }

}

 

SpringbootKafkaConsumerApplication.java

package com.roopy;

import com.roopy.config.KafkaProperties;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;

@SpringBootApplication
@EnableConfigurationProperties(value = {KafkaProperties.class})
public class SpringbootKafkaConsumerApplication {

	public static void main(String[] args) {
		SpringApplication.run(SpringbootKafkaConsumerApplication.class, args);
	}

}

 

3. 테스트

Producer 코드는 다음 예제에서 작성할 것이므로 Consumer 테스트를 위해서 kafka-console-producer를 이용하여서 메시지를 전송하고 위에 만든 예제에서 메세지 수신을 테스트한다.

 

Consumer Application은 3개를 실행한 후 메시지 수신을 확인한다.

 

Producer 실행

Producer 메세지 전송

Consumer#1 실행

Consumer#1

Consumer#2 실행

Consumer#2

Consumer#3 실행

Consumer#3

위의 이미지를 보면 Producer에서 전송한 메시지가 Consumer#1, #2, #3으로 Round-Robin 방식으로 전송된 것을 확인할 수 있다.

 

예제소스는 아래사이트에서 다운받으실 수 있습니다.

https://github.com/roopy1210/springboot-kafka-consumer

 

GitHub - roopy1210/springboot-kafka-consumer

Contribute to roopy1210/springboot-kafka-consumer development by creating an account on GitHub.

github.com

 

반응형

'SpringBoot Kafka' 카테고리의 다른 글

Springboot Kafka Producer  (0) 2022.01.23
Kafka Topic 생성 및 Producer, Consumer 테스트  (0) 2022.01.16
Kafka(Broker) 설치 및 설정  (0) 2022.01.15
Kafka 개념  (0) 2022.01.13