본문 바로가기
IT, 개발/JAVA

AMQP RabbitMQ Direct Exchange 통신 예제 (JAVA 예제)

by 개발자스터디 2022. 8. 29.
반응형

 

 

 

 

저번에 이어서 AMQP 통신 테스트를 해보도록 하겠습니다.

 

RabbitMQ 설치하는 방법은 아래 링크를 확인하시기 바랍니다.

 

https://itstudy402.tistory.com/55

 

RabbitMQ를 사용한 AMQP 통신 구성하기 (Ubuntu)

AMQP 통신을 테스트해보기 위해 RabbitMQ를 설치해보도록 하겠습니다. 저는 우분투에서 터미널로 진행하였습니다. 1. RabbitMQ 설치 터미널을 사용하여 RabbitMQ를 설치합니다. $ apt install rabbitmq-serv.

itstudy402.tistory.com

 

AMQP의 통신 방식을 간단하게 요약해보면

 

Publisher는 Exchange에게 메시지를 보내고,
Exchange는 Queue에 해당 메시지를 분배하고,
Queue의 메시지를 Consumer가 읽는 방식입니다.

 

Exchange 종류는 여러 개가 있는데 이번에는 Direct Type으로 테스트해보도록 하겠습니다.

 

Direct Exchange는 Routing key로 매핑하여 통신하는 방식입니다.

메시지의 Routing key와 Queue에 Binding 할 때 Routing key가 완벽하게 일치해야 통신이 가능합니다.

 

// 예제 속 Consumer
channel.queueBind(exchangeName, exchangeName, routingKey);

// 예제 속 Producer
channel.basicPublish(exchangeName, routingKey, null, body);

위 코드에서 Consumer, Producer 각각의 routingkey가 완전히 일치해야 합니다.

 

예제를 확인해보도록 하겠습니다.

 

 

1. 라이브러리 

 

amqp-client-5.15.0.jar

 

 

2. Consumer 예제

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.BuiltinExchangeType;
 
public class AMQPConsumer{
 
    String host = "host";
    int port = 5672;
    String username = "username";
    String password = "password";
 
    String exchangeName = "amq.direct";
    String queueName = "queuename";
    String routingKey = "routingkey";
    
    public void directConsumer () throws IOException, TimeoutException 
    {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);
        
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
 
        // Exchange 선언 
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true);
        
        // Queue 선언 및 Binding
        channel.queueDeclare(exchangeName, falsefalsefalsenull);
        channel.queueBind(exchangeName, exchangeName, routingKey);
       
        Consumer consumer = new DefaultConsumer(channel) 
        {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
            {
                String message = new String(body, "UTF-8");
                System.out.println("received : "  + message);
            }
        };
        
        channel.basicConsume(exchangeName, true, consumer);
    }
}
cs

 

 

3. Producer 예제 

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.BuiltinExchangeType;
 
public class AMQPProducer{
    
    String host = "host";
    int port = 5672;
    String username = "username";
    String password = "password";  
 
    String exchangeName = "amq.direct";
    String routingKey = "routingkey";
    
    public void directProducer(String message) throws IOException, TimeoutException 
    {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);
        
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        
        String message = "Hello World";
        byte[] body = message.getBytes();
 
        // Publish
        channel.basicPublish(exchangeName, routingKey, null, body);
        
        channel.close();
        connection.close();
    }
}
cs

 

 

 

 

 

received : Hello World

 

위 결과가 잘 출력되는지 확인해보시기 바랍니다.

 

 

 

 

 

 

728x90
반응형