Publish/Subscribe

2020. 4. 19. 21:02Spring Micro Services/RabbitMQ

반응형

Constants.java
0.00MB
EmitLog.java
0.00MB
ReceiveLogs.java
0.00MB

Exchanges-fanout

 

앞선 예제에서는 메세지를 Queue로 바로 보내고 Consumer 입장에서는 메세지를 구독하는 형태 였다.

이번 예제에서는 Queue로 메세지를 바로 보내는 것이 아니라 Exchange로 메세지를 전달하고 Exchange 에서는 받은 메세지를 처음에 언급한 4가지 Exchange Type에 의해서 Queue로 Routing 처리 하게 된다. 그 유형중 fanout에 대해 알아볼려고 한다.

 

위의 그림을 이해하면 아래 소스를 이해하는데 도움이 될것이다.

1. Producer는 Exchange 이름을 "logs"로 하고 Type은 "fanout"으로 해서 Exchange를 생성한다.

2. Producer는 생성된 Exchange 이름에 메세지를 전송한다.

3. Consumer는 임시 Queue를 생성하고 메세지를 전송 받는다.

 

PRODUCER

메세지를 전송하기 위한 EmitLog.java 프로그램을 작성한다.

package com.rabbitmq.tutorials.chap03;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLog {

	public static void main(String[] args) {
		System.out.println(Constants.HEADER);
		String rabbitmqHost = "localhost";
		
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost(rabbitmqHost);
		
		try {
			// 서버연결
			Connection connection = factory.newConnection();
			
			// 채널생성	
			Channel channel = connection.createChannel();
			
			// EXCHANGE생성
			channel.exchangeDeclare(Constants.EXCHANGE_NAME, "fanout");
			
			String message = args.length < 1 ? "info: Hello World!" : String.join(" ", args);
		
			channel.basicPublish(Constants.EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
			System.out.println(" [x] Sent '" + message + "'");
			
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

}

 

CONSUMER

메세지를 수신하기 위한 ReceiveLogs.java 프로그램을 작성한다.

package com.rabbitmq.tutorials.chap03;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogs {

	public static void main(String[] args) {
		System.out.println(Constants.HEADER);
		String rabbitmqHost = "localhost";
		
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost(rabbitmqHost);
		
		try {
			// 서버연결
			Connection connection = factory.newConnection();
			
			// 채널생성	
			Channel channel = connection.createChannel();
			
			// EXCHANGE생성
			channel.exchangeDeclare(Constants.EXCHANGE_NAME, "fanout");
			
			// Actively declare a server-named exclusive, autodelete, non-durable queue
			String queueName = channel.queueDeclare().getQueue();
			System.out.println(" [*] Queue Name : " + queueName);
			channel.queueBind(queueName, Constants.EXCHANGE_NAME, "");
			
			System.out.println(" [*] Waiting for messages. To exit press CTRL+C");			
			
			// 메세지수신
			DeliverCallback deliverCallback = (consumerTag, delivery) -> {
				String message = new String(delivery.getBody(), "UTF-8");
				System.out.println(" [x] Received '" + message + "'");
			};
			
			boolean isAutoAck = false;
			channel.basicConsume(queueName, isAutoAck, deliverCallback, consumerTag -> { });
			
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

}

 

프로그램 실행

CONSUMER 실행

두 개의 CMD 창을 띄운 후 java -classpath ../lib/*; com.rabbitmq.tutorials.chap03.ReceiveLogs 명령을 실행한다.

실행 결과는 아래와 같이 보여 져야 한다.

정상적으로 프로그램이 실행 되었다며 BROKER에서 Exchange 와 Queue가 정상적으로 생성되었는지 확인 해보자

 

위의 그림대로 logs라는 exchage라는 생성 되었고 Type은 fanout 이면 생성된 Queue에서 메세지를 수신 할 것이라는 것을 알수 있다.

 

그러면 Queue 도 정상적으로 생성 되었는지 확인 해보자

위의 그림과 같이 Queue도 정상적으로 생성된 것을 확인 할 수 있다.

 

 

PRODUCER 실행

CMD 창을 띄운 후 java -classpath ../lib/*; com.rabbitmq.tutorials.chap03.EmitLog 명령을 실행한다.

실행 결과는 위의 실행 한 두개의 Queue에서 전송한 메세지를 동시에 수신하게 된다.

 

※ 참고사이트

RabbitMQ Tutorial : https://www.rabbitmq.com/tutorials/tutorial-three-java.html



반응형

'Spring Micro Services > RabbitMQ' 카테고리의 다른 글

Topic  (0) 2020.04.19
Routing  (0) 2020.04.19
Work Queues  (0) 2020.04.19
Hello World  (0) 2020.04.19
Quick Start  (0) 2020.04.17