zoukankan      html  css  js  c++  java
  • SpringBoot整合MQTT (使用官方demo)

    依赖

            <!-- Eclipse Paho MQTT v3 client library: supplies the org.eclipse.paho.client.mqttv3
                 classes (MqttClient, MqttConnectOptions, MqttMessage, ...) imported by the Java
                 classes shown below. -->
            <dependency>
                <groupId>org.eclipse.paho</groupId>
                <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
                <version>1.2.3</version>
            </dependency>

    配置

    # MQTT connection settings; MyMqttClient reads each key through @Value("${spring.mqtt.<key>}").
    spring:
      mqtt:
        # Client identifier handed to the MqttClient constructor.
        clientId: test1
        # Broker address handed to the MqttClient constructor as the server URI.
        url: tcp://192.168.1.24:1883
        # Broker credentials injected into MyMqttClient's username / password fields.
        username: admin
        password: 123456

    配置类

    MyMqttClient.java

    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
    import org.eclipse.paho.client.mqttv3.MqttTopic;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.PostConstruct;
    import javax.annotation.PreDestroy;
    
    
    /**
     * Spring-managed wrapper around one Eclipse Paho {@link MqttClient}.
     *
     * <p>On startup ({@link #init()}) the client is created from the {@code spring.mqtt.*}
     * properties, connected with a clean session, and subscribed to the two demo topics
     * {@code topic/test1} and {@code topic/test2}. On shutdown ({@link #closeConnect()}) the
     * client is disconnected and its resources are released.
     *
     * <p>The client, persistence store and connect options are kept in static fields, so they are
     * shared by every instance of this class. No synchronization is performed here.
     */
    @Component
    public class MyMqttClient {

        /** Charset for outgoing text payloads; fixed so the bytes do not depend on the JVM default. */
        private static final java.nio.charset.Charset PAYLOAD_CHARSET =
                java.nio.charset.StandardCharsets.UTF_8;

        public static MqttClient mqttClient = null;
        private static MemoryPersistence memoryPersistence = null;
        private static MqttConnectOptions mqttConnectOptions = null;

        @Autowired
        private MqttRecieveCallback mqttRecieveCallback;

        @Autowired
        private MqttTwoRecieveCallback mqttTwoRecieveCallback;

        @Value("${spring.mqtt.url}")
        private String serverURI;

        @Value("${spring.mqtt.clientId}")
        private String clientId;

        @Value("${spring.mqtt.username}")
        private String username;

        @Value("${spring.mqtt.password}")
        private String password;

        /**
         * Creates the MQTT client, connects it and subscribes to the demo topics.
         *
         * <p>Failures are printed and otherwise ignored (best effort): if the client cannot be
         * created or connected, the method returns without subscribing and without throwing, so
         * application startup continues.
         */
        @PostConstruct
        public void init() {
            mqttConnectOptions = buildConnectOptions();

            if (null != clientId) {
                // In-memory persistence is sufficient because a clean session keeps no state
                // across connections.
                memoryPersistence = new MemoryPersistence();
                try {
                    mqttClient = new MqttClient(serverURI, clientId, memoryPersistence);
                } catch (MqttException e) {
                    e.printStackTrace();
                }
            }

            // Stop here when no client exists; every later step dereferences it.
            if (null == mqttClient) {
                System.out.println("mqttClient为空");
                return;
            }

            System.out.println(mqttClient.isConnected());
            if (!mqttClient.isConnected()) {
                try {
                    System.out.println("创建连接");
                    mqttClient.connect(mqttConnectOptions);
                } catch (MqttException e) {
                    e.printStackTrace();
                }
            }
            System.out.println(mqttClient.isConnected());

            if (mqttClient.isConnected()) {
                subscribeDefaultTopics();
            }
        }

        /**
         * Disconnects and closes the client, then closes the persistence store.
         *
         * <p>The client is closed even when it is not currently connected so that its resources
         * are released; the persistence store is closed last because the client uses it.
         */
        @PreDestroy
        public void closeConnect() {
            if (null != mqttClient) {
                if (mqttClient.isConnected()) {
                    try {
                        mqttClient.disconnect();
                    } catch (MqttException e) {
                        e.printStackTrace();
                    }
                } else {
                    System.out.println("mqttClient is not connect");
                }
                try {
                    mqttClient.close();
                } catch (MqttException e) {
                    e.printStackTrace();
                }
            } else {
                System.out.println("mqttClient is null");
            }

            if (null != memoryPersistence) {
                try {
                    memoryPersistence.close();
                } catch (MqttPersistenceException e) {
                    e.printStackTrace();
                }
            } else {
                System.out.println("memoryPersistence is null");
            }
        }

        /**
         * Publishes a text message.
         *
         * <p>If the client is not connected, one reconnect is attempted first; the message is
         * dropped (with a console note) only when the client is still not connected afterwards.
         * The call does not wait for delivery to finish: this method is also invoked from a
         * message-listener callback, where blocking on the delivery token is best avoided.
         *
         * @param pubTopic topic to publish to
         * @param message  text payload, encoded as UTF-8; must not be {@code null}
         * @param qos      MQTT quality of service (0, 1 or 2)
         * @param retained whether the broker should retain the message; {@code null} is treated
         *                 as {@code false}
         */
        public void publishMessage(String pubTopic, String message, int qos, Boolean retained) {
            if (!isClientConnected()) {
                reConnect();
                if (!isClientConnected()) {
                    System.out.println("mqttClient is error");
                    return;
                }
            }

            System.out.println("发布消息   " + mqttClient.isConnected());
            System.out.println("id:" + mqttClient.getClientId());

            MqttMessage mqttMessage = new MqttMessage();
            mqttMessage.setQos(qos);
            mqttMessage.setPayload(message.getBytes(PAYLOAD_CHARSET));
            mqttMessage.setRetained(Boolean.TRUE.equals(retained));

            MqttTopic topic = mqttClient.getTopic(pubTopic);
            try {
                // publish() returning normally means the client accepted the message; the
                // returned token (not inspected here) tracks the asynchronous delivery.
                topic.publish(mqttMessage);
                System.out.println("消息发布成功");
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }

        /**
         * Re-establishes the connection and restores the demo subscriptions.
         *
         * <p>When no client exists yet, {@link #init()} is run instead. Subscriptions are restored
         * because the connection uses a clean session. Topics added through
         * {@link #subTopic(String)} are not tracked and therefore not restored.
         */
        public void reConnect() {
            if (null == mqttClient) {
                init();
                return;
            }
            if (mqttClient.isConnected()) {
                System.out.println("mqttClient is null or connect");
                return;
            }
            if (null == mqttConnectOptions) {
                System.out.println("mqttConnectOptions is null");
                return;
            }

            try {
                mqttClient.connect(mqttConnectOptions);
            } catch (MqttException e) {
                e.printStackTrace();
                return;
            }
            if (mqttClient.isConnected()) {
                subscribeDefaultTopics();
            }
        }

        /**
         * Subscribes to a topic at QoS 1 without a dedicated listener.
         *
         * <p>NOTE(review): no listener is passed and this class never calls {@code setCallback},
         * so it is unclear where messages for such topics are delivered - confirm.
         *
         * @param topic topic filter to subscribe to
         */
        public void subTopic(String topic) {
            if (!isClientConnected()) {
                System.out.println("mqttClient is error");
                return;
            }
            try {
                mqttClient.subscribe(topic, 1);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }

        /**
         * Unsubscribes from a topic. Requires a connected client.
         *
         * @param topic topic filter to unsubscribe from
         */
        public void cleanTopic(String topic) {
            if (!isClientConnected()) {
                System.out.println("mqttClient is error");
                return;
            }
            try {
                mqttClient.unsubscribe(topic);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }

        /** Builds the connect options: clean session, 10 s connect timeout, optional credentials. */
        private MqttConnectOptions buildConnectOptions() {
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(true);
            options.setConnectionTimeout(10);
            // Apply the configured credentials; a blank user name means "connect anonymously".
            if (null != username && !username.trim().isEmpty()) {
                options.setUserName(username);
                if (null != password) {
                    options.setPassword(password.toCharArray());
                }
            }
            return options;
        }

        /** Subscribes the two injected listeners to their demo topics at QoS 2. */
        private void subscribeDefaultTopics() {
            try {
                mqttClient.subscribe("topic/test1", 2, mqttRecieveCallback);
                mqttClient.subscribe("topic/test2", 2, mqttTwoRecieveCallback);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }

        /** Returns whether a client exists and reports itself as connected. */
        private static boolean isClientConnected() {
            return null != mqttClient && mqttClient.isConnected();
        }

    }

    回调类一

    MqttRecieveCallback.java

    import org.eclipse.paho.client.mqttv3.*;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    
    /**
     * Listener for the first demo topic: prints each received message and answers by publishing
     * the payload {@code "2"} to {@code topic/test2}.
     *
     * <p>In the code shown, this object is only passed to {@code MqttClient.subscribe(...)} as a
     * per-topic message listener; {@code setCallback} is never called with it, so
     * {@link #connectionLost(Throwable)} and {@link #deliveryComplete(IMqttDeliveryToken)} run
     * only if it is registered as a client callback elsewhere.
     */
    @Component
    public class MqttRecieveCallback implements MqttCallback, IMqttMessageListener {

        // NOTE(review): MyMqttClient injects this bean and this bean injects MyMqttClient, i.e. the
        // two beans reference each other. Newer Spring Boot versions reject such cycles by
        // default - confirm against the version in use.
        @Autowired
        private MyMqttClient client;

        /**
         * Reports a lost connection instead of discarding the cause silently.
         *
         * @param cause reason for the connection loss
         */
        @Override
        public void connectionLost(Throwable cause) {
            System.out.println("Client connection lost : " + cause);
        }

        /**
         * Prints topic, QoS and payload of the received message, then publishes the reply.
         *
         * @param topic   topic the message arrived on
         * @param message received message; its payload is decoded as UTF-8 text
         */
        @Override
        public void messageArrived(String topic, MqttMessage message)  {
            // Decode with an explicit charset so the text does not depend on the JVM default.
            String payload = new String(message.getPayload(), java.nio.charset.StandardCharsets.UTF_8);

            System.out.println("Client 接收消息主题 : " + topic);
            System.out.println("Client 接收消息Qos : " + message.getQos());
            System.out.println("Client 接收消息内容 : " + payload);

            // Send the reply message.
            client.publishMessage("topic/test2", "2", 2, false);
        }

        /** Intentionally a no-op: delivery confirmations are not used by this listener. */
        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            // nothing to do
        }

    }

    回调类二

    MqttTwoRecieveCallback.java

    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
    import org.eclipse.paho.client.mqttv3.MqttCallback;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.springframework.stereotype.Component;
    
    
    /**
     * Listener for the second demo topic: prints topic, QoS and payload of each received message.
     *
     * <p>In the code shown, this object is only passed to {@code MqttClient.subscribe(...)} as a
     * per-topic message listener; {@code setCallback} is never called with it, so
     * {@link #connectionLost(Throwable)} and {@link #deliveryComplete(IMqttDeliveryToken)} run
     * only if it is registered as a client callback elsewhere.
     */
    @Component
    public class MqttTwoRecieveCallback implements MqttCallback, IMqttMessageListener {

        /**
         * Reports a lost connection instead of discarding the cause silently.
         *
         * @param cause reason for the connection loss
         */
        @Override
        public void connectionLost(Throwable cause) {
            System.out.println("Client2 connection lost : " + cause);
        }

        /**
         * Prints the received message.
         *
         * @param topic   topic the message arrived on
         * @param message received message; its payload is decoded as UTF-8 text
         * @throws Exception declared by the listener interface; not thrown deliberately here
         */
        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            // Decode with an explicit charset so the text does not depend on the JVM default.
            String payload = new String(message.getPayload(), java.nio.charset.StandardCharsets.UTF_8);

            System.out.println("Client2 接收消息主题 : " + topic);
            System.out.println("Client2 接收消息Qos : " + message.getQos());
            System.out.println("Client2 接收消息内容 : " + payload);
        }

        /** Intentionally a no-op: delivery confirmations are not used by this listener. */
        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            // nothing to do
        }

    }
    -----------------------有任何问题可以在评论区评论,也可以私信我,我看到的话会进行回复,欢迎大家指教------------------------ (蓝奏云官网有些地址失效了,需要把请求地址lanzous改成lanzoux才可以)
  • 相关阅读:
    WslRegisterDistribution failed with error: 0x80370102
    vscode C/C++ 语法检查
    ADO.NET 一(概述)
    线程三(Mutex)
    线程二(Monitor)
    线程一(lock)
    interface Part4(接口中的多态)
    interface Part3(实现:显示和隐式)
    interface Part2(定义接口)
    interface Part1(接口详解)
  • 原文地址:https://www.cnblogs.com/pxblog/p/15458521.html
Copyright © 2011-2022 走看看