본문 바로가기
IT, 개발/JAVA

AMQP RabbitMQ Topic Exchange 통신 예제 (JAVA 예제)

by 개발자스터디 2022. 8. 30.
반응형

 

 

 

 

저번에 이어서 AMQP 통신 테스트를 해보도록 하겠습니다.

 

이번에는 Topic Type Exchange 통신을 테스트해보도록 하겠습니다.

 

Topic Exchange는 특정 Routing 패턴이 일치하는 Queue로 라우팅 하는 방식입니다.

Routing 패턴은 마침표로 구분되는 단어들의 리스트를 말합니다.

 

* : 1개의 단어와 치환 
# : 0개나 여러 개 단어와 치환 가능 

예) testKey.* / testKey.*.* / *.testKey / testKey.# 

 

예를 들어 testKey.*.* 라는 패턴으로 Binding 했다면 

testKey.abc.abc 이런 방식으로 패턴을 일치시켜야 메시지를 받아갈 수 있습니다.

testKey.# 라면 testKey / testKey.abc / testKey.abc.abc 이렇게 설정해도 치환이 가능하다는 것입니다.

 

 

반응형

 

1. 라이브러리 

 

amqp-client-5.15.0.jar
slf4j-api-2.0.0.jar

 

 

2. Consumer 예제

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.BuiltinExchangeType;
 
public class AMQPConsumer{
 
    String host = "host";
    int port = 5672;
    String username = "username";
    String password = "password";
 
    String exchangeName = "amq.topic";
    String queueName = "queuename";
    String routingPattern = "testKey.*.*";
    
    public void topicConsumer () throws IOException, TimeoutException 
    {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);
        
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
 
        // Exchange 선언 
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true);
        
        // Queue 선언 및 Binding
        channel.queueDeclare(exchangeName, falsefalsefalsenull);
        channel.queueBind(exchangeName, exchangeName, routingPattern);
       
        Consumer consumer = new DefaultConsumer(channel) 
        {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
            {
                String message = new String(body, "UTF-8");
                System.out.println("received : "  + message);
            }
        };
        
        channel.basicConsume(exchangeName, true, consumer);
    }
}
cs

 

 

 

3. Producer 예제 

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.BuiltinExchangeType;
 
public class AMQPProducer{
    
    String host = "host";
    int port = 5672;
    String username = "username";
    String password = "password";  
 
    String exchangeName = "amq.topic";
    String routingPattern = "testKey.abc.xyz";
    
    public void topicProducer(String message) throws IOException, TimeoutException 
    {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);
        
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        
        String message = "Hello World";
        byte[] body = message.getBytes();
 
        // Publish
        channel.basicPublish(exchangeName, routingPattern, null, body);
        
        channel.close();
        connection.close();
    }
}
cs

 

 

 

received : Hello World

 

저번과 같이 Hello World 결과가 잘 출력되는지 확인해보시기 바랍니다.

 

 

 

 

 

 

728x90
반응형