본문 바로가기
IT, 개발/JAVA

RabbitMQ MQTT - JAVA publish / subscribe 예제 (eclipse.paho)

by 개발자스터디 2022. 9. 9.
반응형

 

 

 

 

 

이전 시간에 RabbitMQ를 사용해서 MQTT 통신을 하기 위한 설정에 대해 알아봤습니다.

 

자세한 내용은 링크를 통해 확인할 수 있습니다.
https://itstudy402.tistory.com/60

 

RabbitMQ를 사용한 MQTT 통신 구성하기(Ubuntu)

저번 시간에 AMQP 통신을 위해 RabbitMQ를 설치하였습니다. 이번에는 MQTT 통신을 위해 플러그인을 추가해서 통신하는 테스트를 해보도록 하겠습니다. RabbitMQ 설치는 이전 글을 확인하시기 바랍니다.

itstudy402.tistory.com

 

이제 JAVA에서 Publish, Subscribe를 테스트해서 통신이 잘 되는지 확인해보려고 합니다.

MQTT 통신 예제를 보며 테스트해보도록 하겠습니다.

 

각자의 server URL, userId, password를 입력하고 동일한 topic명을 사용하여 테스트해보시기 바랍니다.

 

 

1. 라이브러리

 

1-1. maven

<!-- https://mvnrepository.com/artifact/org.eclipse.paho/mqtt-client -->
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>mqtt-client</artifactId>
    <version>0.4.0</version>
</dependency>

 

 

1-2. jar

 

mqtt-client-0.4.0.jar 

 

 

2. Publish 예제

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 
public class MqttPubTest {
 
    public static void main(String[] args) {
 
        String serverUrl    = "tcp://123.456.789:1883";   // your IP
        String clientId     = "client_pub";
        String userId       = "username";
        char[] password     = "password".toCharArray();
        String topic        = "testTopic";
        int qos             = 1;
        
        try 
        {
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true);
            connOpts.setUserName(userId);
            connOpts.setPassword(password);
            
            String content = "pub test msg";
            MqttClient client = new MqttClient(serverUrl, clientId, new MemoryPersistence());
            
            // connect
            client.connect(connOpts);
            
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(qos);
            
            // publish
            client.publish(topic, message);
 
            
            System.out.println("Publishing message: "+content);
            
            // disconnect
            client.disconnect();
        } 
        catch(MqttException me) 
        {
            me.printStackTrace();
        }
    }
}
cs

 

 

 

반응형

 

3. Subscribe 예제 

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 
public class MqttSubTest implements MqttCallback {
    
    String serverUrl    = "tcp://123.456.789:1883";   // your IP
    String clientId     = "client_sub";
    String userId       = "username";
    char[] password     = "password".toCharArray();
    String topic        = "testTopic";
    int qos             = 1;
    
    MqttClient client;
    
    public MqttSubTest() 
    {
        try
        {
            MqttConnectOptions conOpts = new MqttConnectOptions();
            conOpts.setCleanSession(true);
            conOpts.setUserName(userId);
            conOpts.setPassword(password);
    
            client = new MqttClient(serverUrl, clientId, new MemoryPersistence());
            client.setCallback(this);
            
            // connect
            client.connect(conOpts);
            
            // subscribe
            client.subscribe(topic, qos);
            
            System.out.println("connected..");
        }
        catch(MqttException me) 
        {
            me.printStackTrace();
        }
    }
    
 
    @Override
    public void connectionLost(Throwable arg0) {
        // TODO Auto-generated method stub
        System.out.println(arg0);
    }
 
    @Override
    public void deliveryComplete(IMqttDeliveryToken arg0) {
        // TODO Auto-generated method stub
    }
 
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // TODO Auto-generated method stub
        System.out.println("topic   : " + topic);
        System.out.println("message : " + message.getPayload());
    }
    
    
    
    
    public static void main(String[] args) {
        
        new MqttSubTest();
        
    }
}
cs

 

 

 

 

 

728x90
반응형