반응형
저번에 이어서 AMQP 통신 테스트를 해보도록 하겠습니다.
RabbitMQ 설치하는 방법은 아래 링크를 확인하시기 바랍니다.
https://itstudy402.tistory.com/55
AMQP의 통신 방식을 간단하게 요약해보면
Publisher는 Exchange에게 메시지를 보내고,
Exchange는 Queue에 해당 메시지를 분배하고,
Queue의 메시지를 Consumer가 읽는 방식입니다.
Exchange 종류는 여러 개가 있는데 이번에는 Direct Type으로 테스트해보도록 하겠습니다.
Direct Exchange는 Routing key로 매핑하여 통신하는 방식입니다.
메시지의 Routing key와 Queue에 Binding 할 때 Routing key가 완벽하게 일치해야 통신이 가능합니다.
// 예제 속 Consumer
channel.queueBind(exchangeName, exchangeName, routingKey);
// 예제 속 Producer
channel.basicPublish(exchangeName, routingKey, null, body);
위 코드에서 Consumer, Producer 각각의 routingkey가 완전히 일치해야 합니다.
예제를 확인해보도록 하겠습니다.
1. 라이브러리
amqp-client-5.15.0.jar
2. Consumer 예제
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.BuiltinExchangeType;
public class AMQPConsumer{
String host = "host";
int port = 5672;
String username = "username";
String password = "password";
String exchangeName = "amq.direct";
String queueName = "queuename";
String routingKey = "routingkey";
public void directConsumer () throws IOException, TimeoutException
{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// Exchange 선언
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true);
// Queue 선언 및 Binding
channel.queueDeclare(exchangeName, false, false, false, null);
channel.queueBind(exchangeName, exchangeName, routingKey);
Consumer consumer = new DefaultConsumer(channel)
{
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
{
String message = new String(body, "UTF-8");
System.out.println("received : " + message);
}
};
channel.basicConsume(exchangeName, true, consumer);
}
}
|
cs |
3. Producer 예제
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.BuiltinExchangeType;
public class AMQPProducer{
String host = "host";
int port = 5672;
String username = "username";
String password = "password";
String exchangeName = "amq.direct";
String routingKey = "routingkey";
public void directProducer(String message) throws IOException, TimeoutException
{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String message = "Hello World";
byte[] body = message.getBytes();
// Publish
channel.basicPublish(exchangeName, routingKey, null, body);
channel.close();
connection.close();
}
}
|
cs |
received : Hello World
위 결과가 잘 출력되는지 확인해보시기 바랍니다.
728x90
반응형
'IT, 개발 > JAVA' 카테고리의 다른 글
JAVA 타임서버 시간 가져오기(time.bora.net)(NTPUDPClient, TimeInfo) (0) | 2022.09.05 |
---|---|
AMQP RabbitMQ Topic Exchange 통신 예제 (JAVA 예제) (0) | 2022.08.30 |
JAVA - Json을 Map으로 / Map을 Json으로 형변환(Json to Map, Map to Json) (0) | 2022.08.19 |
JAVA - while / do-while 사용법, 차이점 (0) | 2022.08.17 |
log4jdbc 로그 출력하기 (MySQL, Spring Framework, log4j) (0) | 2022.08.02 |