  • Android push messaging over MQTT (4): choosing fusesource-mqtt-client as the client

    1. Introduction

      fusesource-mqtt-client is an MQTT client written in Java. Project page:

      https://github.com/fusesource/mqtt-client

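    2. Adding the fusesource-mqtt-client library

    • File ---> Project Structure ---> Dependencies
    • Click the green "+" button
    • In the dialog that opens, type "mqtt-client" and press Enter to search
    • From the results, select org.fusesource.mqtt-client:mqtt-client:1.xxx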

    3. Sample code

    3.1 Reference code

    Sample code shipped with the ActiveMQ server: apache-activemq-5.15.0/examples/mqtt/java/
    Example from DZone: https://dzone.com/articles/android-mqtt-activemq
    Example on GitHub: https://github.com/fusesource/mqtt-client#using-the-callbackcontinuation-passing-based-api

    3.2 Result

      [Screenshot of the running demo app]

    3.3 Source code

      package com.example.tt.mqtt;

      import android.app.NotificationManager;
      import android.app.PendingIntent;
      import android.content.Context;
      import android.content.Intent;
      import android.os.Bundle;
      import android.support.v4.app.TaskStackBuilder;
      import android.support.v7.app.AppCompatActivity;
      import android.support.v7.app.NotificationCompat;
      import android.util.Log;
      import android.view.View;
      import android.widget.Button;
      import android.widget.CheckBox;
      import android.widget.CompoundButton;
      import android.widget.EditText;
      import android.widget.TextView;
      import android.widget.ToggleButton;

      import org.fusesource.hawtbuf.Buffer;
      import org.fusesource.hawtbuf.UTF8Buffer;
      import org.fusesource.mqtt.client.BlockingConnection;
      import org.fusesource.mqtt.client.Callback;
      import org.fusesource.mqtt.client.CallbackConnection;
      import org.fusesource.mqtt.client.Listener;
      import org.fusesource.mqtt.client.MQTT;
      import org.fusesource.mqtt.client.Message;
      import org.fusesource.mqtt.client.QoS;
      import org.fusesource.mqtt.client.Topic;

      import java.net.URISyntaxException;


      public class MainActivity extends AppCompatActivity implements View.OnClickListener, CompoundButton.OnCheckedChangeListener {

          final static String TAG = "MQTTClient";

          // UI
          ToggleButton    btnConnect;
          Button          btnPublish, btnSubscribe;
          EditText        edtServer, edtMessage, edtTopic, edtClientID;
          TextView        received;
          CheckBox        cbxPersist;

          // MQTT
          final static String clientId    = "android";
          final static short  keepAlive   = 255;
          final static String host        = "192.168.1.101";
          final static String user        = "guest";
          final static int    port        = 1883;
          final static String password    = "admin";

          MQTT                mqtt;
          Listener            listener;
          CallbackConnection  callbackConnection;
          Callback<Void>      connectCallback;
          Callback<byte[]>    subscribeCallback;
          Callback<Void>      publishCallback;
          Callback<Void>      disconnectCallback;

          // Instance initializer: builds all callbacks. The callbacks do not run on the
          // UI thread, so every UI update is posted through received.post(...).
          {
              connectCallback = new Callback<Void>() {

                  @Override
                  public void onSuccess(Void value) {
                      Log.d(TAG, "connectCallback : onSuccess");
                      received.post(new Runnable() {
                          @Override
                          public void run() {
                              received.setText("connectCallback success");
                          }
                      });
                  }

                  @Override
                  public void onFailure(Throwable value) {
                      value.printStackTrace();
                      Log.d(TAG, "connectCallback : failure");
                      received.post(new Runnable() {
                          @Override
                          public void run() {
                              received.setText("connectCallback failure");
                          }
                      });
                      System.exit(-2);
                  }
              };

              disconnectCallback = new Callback<Void>() {

                  @Override
                  public void onSuccess(Void value) {
                      received.post(new Runnable() {
                          @Override
                          public void run() {
                              received.setText("disconnect success");
                          }
                      });
                  }

                  @Override
                  public void onFailure(Throwable e) {
                      received.post(new Runnable() {
                          @Override
                          public void run() {
                              received.setText("disconnect failure");
                          }
                      });
                  }
              };

              listener = new Listener() {

                  @Override
                  public void onConnected() {
                      Log.d(TAG, "listener onConnected");
                      received.post(new Runnable() {
                          @Override
                          public void run() {
                              received.setText("listener onConnected");
                          }
                      });
                  }

                  @Override
                  public void onDisconnected() {
                      Log.d(TAG, "listener onDisconnected");
                      received.post(new Runnable() {
                          @Override
                          public void run() {
                              received.setText("listener onDisconnected");
                          }
                      });
                  }

                  @Override
                  public void onPublish(final UTF8Buffer topic, Buffer msg, Runnable ack) {
                      final String body = msg.utf8().toString();
                      Log.d(TAG, "onPublish: " + body);
                      received.post(new Runnable() {
                          @Override
                          public void run() {
                              makeNotification(topic.toString(), body);
                              received.append("\nreceived : " + body);
                          }
                      });
                      // Acknowledge the message; without this the broker never
                      // receives the ack for a QoS 1 delivery.
                      ack.run();
                  }

                  @Override
                  public void onFailure(Throwable value) {
                      Log.d(TAG, "listener failure");
                      received.post(new Runnable() {
                          @Override
                          public void run() {
                              received.setText("listener failure");
                          }
                      });
                  }
              };

              subscribeCallback = new Callback<byte[]>() {

                  @Override
                  public void onSuccess(byte[] qoses) {
                      Log.d(TAG, "subscribe : success");
                      received.post(new Runnable() {
                          @Override
                          public void run() {
                              received.setText("subscribe " + edtTopic.getText().toString() + ": success");
                          }
                      });
                  }

                  @Override
                  public void onFailure(Throwable value) {
                      value.printStackTrace();
                      Log.d(TAG, "subscribe : failure");
                      received.post(new Runnable() {
                          @Override
                          public void run() {
                              received.setText("subscribe " + edtTopic.getText().toString() + ": failure");
                          }
                      });
                      System.exit(-2);
                  }
              };

              publishCallback = new Callback<Void>() {
                  @Override
                  public void onSuccess(Void value) {
                      Log.d(TAG, "onSuccess: ");
                  }

                  @Override
                  public void onFailure(Throwable value) {
                      Log.d(TAG, "onFailure: ");
                  }
              };
          }

          void connect() {
              callbackConnection.connect(connectCallback);
          }

          void disconnect() {
              callbackConnection.disconnect(disconnectCallback);
          }

          void subscribe() {
              String topicName = edtTopic.getText().toString().trim();
              Topic[] topics = new Topic[]{new Topic(topicName, QoS.AT_LEAST_ONCE)};
              callbackConnection.subscribe(topics, subscribeCallback);
          }

          void publish() {
              String data = edtMessage.getText().toString();
              String topicName = edtTopic.getText().toString().trim();
              // retain = false: the broker does not keep the message for future subscribers
              callbackConnection.publish(topicName, data.getBytes(), QoS.AT_LEAST_ONCE, false, publishCallback);
          }

          void initMqtt() {
              mqtt = new MQTT();
              try {
                  mqtt.setHost(host, port);
                  mqtt.setUserName(user);
                  mqtt.setPassword(password);
                  mqtt.setKeepAlive(keepAlive);
                  // Has no effect (the return value is unused); the client ID is
                  // only set in onCheckedChanged() when the persist box is toggled.
                  mqtt.getClientId();
                  callbackConnection = mqtt.callbackConnection();
                  callbackConnection.listener(listener);
              } catch (URISyntaxException e) {
                  e.printStackTrace();
                  Log.e(TAG, "-=-=-=-=-=-=------------====\n initMqtt exception : " + e.getMessage());
              }
          }

          @Override
          protected void onCreate(Bundle savedInstanceState) {
              super.onCreate(savedInstanceState);
              setContentView(R.layout.activity_main);

              received    = (TextView)    findViewById(R.id.txt_received);
              btnSubscribe= (Button)      findViewById(R.id.btn_subscribe);
              btnConnect  = (ToggleButton)findViewById(R.id.btn_connect);
              btnPublish  = (Button)      findViewById(R.id.btn_publish);
              edtServer   = (EditText)    findViewById(R.id.edt_server);
              edtTopic    = (EditText)    findViewById(R.id.edt_topic);
              edtMessage  = (EditText)    findViewById(R.id.edt_message);
              edtClientID = (EditText)    findViewById(R.id.edt_clientID);
              cbxPersist  = (CheckBox)    findViewById(R.id.cbx_persist);

              btnConnect  .setOnClickListener(this);
              btnConnect  .setOnCheckedChangeListener(this);
              cbxPersist  .setOnCheckedChangeListener(this);
              btnSubscribe.setOnClickListener(this);
              btnPublish  .setOnClickListener(this);

              initMqtt();
          }

          void makeNotification(final String title, final String content) {

              NotificationCompat.Builder mBuilder = new NotificationCompat.Builder(this);
              mBuilder.setSmallIcon(R.drawable.mail_3_small); // required
              mBuilder.setContentTitle(title);
              mBuilder.setContentText(content);
              // Creates an explicit intent for an Activity in your app
              Intent resultIntent = new Intent(this, MainActivity.class);

              // The stack builder object will contain an artificial back stack for the
              // started Activity.
              // This ensures that navigating backward from the Activity leads out of
              // your app to the Home screen.
              TaskStackBuilder stackBuilder = TaskStackBuilder.create(this);
              // Adds the back stack for the Intent (but not the Intent itself)
              stackBuilder.addParentStack(MainActivity.class);
              // Adds the Intent that starts the Activity to the top of the stack
              stackBuilder.addNextIntent(resultIntent);
              PendingIntent resultPendingIntent = stackBuilder.getPendingIntent(0, PendingIntent.FLAG_UPDATE_CURRENT);
              mBuilder.setContentIntent(resultPendingIntent);
              NotificationManager mNotificationManager =
                      (NotificationManager) getSystemService(Context.NOTIFICATION_SERVICE);

              // The first argument is a unique integer your app uses to identify the
              // notification. For example, to cancel the notification, you can pass its ID
              // number to NotificationManager.cancel().
              mNotificationManager.notify(R.string.app_name, mBuilder.build());
          }

          // The same operations through the blocking API. Not called by the UI;
          // it must not run on the main thread because every call blocks.
          void blocking() {
              BlockingConnection connection = mqtt.blockingConnection();
              try {
                  connection.connect();
                  // publish
                  connection.publish("foo", "Hello".getBytes(), QoS.AT_LEAST_ONCE, false);

                  // subscribe
                  Topic[] topics = {new Topic("foo", QoS.AT_LEAST_ONCE)};
                  byte[] qoses = connection.subscribe(topics);

                  // receive message
                  Message message = connection.receive();
                  System.out.println(message.getTopic());
                  byte[] payload = message.getPayload();
                  // process the message then:
                  message.ack();

                  // disconnect
                  connection.disconnect();
              } catch (Exception e) {
                  e.printStackTrace();
              }
          }

          @Override
          public void onClick(View view) {
              switch (view.getId()) {
                  case R.id.btn_publish   :   publish();      break;
                  case R.id.btn_subscribe :   subscribe();    break;
              }
          }

          @Override
          public void onCheckedChanged(CompoundButton compoundButton, boolean b) {
              switch (compoundButton.getId()) {
                  case R.id.btn_connect:
                      // As written in the demo: unchecked connects, checked disconnects.
                      if (!b) {
                          connect();
                      } else {
                          disconnect();
                      }
                      break;
                  case R.id.cbx_persist:
                      if (mqtt != null) {
                          // Persistent session: use the ID from the text field and keep the session.
                          mqtt.setClientId(edtClientID.getText().toString().trim());
                          mqtt.setCleanSession(!b);
                      }
                      break;
              }
          }

      }
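
    In the listing above, publish() encodes the text with data.getBytes(), which uses the platform default charset, while onPublish() decodes the payload with msg.utf8(). On Android the default charset is always UTF-8, so the two sides agree, but on a desktop JVM they may not. A minimal sketch of making the encoding explicit, using only the JDK; PayloadCodec, encode and decode are hypothetical names and not part of mqtt-client:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not part of mqtt-client): fixes the payload charset to UTF-8
// so publisher and subscriber agree regardless of the platform default.
final class PayloadCodec {
    private PayloadCodec() {}

    /** Encodes message text as UTF-8 bytes. */
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes a received payload, assuming the sender used UTF-8. */
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // "\u00b0" is the degree sign; it takes two bytes in UTF-8.
        byte[] payload = encode("21\u00b0C");
        System.out.println(payload.length + " bytes, decoded: " + decode(payload));
    }
}
```

    In publish(), passing PayloadCodec.encode(data) instead of data.getBytes() would be the only change needed on the sending side.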

    3.4 Full source download

      https://git.oschina.net/xi/mqtt-client-demo.git

    4. Commonly used MQTT methods

    setClientId

      Use to set the client ID of the session. This is what an MQTT server uses to identify a session where setCleanSession(false); is being used.

      The ID must be 23 characters or less. Defaults to an auto-generated ID (based on your socket address, port and timestamp).

      Every client must use a different ID. A persistent session, i.e. setCleanSession(false), is only useful with an explicitly set, stable ID, because the server records which client subscribed to which topics in the session identified by that ID.

    setCleanSession

      Set to false if you want the MQTT server to persist topic subscriptions and ack positions across client sessions. Defaults to true.

      With false, the server does not clear the session, so the subscriptions survive a disconnect.

    setKeepAlive

      Configures the Keep Alive timer in seconds. Defines the maximum time interval between messages received from a client.

      It enables the server to detect that the network connection to a client has dropped, without having to wait for the long TCP/IP timeout.

    setUserName

      Sets the user name used to authenticate against the server.

    setPassword

      Sets the password used to authenticate against the server.

    setWillTopic

      If set, the server will publish the client's Will message to the specified topic if the client has an unexpected disconnection.

    setWillMessage

      The Will message to send. Defaults to a zero-length message.

    setWillQos

      Sets the quality of service to use for the Will message. Defaults to QoS.AT_MOST_ONCE.

    setWillRetain

      Set to true if you want the Will to be published with the retain option.

    setVersion

      Sets the MQTT protocol version. Set to "3.1.1" to use MQTT version 3.1.1. Otherwise defaults to the 3.1 protocol version.
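
    The two client ID constraints described for setClientId (at most 23 characters, different for every client) can be met by deriving the ID from some per-installation value. The following is a minimal sketch using only the JDK; ClientIds, fromSeed and the seed strings are hypothetical names chosen for illustration, and hashing the seed is one possible approach rather than anything mqtt-client prescribes:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper (not part of mqtt-client): builds a stable client ID
// of at most 23 characters from a per-installation seed string.
final class ClientIds {
    static final int MAX_LENGTH = 23;

    private ClientIds() {}

    /** Returns prefix + hex digits of SHA-256(seed), truncated to fit MAX_LENGTH. */
    static String fromSeed(String prefix, String seed) {
        if (prefix.length() > MAX_LENGTH - 2) {
            throw new IllegalArgumentException("prefix leaves no room for the hash");
        }
        try {
            MessageDigest sha = MessageDigest.getInstance("SHA-256");
            byte[] digest = sha.digest(seed.getBytes(StandardCharsets.UTF_8));
            StringBuilder id = new StringBuilder(prefix);
            for (byte b : digest) {
                if (id.length() + 2 > MAX_LENGTH) {
                    break; // the next two hex digits would exceed the limit
                }
                id.append(String.format("%02x", b));
            }
            return id.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is mandatory on every Java platform.
            throw new IllegalStateException(e);
        }
    }

    /** Checks the length rule quoted above: non-empty and at most 23 characters. */
    static boolean isValid(String id) {
        return id != null && !id.isEmpty() && id.length() <= MAX_LENGTH;
    }

    public static void main(String[] args) {
        // The seed would be something unique per installation, e.g. a stored UUID.
        String id = fromSeed("android", "install-1234");
        System.out.println(id + " (" + id.length() + " chars)");
    }
}
```

    Because the same seed always yields the same ID, the result could be passed to setClientId before calling setCleanSession(false), so that the server finds the stored session again after a reconnect.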
  • Original post: https://www.cnblogs.com/mhbs/p/7443941.html