一、MQTT介绍
链接1(菜鸟教程):https://www.runoob.com/w3cnote/mqtt-intro.html
连接2(MQTT中文网):http://mqtt.p2hp.com/
连接3(Android开发之Mqtt的使用):https://blog.csdn.net/asjqkkkk/article/details/80714234
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议)。一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议。构建于TCP/IP协议上,由IBM在1999年发布。
二、程序示例
1 public class MqttManager { 2 3 private static boolean initFirst = true;//是否第一次初始化mqtt标识符 4 private static String host = "tcp://47.106.172.221:8081"; 5 private static String userName; //mqtt用户名 6 private static String passWord; //mqtt登陆密码 7 private static MqttManager manager; 8 private static MqttClient mqttClient; 9 private static MqttConnectOptions options; 10 private static String topic;//订阅的主题 11 private static String clientId; //客户端id 12 13 private static Handler handler = new Handler() { 14 @Override 15 public void handleMessage(Message msg) { 16 super.handleMessage(msg); 17 if (msg.what == 1) { 18 KLog.d(msg.obj); 19 EventBus.getDefault() 20 .post(new MessageEventBean(AppConstants.MQTT_EVENT_TYPE, (String) msg.obj)); 21 } else if (msg.what == 2) { 22 KLog.d("连接成功"); 23 try { 24 KLog.d("订阅的主题:" + topic); 25 mqttClient.subscribe(topic, 0); 26 27 } catch (Exception e) { 28 e.printStackTrace(); 29 } 30 } else if (msg.what == 3) { 31 KLog.d("连接失败,系统正在重连"); 32 } 33 } 34 }; 35 36 private MqttManager() { 37 38 } 39 private static MqttCallback myMqttCallback = new MqttCallback(){ 40 41 @Override 42 public void messageArrived(String topic, MqttMessage message){ 43 //subscribe后得到的消息会执行到这里面 44 KLog.d("messageArrived topic:"+topic); 45 Message msg = new Message(); 46 msg.what = 1; 47 msg.obj = message.toString(); 48 handler.sendMessage(msg); 49 } 50 @Override 51 public void connectionLost(Throwable cause) { 52 KLog.d("connectionLost cause = "+cause); 53 //连接丢失后,一般在这里面进行重连 54 try{ 55 KLog.d("mqtt重连"); 56 manager.startReconnect(); 57 }catch (Exception e){ 58 KLog.d("Exception = "+ e); 59 e.printStackTrace(); 60 } 61 } 62 63 @Override 64 public void deliveryComplete(IMqttDeliveryToken token) { 65 //publish后会执行到这里 66 KLog.d("deliveryComplete"); 67 } 68 }; 69 private ScheduledExecutorService scheduler; 70 71 public static MqttManager getInstance() { 72 if (manager == null) { 73 manager = new MqttManager(); 74 } 75 return manager; 76 } 77 78 public void initConnection() { 79 if (initFirst){ 80 KLog.d("第一次调用initConnection"); 81 try { 82 clientId = Preferences.getUserAccount() + System.currentTimeMillis();//客户端标识符(本机mac地址+当前时间ms) 83 userName = Preferences.getUserAccount();//用户名 84 passWord = Preferences.getUserToken();//密码 85 topic = userName; 86 //host为主机名;clientid即连接MQTT的客户端ID,是客户端的唯一标识符;MemoryPersistence设置clientid的保存形式,默认为以内存保存 87 mqttClient = new MqttClient(host, clientId, new MemoryPersistence()); 88 //MQTT的连接设置 89 options = new MqttConnectOptions(); 90 //设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接 91 options.setCleanSession(true); 92 //断开后,是否自动连接 93 options.setAutomaticReconnect(true); 94 //设置连接的用户名 95 options.setUserName(userName); 96 //设置连接的密码 97 options.setPassword(passWord.toCharArray()); 98 // 设置超时时间 单位为秒 99 options.setConnectionTimeout(10); 100 // 设置会话心跳时间 单位为秒 服务器会每隔1.5*(20)秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 101 options.setKeepAliveInterval(20); 102 //setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息 103 // options.setWill(topic,"close".getBytes(),2,true); 104 //设置回调 105 mqttClient.setCallback(myMqttCallback); 106 KLog.d("clientId: "+clientId +", userName: "+userName+", passWord: "+passWord+", topic: "+topic); 107 //设置标识符状态 108 initFirst = false; 109 //mqtt第一次连接 110 manager.startReconnect(); 111 } catch (Exception e) { 112 KLog.d("initConnection Exception: " + e); 113 e.printStackTrace(); 114 } 115 }else { 116 KLog.d("网络重连后调用initConnection"); 117 manager.startReconnect(); 118 } 119 } 120 121 public void startReconnect() { 122 123 if (NetworkUtils.isConnected()){ 124 125 if (!mqttClient.isConnected()) { 126 //重新连接 127 connect(); 128 KLog.d("mqtt连接结束"); 129 }else { 130 KLog.d("mqttClient.isConnected"); 131 } 132 // scheduler = Executors.newSingleThreadScheduledExecutor(); 133 // scheduler.scheduleAtFixedRate(new Runnable() { 134 // @Override 135 // public void run() { 136 // 137 // if (!mqttClient.isConnected()) { 138 // connect(); 139 // KLog.d("mqtt连接结束"); 140 // } 141 // } 142 // }, 0 * 1000, 10 * 1000, TimeUnit.MILLISECONDS); 143 }else { 144 KLog.d("网络不可用"); 145 // scheduler.shutdown(); 146 } 147 148 } 149 150 public void sendMsg(String msg) { 151 KLog.d("sendMsg"); 152 if (mqttClient != null && mqttClient.isConnected()) { 153 try { 154 KLog.d("发送的主题:" + Preferences.getUserAccount()); 155 String topic = Preferences.getUserAccount(); 156 KLog.d(topic); 157 byte[] msgBytes = msg.getBytes(); 158 KLog.d("0000"); 159 mqttClient.publish(topic, msgBytes, 0, false); 160 KLog.d("11111111111111111"); 161 } catch (MqttException e) { 162 KLog.d(e); 163 } 164 } 165 } 166 167 //发布的主题设为pubTopic = "owh" + Preferences.getUserAccount(); 168 //发布主题(发布主题和订阅主题应设为不同值) 169 public void publish(String topicName, String payload) { 170 if (mqttClient != null && mqttClient.isConnected()) { 171 // 创建和配置一个消息 172 MqttMessage message = new MqttMessage(payload.getBytes()); 173 message.setPayload(payload.getBytes()); 174 message.setQos(0); 175 try { 176 KLog.d("1111"); 177 mqttClient.publish(topicName, message); 178 KLog.d("2222"); 179 } catch (MqttException e) { 180 KLog.d("publish : " + e.toString()); 181 } 182 } 183 } 184 185 private void connect() { 186 187 ThreadPoolManager.getInstance().execute(new Runnable() { 188 @Override 189 public void run() { 190 try { 191 mqttClient.connect(options); 192 Message msg = Message.obtain(); 193 msg.what = 2; 194 handler.sendMessage(msg); 195 } catch (Exception e) { 196 e.printStackTrace(); 197 Message msg = Message.obtain(); 198 msg.what = 3; 199 handler.sendMessage(msg); 200 } 201 } 202 }); 203 204 // new Thread(new Runnable() { 205 // 206 // @Override 207 // public void run() { 208 // try { 209 // mqttClient.connect(options); 210 // Message msg = new Message(); 211 // msg.what = 2; 212 // handler.sendMessage(msg); 213 // } catch (Exception e) { 214 // e.printStackTrace(); 215 // Message msg = new Message(); 216 // msg.what = 3; 217 // handler.sendMessage(msg); 218 // } 219 // } 220 // }).start(); 221 } 222 223 //断开连接 224 public static void mqttDisconnect(){ 225 if(mqttClient !=null && mqttClient.isConnected()){ 226 try{ 227 mqttClient.disconnect(); 228 }catch (MqttException e){ 229 KLog.d("mqtt disconnect error"); 230 e.printStackTrace(); 231 } 232 } 233 } 234 235 }
三、注意事项
1、MQTT的客户端id(clientId)须唯一。在此项目中clientId = 本机mac地址 + 当前时间(ms)。
2、一个客户端的一个MQTT连接最好只new一个对象,避免一台设备产生多个客户端账号。
当多个发布(/订阅)的clientId相同时,会发生Mqtt反复重连的现象,无法正常发送或接收消息。
当多个发布(/订阅)的clientId不同时,会造成一台设备多个Mqtt账号同时在线,占用了多余的服务器资源。
3、一个客户端的发布Topic和订阅Topic不应相同。