Topic

2020. 4. 19. 22:02Spring Micro Services/RabbitMQ

반응형

Constants.java
0.00MB
EmitLogTopic.java
0.00MB
ReceiveLogsTopic.java
0.00MB

Exchanges-Topics

topic은 routing pattern과 routing key와 routing pattern 사이의 와일드카드 매치를 기본으로 메세지를 보내게 된다.

 

Senario 1

모든 메세지를 수신 하기 원하는 경우

 

  • Exchage: topics
  • Queue1: Temp Queue
  • Routing Pattern: #

 

Senario 2

특정 개발팀만 메세지를 수신 하기 원하는 경우

 

  • Exchage: topics
  • Queue1: Temp Queue
  • Routing Pattern: dev.team.*

 

Senario 3

특정 디자인팀만 메세지를 수신 하기 원하는 경우

 

  • Exchage: topics
  • Queue1: Temp Queue
  • Routing Pattern: developer.*

 

 

PRODUCER

메세지 전송을 위한 EmitLogTopic.java 프로그램을 작성한다.

package com.rabbitmq.tutorials.chap05;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogTopic {

	public static void main(String[] argv) {
		System.out.println(Constants.HEADER);
		String rabbitmqHost = "localhost";
		
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost(rabbitmqHost);
		
		try {
			// 서버연결
			Connection connection = factory.newConnection();
			
			// 채널생성	
			Channel channel = connection.createChannel();
			
			// EXCHANGE생성
			channel.exchangeDeclare(Constants.EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
			
			String routingKey = getRoutingkey(argv);
			String message = getMessage(argv);
		
			channel.basicPublish(Constants.EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
			System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
			
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	private static String getRoutingkey(String[] strings) {
		if (strings.length < 1)
			return "anonymous.info";
		
		return strings[0];
	}

	private static String getMessage(String[] strings) {
		if (strings.length < 2) 
			return "Hello World!";
					
		return joinStrings(strings, " ", 1);
	}
	
	private static String joinStrings(String[] strings, String delimeter, int startIndex) {
		int length = strings.length;
		if (length == 0) return "";
		if (length <= startIndex) return "";
		StringBuilder words = new StringBuilder(strings[startIndex]);
		for (int i = startIndex + 1; i < length; i++) {
			words.append(delimeter).append(strings[i]);
		}
		return words.toString();
	}
}

 

CONSUMER

메세지 수신을 위한 ReceiveLogsTopic.java 프로그램을 작성한다.

package com.rabbitmq.tutorials.chap05;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogsTopic {

	public static void main(String[] argv) {
		System.out.println(Constants.HEADER);
		String rabbitmqHost = "localhost";
		
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost(rabbitmqHost);
		
		try {
			// 서버연결
			Connection connection = factory.newConnection();
			
			// 채널생성	
			Channel channel = connection.createChannel();
			
			// EXCHANGE생성
			channel.exchangeDeclare(Constants.EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
			String queueName = channel.queueDeclare().getQueue();
			System.out.println(" [*] Queue Name : " + queueName);
			
			if (argv.length < 1) {
				System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
				System.exit(1);
			}
			
			for (String bindingKey : argv) {
				channel.queueBind(queueName, Constants.EXCHANGE_NAME, bindingKey);
			}
			
			System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
			
			
			// 메세지수신
			DeliverCallback deliverCallback = (consumerTag, delivery) -> {
				String message = new String(delivery.getBody(), "UTF-8");
				System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'"  + message + "'");
			};
			
			boolean isAutoAck = true;
			channel.basicConsume(queueName, isAutoAck, deliverCallback, consumerTag -> { });
			
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

}

 

프로그램 실행

 

CONSUMER 실행

 

아래 이미지와 같이 cmd 창을 두개 띄운 후 클래스가 위치한 경로로 이동 후 java -classpath ../lib/*; com.rabbitmq.tutorials.chap05.ReceiveLogsTopic 명령어를 실행한다.

위의 정의된 시나리오데로 3가지 Routing Key로 실행한다.

 

위의 실행 결과를 보면 두개의 임시 Queue 가 생성된 것을 확인 할 수 있다.

 

그러면 위의 그림과 같이 Excnage도 정상적으로 생성되었는지 확인해보자

위의 그림대로 logs라는 exchage라는 생성 되었고 Type은 topic 이면 생성된 Queue에서 메세지를 수신 할 것이라는 것을 알수 있다.

 

그러면 Queue 도 정상적으로 생성 되었는지 확인 해보자

 

 

PRODUCER 실행

실행결과

1. Routing Key를 "#" 메세지를 전송한 경우 

   - Queue1는 와일드카드를 "#"로 하였기 때문에 모든 메세지를 수신 할 수 있다.

2. Routing Key를 "dev.team.server" 메세지를 전송한 경우

   - Queue1는 와일드카드를 "#"로 하였기 때문에 메세지를 수신하게되고 Queue2의 메세지를 서버개발팀만 메세지를 수신하도록 와일드

     카드를 설정 하였기 때문에 메세지 수신

3. Routing Key를 "design.team.web" 메세지를 전송한 경우

   - Queue1는 와일드카드를 "#"로 하였기 때문에 메세지를 수신하게되고 Queue3의 메세지를 디자인팀만 메세지를 수신하도록 와일드카

     드를 설정 하였기 때문에 메세지 수신

 

 

※ 참고사이트

RabbitMQ Tutorial : https://www.rabbitmq.com/tutorials/tutorial-five-java.html



반응형

'Spring Micro Services > RabbitMQ' 카테고리의 다른 글

Spring AMQP  (0) 2020.04.19
Routing  (0) 2020.04.19
Publish/Subscribe  (0) 2020.04.19
Work Queues  (0) 2020.04.19
Hello World  (0) 2020.04.19