반응형
저번에 이어서 AMQP 통신 테스트를 해보도록 하겠습니다.
이번에는 Topic Type Exchange 통신을 테스트해보도록 하겠습니다.
Topic Exchange는 특정 Routing 패턴이 일치하는 Queue로 라우팅 하는 방식입니다.
Routing 패턴은 마침표로 구분되는 단어들의 리스트를 말합니다.
* : 1개의 단어와 치환
# : 0개나 여러 개 단어와 치환 가능
예) testKey.* / testKey.*.* / *.testKey / testKey.#
예를 들어 testKey.*.* 라는 패턴으로 Binding 했다면
testKey.abc.abc 이런 방식으로 패턴을 일치시켜야 메시지를 받아갈 수 있습니다.
testKey.# 라면 testKey / testKey.abc / testKey.abc.abc 이렇게 설정해도 치환이 가능하다는 것입니다.
반응형
1. 라이브러리
amqp-client-5.15.0.jar
slf4j-api-2.0.0.jar
2. Consumer 예제
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.BuiltinExchangeType;
public class AMQPConsumer{
String host = "host";
int port = 5672;
String username = "username";
String password = "password";
String exchangeName = "amq.topic";
String queueName = "queuename";
String routingPattern = "testKey.*.*";
public void topicConsumer () throws IOException, TimeoutException
{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// Exchange 선언
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true);
// Queue 선언 및 Binding
channel.queueDeclare(exchangeName, false, false, false, null);
channel.queueBind(exchangeName, exchangeName, routingPattern);
Consumer consumer = new DefaultConsumer(channel)
{
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
{
String message = new String(body, "UTF-8");
System.out.println("received : " + message);
}
};
channel.basicConsume(exchangeName, true, consumer);
}
}
|
cs |
3. Producer 예제
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.BuiltinExchangeType;
public class AMQPProducer{
String host = "host";
int port = 5672;
String username = "username";
String password = "password";
String exchangeName = "amq.topic";
String routingPattern = "testKey.abc.xyz";
public void topicProducer(String message) throws IOException, TimeoutException
{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String message = "Hello World";
byte[] body = message.getBytes();
// Publish
channel.basicPublish(exchangeName, routingPattern, null, body);
channel.close();
connection.close();
}
}
|
cs |
received : Hello World
저번과 같이 Hello World 결과가 잘 출력되는지 확인해보시기 바랍니다.
728x90
반응형
'IT, 개발 > JAVA' 카테고리의 다른 글
RabbitMQ MQTT - JAVA publish / subscribe 에제 (eclipse.paho) (0) | 2022.09.09 |
---|---|
JAVA 타임서버 시간 가져오기(time.bora.net)(NTPUDPClient, TimeInfo) (0) | 2022.09.05 |
AMQP RabbitMQ Direct Exchange 통신 예제 (JAVA 예제) (0) | 2022.08.29 |
JAVA - Json을 Map으로 / Map을 Json으로 형변환(Json to Map, Map to Json) (0) | 2022.08.19 |
JAVA - while / do-while 사용법, 차이점 (0) | 2022.08.17 |