zoukankan      html  css  js  c++  java
  • Mqtt(paho)重连机制

    本文是使用Java语言,eclipse paho的实现方式,去调用MQTT服务器端,编写的MqttClient代码中针对MQTT服务器重启定制重连机制所遇到的问题进行汇总。

    1.1编写MqttConnection类,创建MQTT连接

     1 public synchronized boolean connect() {
     2         try {
     3             if(null == client) {
     4                 //host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,
     5                 // MemoryPersistence设置clientid的保存形式,默认为以内存保存
     6                 client = new MqttClient(host, client_id, new MemoryPersistence());
     7                 //设置回调
     8                 client.setCallback(new PushCallBack(MqttConnection.this, devDao));
     9             }
    10             //获取连接配置
    11             getOption();
    12             client.connect(option);
    13             log.info("[MQTT] connect to Mqtt Server success...");
    14             return isConnected();
    15         } catch (Exception e) {
    16             e.printStackTrace();
    17             return false;
    18         }
    19     }
     1 private void getOption() {
     2         //MQTT连接设置
     3         option = new MqttConnectOptions();
     4         //设置是否清空session,false表示服务器会保留客户端的连接记录,true表示每次连接到服务器都以新的身份连接
     5         option.setCleanSession(true);
     6         //设置连接的用户名
     7         option.setUserName(userName);
     8         //设置连接的密码
     9         option.setPassword(password.toCharArray());
    10         //设置超时时间 单位为秒
    11         option.setConnectionTimeout(outTime);
    12         //设置会话心跳时间 单位为秒 服务器会每隔(1.5*keepTime)秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
    13         option.setKeepAliveInterval(keepTime);
    14         option.setAutomaticReconnect(true);
    15         //setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
    16 //            option.setWill(topic, "close".getBytes(), 2, true);
    17     }

    1.2编写PushCallBack回调类,实现重连

     1 public void connectionLost(Throwable throwable) {
     2         // 连接丢失后,一般在这里面进行重连
     3         log.info("[MQTT] 连接断开,30S之后尝试重连...");
     4         while(true) {
     5             try {
     6                 Thread.sleep(30000);
     7                 mqttConn.reConnect();
     8                 break;
     9             } catch (Exception e) {
    10                 e.printStackTrace();
    11                 continue;
    12             }
    13         }
    14     }

    异常出现:

    若在connectionLost()方法中直接循环调用MqttConnection类中connect()方法,

    去实现重连机制的话,会出现在第一次重连成功后,一直断开连接再重连再断开连接再重连的死循环中。

    异常定位:

    原因在connect()方法中的这句:

     

    重新new上一个client_id相同的MqttClient,client_id是MQTT client的唯一标识,client_id不能重复。

    这样就会出现重连时创建的MqttClient,使程序中初始化时创建的MqttClient断开连接,断开连接后就会回滚到connectionLost方法中,

    然后此方法中又会继续重连,就出现上述的断开连接再重连再断开连接再重连的死循环。

    异常解决:

    在MqttConnection连接类中定义一个重连的方法:

    1 //断线重连
    2     public void reConnect() throws Exception {
    3         if(null != client) {
    4             client.connect(option);
    5         }
    6     }

    不需要重新new一个MqttClient,只需要调用connect方法就OK了,eclipse paho就会把之前的连接重新创建起来。

    另外分享setRetained()方法:

     1 /**
     2      * 发布消息
     3      * @param topic     消息主题
     4      * @param qos       消息传输质量
     5      * @param message   消息内容
     6      */
     7     public void publish(String topic, int qos, String message) throws Exception {
     8         MqttTopic mqttTopic = client.getTopic(topic);
     9         MqttMessage mqttMessage = new MqttMessage();
    10         mqttMessage.setQos(qos);
    11         //是否设置保留消息,若为true,后来的订阅者订阅该主题时仍可接收到该消息
    12         mqttMessage.setRetained(false);
    13         mqttMessage.setPayload(message.getBytes());
    14         MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
    15         token.waitForCompletion();
    16         log.info("[MQTT] publish message : " + token.isComplete() +
    17                 ",{topic : " + topic + ", message : " + message + "}");
    18     }

    setRetained():消息保留机制,若设置为true,mqtt服务器会保留每次发布的消息,

    若订阅某主题的客户端重启,则会把此主题之前发布的消息重新推送到客户端。

  • 相关阅读:
    线程学习笔记
    mac系统在配置navicat时连接数据的时候提示can't connect to mysql server on '127.0.0.1'
    启动apache 提示Starting httpd: AH00558
    使用block的时候,导致的内存泄漏
    如何调整cell的大小
    代码控制打电话、发短信、发邮件、打开手机app等操作
    汇编学习--第九天
    汇编学习--第八天
    原码一位乘法与补码一位乘法
    定点整数的加减法
  • 原文地址:https://www.cnblogs.com/zl-wjzf/p/9646768.html
Copyright © 2011-2022 走看看