本文是使用Java语言,eclipse paho的实现方式,去调用MQTT服务器端,编写的MqttClient代码中针对MQTT服务器重启定制重连机制所遇到的问题进行汇总。
1.1编写MqttConnection类,创建MQTT连接
1 public synchronized boolean connect() { 2 try { 3 if(null == client) { 4 //host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示, 5 // MemoryPersistence设置clientid的保存形式,默认为以内存保存 6 client = new MqttClient(host, client_id, new MemoryPersistence()); 7 //设置回调 8 client.setCallback(new PushCallBack(MqttConnection.this, devDao)); 9 } 10 //获取连接配置 11 getOption(); 12 client.connect(option); 13 log.info("[MQTT] connect to Mqtt Server success..."); 14 return isConnected(); 15 } catch (Exception e) { 16 e.printStackTrace(); 17 return false; 18 } 19 }
1 private void getOption() { 2 //MQTT连接设置 3 option = new MqttConnectOptions(); 4 //设置是否清空session,false表示服务器会保留客户端的连接记录,true表示每次连接到服务器都以新的身份连接 5 option.setCleanSession(true); 6 //设置连接的用户名 7 option.setUserName(userName); 8 //设置连接的密码 9 option.setPassword(password.toCharArray()); 10 //设置超时时间 单位为秒 11 option.setConnectionTimeout(outTime); 12 //设置会话心跳时间 单位为秒 服务器会每隔(1.5*keepTime)秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 13 option.setKeepAliveInterval(keepTime); 14 option.setAutomaticReconnect(true); 15 //setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息 16 // option.setWill(topic, "close".getBytes(), 2, true); 17 }
1.2编写PushCallBack回调类,实现重连
1 public void connectionLost(Throwable throwable) { 2 // 连接丢失后,一般在这里面进行重连 3 log.info("[MQTT] 连接断开,30S之后尝试重连..."); 4 while(true) { 5 try { 6 Thread.sleep(30000); 7 mqttConn.reConnect(); 8 break; 9 } catch (Exception e) { 10 e.printStackTrace(); 11 continue; 12 } 13 } 14 }
异常出现:
若在connectionLost()方法中直接循环调用MqttConnection类中connect()方法,
去实现重连机制的话,会出现在第一次重连成功后,一直断开连接再重连再断开连接再重连的死循环中。
异常定位:
原因在connect()方法中的这句:
重新new上一个client_id相同的MqttClient,client_id是MQTT client的唯一标识,client_id不能重复。
这样就会出现重连时创建的MqttClient,使程序中初始化时创建的MqttClient断开连接,断开连接后就会回滚到connectionLost方法中,
然后此方法中又会继续重连,就出现上述的断开连接再重连再断开连接再重连的死循环。
异常解决:
在MqttConnection连接类中定义一个重连的方法:
1 //断线重连 2 public void reConnect() throws Exception { 3 if(null != client) { 4 client.connect(option); 5 } 6 }
不需要重新new一个MqttClient,只需要调用connect方法就OK了,eclipse paho就会把之前的连接重新创建起来。
另外分享setRetained()方法:
1 /** 2 * 发布消息 3 * @param topic 消息主题 4 * @param qos 消息传输质量 5 * @param message 消息内容 6 */ 7 public void publish(String topic, int qos, String message) throws Exception { 8 MqttTopic mqttTopic = client.getTopic(topic); 9 MqttMessage mqttMessage = new MqttMessage(); 10 mqttMessage.setQos(qos); 11 //是否设置保留消息,若为true,后来的订阅者订阅该主题时仍可接收到该消息 12 mqttMessage.setRetained(false); 13 mqttMessage.setPayload(message.getBytes()); 14 MqttDeliveryToken token = mqttTopic.publish(mqttMessage); 15 token.waitForCompletion(); 16 log.info("[MQTT] publish message : " + token.isComplete() + 17 ",{topic : " + topic + ", message : " + message + "}"); 18 }
setRetained():消息保留机制,若设置为true,mqtt服务器会保留每次发布的消息,
若订阅某主题的客户端重启,则会把此主题之前发布的消息重新推送到客户端。