Routing

2020. 4. 19. 21:30Spring Micro Services/RabbitMQ

반응형

Constants.java
0.00MB
EmitLogDirect.java
0.00MB
ReceiveLogsDirect.java
0.00MB

Exchanges-Direct

 

앞선 예제에서는 메세지를 Queue로 바로 보내고 Consumer 입장에서는 메세지를 구독하는 형태 였다.

이번 예제에서는 Queue로 메세지를 바로 보내는 것이 아니라 Exchange로 메세지를 전달하고 Exchange 에서는 받은 메세지를

처음에 언급한 4가지 Exchange Type에 의해서 Queue로 Routing 처리 하게 된다. 그 유형중 Direct에 대해 알아볼려고 한다.

 

 

 

위의 그림을 이해하면 아래 소스를 이해하는데 도움이 될것이다.

1. Producer는 Exchage 이름은 "direct_logs"로 하고 Type은 "direct"으로 해서 Exchange를 생성한다.

2. direct exchage "direct_logs" 는 두개의 Queue에 Binding 된다. 첫번째 Queue1 에는 "error" binding key 로 설정이 되고

   두번째 Queue2 에는 "info,error,warning" binding key로 설정이 된다.

3. Consumer는 설정된 binding key 에 따라서 메세지를 수신하게 된다.

   만약 Producer가 binding key 를 error로 설정해서 보내게 되면 Q1, Q2 두군데로 메세지가 전송되고 Consumer는 

   수신하게 된다

 

PRODUCER 

메세지 전송을 위한 EmitDirect.java 프로그램을 작성한다.

package com.rabbitmq.tutorials.chap04;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogDirect {

	public static void main(String[] argv) {
		System.out.println(Constants.HEADER);
		String rabbitmqHost = "localhost";
		
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost(rabbitmqHost);
		
		try {
			// 서버연결
			Connection connection = factory.newConnection();
			
			// 채널생성	
			Channel channel = connection.createChannel();
			
			// EXCHANGE생성
			channel.exchangeDeclare(Constants.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
			
			String routingKey = getRoutingkey(argv);
			String message = getMessage(argv);
		
			channel.basicPublish(Constants.EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
			System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
			
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	private static String getRoutingkey(String[] strings) {
		if (strings.length < 1)
			return "info";
		
		return strings[0];
	}

	private static String getMessage(String[] strings) {
		if (strings.length < 2) 
			return "Hello World!";
					
		return joinStrings(strings, " ", 1);
	}
	
	private static String joinStrings(String[] strings, String delimeter, int startIndex) {
		int length = strings.length;
		if (length == 0) return "";
		if (length <= startIndex) return "";
		StringBuilder words = new StringBuilder(strings[startIndex]);
		for (int i = startIndex + 1; i < length; i++) {
			words.append(delimeter).append(strings[i]);
		}
		return words.toString();
	}
}

 

CONSUMER

메세지 수신을 위한 ReceiveLogsDirect.java 프로그램을 작성한다.

package com.rabbitmq.tutorials.chap04;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogsDirect {

	public static void main(String[] argv) {
		System.out.println(Constants.HEADER);
		String rabbitmqHost = "localhost";
		
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost(rabbitmqHost);
		
		try {
			// 서버연결
			Connection connection = factory.newConnection();
			
			// 채널생성	
			Channel channel = connection.createChannel();
			
			// EXCHANGE생성
			channel.exchangeDeclare(Constants.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
			String queueName = channel.queueDeclare().getQueue();
			System.out.println(" [*] Queue Name : " + queueName);
			
			if (argv.length < 1) {
				System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
				System.exit(1);
			}
			
			for (String routingKey : argv) {
				channel.queueBind(queueName, Constants.EXCHANGE_NAME, routingKey);
			}
			
			System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
			
			
			// 메세지수신
			DeliverCallback deliverCallback = (consumerTag, delivery) -> {
				String message = new String(delivery.getBody(), "UTF-8");
				System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'"  + message + "'");
			};
			
			boolean isAutoAck = true;
			channel.basicConsume(queueName, isAutoAck, deliverCallback, consumerTag -> { });
			
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

 

프로그램 실행

CONSUMER 실행

아래 이미지와 같이 cmd 창을 두개 띄운 후 클래스가 위치한 경로로 이동 후 java -classpath ../lib/*;  com.rabbitmq.tutorials.chap04.ReceiveLogsDirect 명령어를 실행한다. 모든 Queue 에세 메세지 수신을 하기 위해서 두개의 Consumer을 실행한다.

위의 실행 결과를 보면 두개의 임시 Queue 가 생성된 것을 확인 할 수 있다.

 

그러면 위의 그림과 같이 Excnage도 정상적으로 생성되었는지 확인해보자

위의 그림대로 logs라는 exchage라는 생성 되었고 Type은 fanout 이면 생성된 Queue에서 메세지를 수신 할 것이라는 것을 알수 있다.

 

그러면 Queue 도 정상적으로 생성 되었는지 확인 해보자

 

PRODUCER 실행

1. 첫번째로는 BindingKey="error" 로 실행 하였을 경우 Q1,Q2에 모두 error key 가 Binding 되어 있으므로 양쪽 cmd 창에 메세지가 수신

   되는 것을 확인 할 수 있다. 

2. 두번째로는 BindingKey="info"로 전송할 경우 Q2에만 key가 Binding 되어 있으므로 cmd창 두번째에만 메세지가 수신 되는 것을 확인

   할 수 있다. 

 

※ 참고사이트

RabbitMQ Tutorial : https://www.rabbitmq.com/tutorials/tutorial-one-java.html

 

반응형

'Spring Micro Services > RabbitMQ' 카테고리의 다른 글

Spring AMQP  (0) 2020.04.19
Topic  (0) 2020.04.19
Publish/Subscribe  (0) 2020.04.19
Work Queues  (0) 2020.04.19
Hello World  (0) 2020.04.19