zoukankan      html  css  js  c++  java
  • Android MQTT的发布与订阅

    一、MQTT介绍

    链接1(菜鸟教程):https://www.runoob.com/w3cnote/mqtt-intro.html

    连接2(MQTT中文网):http://mqtt.p2hp.com/

    连接3(Android开发之Mqtt的使用):https://blog.csdn.net/asjqkkkk/article/details/80714234

    MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议)。一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议。构建于TCP/IP协议上,由IBM在1999年发布。

    二、程序示例

      1 public class MqttManager {
      2 
      3     private static boolean initFirst = true;//是否第一次初始化mqtt标识符
      4     private static String host = "tcp://47.106.172.221:8081";
      5     private static String userName; //mqtt用户名
      6     private static String passWord; //mqtt登陆密码
      7     private static MqttManager manager;
      8     private static MqttClient mqttClient;
      9     private static MqttConnectOptions options;
     10     private static String topic;//订阅的主题
     11     private static String clientId; //客户端id
     12 
     13     private static Handler handler = new Handler() {
     14         @Override
     15         public void handleMessage(Message msg) {
     16             super.handleMessage(msg);
     17             if (msg.what == 1) {
     18                 KLog.d(msg.obj);
     19                 EventBus.getDefault()
     20                         .post(new MessageEventBean(AppConstants.MQTT_EVENT_TYPE, (String) msg.obj));
     21             } else if (msg.what == 2) {
     22                 KLog.d("连接成功");
     23                 try {
     24                     KLog.d("订阅的主题:" + topic);
     25                     mqttClient.subscribe(topic, 0);
     26 
     27                 } catch (Exception e) {
     28                     e.printStackTrace();
     29                 }
     30             } else if (msg.what == 3) {
     31                 KLog.d("连接失败,系统正在重连");
     32             }
     33         }
     34     };
     35 
     36     private MqttManager() {
     37 
     38     }
     39     private static MqttCallback myMqttCallback = new MqttCallback(){
     40 
     41         @Override
     42         public void messageArrived(String topic, MqttMessage message){
     43             //subscribe后得到的消息会执行到这里面
     44             KLog.d("messageArrived topic:"+topic);
     45             Message msg = new Message();
     46             msg.what = 1;
     47             msg.obj = message.toString();
     48             handler.sendMessage(msg);
     49         }
     50         @Override
     51         public void connectionLost(Throwable cause) {
     52             KLog.d("connectionLost cause = "+cause);
     53             //连接丢失后,一般在这里面进行重连
     54                 try{
     55                     KLog.d("mqtt重连");
     56                     manager.startReconnect();
     57                 }catch (Exception e){
     58                     KLog.d("Exception = "+ e);
     59                     e.printStackTrace();
     60                 }
     61         }
     62 
     63         @Override
     64         public void deliveryComplete(IMqttDeliveryToken token) {
     65             //publish后会执行到这里
     66             KLog.d("deliveryComplete");
     67         }
     68     };
     69     private ScheduledExecutorService scheduler;
     70 
     71     public static MqttManager getInstance() {
     72         if (manager == null) {
     73             manager = new MqttManager();
     74         }
     75         return manager;
     76     }
     77 
     78     public void initConnection() {
     79         if (initFirst){
     80             KLog.d("第一次调用initConnection");
     81             try {
     82                 clientId = Preferences.getUserAccount() + System.currentTimeMillis();//客户端标识符(本机mac地址+当前时间ms)
     83                 userName = Preferences.getUserAccount();//用户名
     84                 passWord = Preferences.getUserToken();//密码
     85                 topic = userName;
     86                 //host为主机名;clientid即连接MQTT的客户端ID,是客户端的唯一标识符;MemoryPersistence设置clientid的保存形式,默认为以内存保存
     87                 mqttClient = new MqttClient(host, clientId, new MemoryPersistence());
     88                 //MQTT的连接设置
     89                 options = new MqttConnectOptions();
     90                 //设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
     91                 options.setCleanSession(true);
     92                 //断开后,是否自动连接
     93                 options.setAutomaticReconnect(true);
     94                 //设置连接的用户名
     95                 options.setUserName(userName);
     96                 //设置连接的密码
     97                 options.setPassword(passWord.toCharArray());
     98                 // 设置超时时间 单位为秒
     99                 options.setConnectionTimeout(10);
    100                 // 设置会话心跳时间 单位为秒 服务器会每隔1.5*(20)秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
    101                 options.setKeepAliveInterval(20);
    102                 //setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
    103 //            options.setWill(topic,"close".getBytes(),2,true);
    104                 //设置回调
    105                 mqttClient.setCallback(myMqttCallback);
    106                 KLog.d("clientId: "+clientId +", userName: "+userName+", passWord: "+passWord+", topic: "+topic);
    107                 //设置标识符状态
    108                 initFirst = false;
    109                 //mqtt第一次连接
    110                 manager.startReconnect();
    111             } catch (Exception e) {
    112                 KLog.d("initConnection Exception: " + e);
    113                 e.printStackTrace();
    114             }
    115         }else {
    116             KLog.d("网络重连后调用initConnection");
    117             manager.startReconnect();
    118         }
    119     }
    120 
    121     public  void startReconnect() {
    122 
    123         if (NetworkUtils.isConnected()){
    124 
    125             if (!mqttClient.isConnected()) {
    126                 //重新连接
    127                 connect();
    128                 KLog.d("mqtt连接结束");
    129             }else {
    130                 KLog.d("mqttClient.isConnected");
    131             }
    132 //            scheduler = Executors.newSingleThreadScheduledExecutor();
    133 //            scheduler.scheduleAtFixedRate(new Runnable() {
    134 //                @Override
    135 //                public void run() {
    136 //
    137 //                    if (!mqttClient.isConnected()) {
    138 //                        connect();
    139 //                        KLog.d("mqtt连接结束");
    140 //                    }
    141 //                }
    142 //            }, 0 * 1000, 10 * 1000, TimeUnit.MILLISECONDS);
    143         }else {
    144             KLog.d("网络不可用");
    145 //            scheduler.shutdown();
    146         }
    147 
    148     }
    149 
    150     public void sendMsg(String msg) {
    151         KLog.d("sendMsg");
    152         if (mqttClient != null && mqttClient.isConnected()) {
    153             try {
    154                 KLog.d("发送的主题:" + Preferences.getUserAccount());
    155                 String topic = Preferences.getUserAccount();
    156                 KLog.d(topic);
    157                 byte[] msgBytes = msg.getBytes();
    158                 KLog.d("0000");
    159                 mqttClient.publish(topic, msgBytes, 0, false);
    160                 KLog.d("11111111111111111");
    161             } catch (MqttException e) {
    162                 KLog.d(e);
    163             }
    164         }
    165     }
    166 
    167     //发布的主题设为pubTopic = "owh" + Preferences.getUserAccount();
    168     //发布主题(发布主题和订阅主题应设为不同值)
    169     public void publish(String topicName, String payload) {
    170         if (mqttClient != null && mqttClient.isConnected()) {
    171             // 创建和配置一个消息
    172             MqttMessage message = new MqttMessage(payload.getBytes());
    173             message.setPayload(payload.getBytes());
    174             message.setQos(0);
    175             try {
    176                 KLog.d("1111");
    177                 mqttClient.publish(topicName, message);
    178                 KLog.d("2222");
    179             } catch (MqttException e) {
    180                 KLog.d("publish : " + e.toString());
    181             }
    182         }
    183     }
    184 
    185     private void connect() {
    186 
    187         ThreadPoolManager.getInstance().execute(new Runnable() {
    188             @Override
    189             public void run() {
    190                 try {
    191                     mqttClient.connect(options);
    192                     Message msg = Message.obtain();
    193                     msg.what = 2;
    194                     handler.sendMessage(msg);
    195                 } catch (Exception e) {
    196                     e.printStackTrace();
    197                     Message msg = Message.obtain();
    198                     msg.what = 3;
    199                     handler.sendMessage(msg);
    200                 }
    201             }
    202         });
    203 
    204 //        new Thread(new Runnable() {
    205 //
    206 //            @Override
    207 //            public void run() {
    208 //                try {
    209 //                    mqttClient.connect(options);
    210 //                    Message msg = new Message();
    211 //                    msg.what = 2;
    212 //                    handler.sendMessage(msg);
    213 //                } catch (Exception e) {
    214 //                    e.printStackTrace();
    215 //                    Message msg = new Message();
    216 //                    msg.what = 3;
    217 //                    handler.sendMessage(msg);
    218 //                }
    219 //            }
    220 //        }).start();
    221     }
    222 
    223     //断开连接
    224     public static void mqttDisconnect(){
    225         if(mqttClient !=null && mqttClient.isConnected()){
    226             try{
    227                 mqttClient.disconnect();
    228             }catch (MqttException e){
    229                 KLog.d("mqtt disconnect error");
    230                 e.printStackTrace();
    231             }
    232         }
    233     }
    234 
    235 }

    三、注意事项

    1、MQTT的客户端id(clientId)须唯一。在此项目中clientId = 本机mac地址 + 当前时间(ms)。

    2、一个客户端的一个MQTT连接最好只new一个对象,避免一台设备产生多个客户端账号。

    当多个发布(/订阅)的clientId相同时,会发生Mqtt反复重连的现象,无法正常发送或接收消息。

    当多个发布(/订阅)的clientId不同时,会造成一台设备多个Mqtt账号同时在线,占用了多余的服务器资源。

    3、一个客户端的发布Topic和订阅Topic不应相同。

  • 相关阅读:
    git
    Django RestFramework
    vuex以及axios
    npm 、webpack 、 vue-cli
    vue的生命周期
    vue-router
    vue框架 (小清单)
    nodejs review-01
    npm-bluebird使用
    js整理4
  • 原文地址:https://www.cnblogs.com/ken9527just/p/11468608.html
Copyright © 2011-2022 走看看