zoukankan      html  css  js  c++  java
  • Android Apollo MQTT入门

    一、Apache Apollo服务器其实是一个消息中转站

    下载地址 http://activemq.apache.org/apollo/download.html

    服务搭建方式,参看博客Android APP必备高级功能,消息推送之MQTT

    1、命令行进入解压后的bin目录(例:E:\>cd E:\MQTT\apache-apollo-1.7.1\bin)。

    2、输入apollo create XXX(xxx为创建的服务器实例名称,例:apollo create mybroker),之后会在bin目录下创建名称为XXX的文件夹。

      XXX文件夹下的etc\apollo.xml是配置服务器信息的文件。

      etc\users.properties文件包含连接MQTT服务器时用到的用户名和密码,默认为admin=password,即账号为admin,密码为password,可自行更改。

    3、进入XXX/bin目录,输入apollo-broker.cmd run开启服务器,看到如下界面代表搭建完成

    4、添加Windows服务

    进入XXX/bin目录,如下图:

    二、Android

    AndroidManifest

        <!-- Permissions requested by the app. NOTE(review): presumably these are what the Paho Android service needs for networking, connectivity checks and wake locks; confirm against the Paho documentation. -->
        <uses-permission android:name="android.permission.INTERNET" />
        <uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
        <uses-permission android:name="android.permission.WAKE_LOCK" />
            <!-- Mqtt Service -->
            <!-- Service class from the org.eclipse.paho.android.service library. -->
            <service android:name="org.eclipse.paho.android.service.MqttService" />
            <!-- The application's own service (the MQTTService class in package com.zyp.mqtt). -->
            <service android:name="com.zyp.mqtt.MQTTService"/>

    依赖项

    // Build-script configuration: repositories and classpath used to resolve the Android Gradle plugin.
    buildscript {
        repositories {
            jcenter()
        }
        dependencies {
            classpath 'com.android.tools.build:gradle:2.3.2'
    
            // NOTE: Do not place your application dependencies here; they belong
            // in the individual module build.gradle files
        }
    }
    
    // Repositories made available to every project in the build.
    allprojects {
        repositories {
            jcenter()
            mavenCentral()
            // Eclipse-hosted repository for the Paho release artifacts.
            maven { url "https://repo.eclipse.org/content/repositories/paho-releases/" }
        }
    }
        // NOTE(review): the three `compile` lines below are not inside any block in this snippet;
        // presumably they belong in the app module's build.gradle `dependencies { }` block. Confirm.
        // Paho MQTT client, Paho Android service wrapper, and EventBus.
        compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
        compile 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.0'
        compile 'org.greenrobot:eventbus:3.0.0'

    Android Service

    public class MQTTService  extends Service {
    
        public static final String TAG = MQTTService.class.getSimpleName();
    
        private static MqttAndroidClient client;
        private MqttConnectOptions conOpt;
    
        private String host = "tcp://10.0.2.2:61613";private String userName = "admin";
        private String passWord = "password";
        private static String myTopic = "topic";
        private String clientId = "test";
    
        @Override
        public int onStartCommand(Intent intent, int flags, int startId) {
            init();
            return super.onStartCommand(intent, flags, startId);
        }
    
        public static void publish(String msg){
            String topic = myTopic;
            Integer qos = 0;
            Boolean retained = false;
            try {
                client.publish(topic, msg.getBytes(), qos.intValue(), retained.booleanValue());
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
    
        private void init() {
            // 服务器地址(协议+地址+端口号)
            String uri = host;
            client = new MqttAndroidClient(this, uri, clientId);
            // 设置MQTT监听并且接受消息
            client.setCallback(mqttCallback);
    
            conOpt = new MqttConnectOptions();
            // 清除缓存
            conOpt.setCleanSession(true);
            // 设置超时时间,单位:秒
            conOpt.setConnectionTimeout(10);
            // 心跳包发送间隔,单位:秒
            conOpt.setKeepAliveInterval(20);
            // 用户名
            conOpt.setUserName(userName);
            // 密码
            conOpt.setPassword(passWord.toCharArray());
    
            // last will message
            boolean doConnect = true;
            String message = "{"terminal_uid":"" + clientId + ""}";
            String topic = myTopic;
            Integer qos = 0;
            Boolean retained = false;
            if ((!message.equals("")) || (!topic.equals(""))) {
                // 最后的遗嘱
                try {
                    conOpt.setWill(topic, message.getBytes(), qos.intValue(), retained.booleanValue());
                } catch (Exception e) {
                    Log.i(TAG, "Exception Occured", e);
                    doConnect = false;
                    iMqttActionListener.onFailure(null, e);
                }
            }
    
            if (doConnect) {
                doClientConnection();
            }
    
        }
    
        @Override
        public void onDestroy() {
            try {
                client.disconnect();
            } catch (MqttException e) {
                e.printStackTrace();
            }
            super.onDestroy();
        }
    
        /** 连接MQTT服务器 */
        private void doClientConnection() {
            if (!client.isConnected() && isConnectIsNomarl()) {
                try {
                    client.connect(conOpt, null, iMqttActionListener);
                } catch (MqttException e) {
                    e.printStackTrace();
                }
            }
    
        }
    
        // MQTT是否连接成功
        private IMqttActionListener iMqttActionListener = new IMqttActionListener() {
    
            @Override
            public void onSuccess(IMqttToken arg0) {
                Log.i(TAG, "连接成功 ");
                try {
                    // 订阅myTopic话题
                    client.subscribe(myTopic,1);
                } catch (MqttException e) {
                    e.printStackTrace();
                }
            }
    
            @Override
            public void onFailure(IMqttToken arg0, Throwable arg1) {
                arg1.printStackTrace();
                // 连接失败,重连
            }
        };
    
        // MQTT监听并且接受消息
        private MqttCallback mqttCallback = new MqttCallback() {
    
            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
    
                String str1 = new String(message.getPayload());
                MQTTMessage msg = new MQTTMessage();
                msg.setMessage(str1);
                EventBus.getDefault().post(msg);
                String str2 = topic + ";qos:" + message.getQos() + ";retained:" + message.isRetained();
                Log.i(TAG, "messageArrived:" + str1);
                Log.i(TAG, str2);
            }
    
            @Override
            public void deliveryComplete(IMqttDeliveryToken arg0) {
    
            }
    
            @Override
            public void connectionLost(Throwable arg0) {
                // 失去连接,重连
            }
        };
    
        /** 判断网络是否连接 */
        private boolean isConnectIsNomarl() {
            ConnectivityManager connectivityManager = (ConnectivityManager) this.getApplicationContext().getSystemService(Context.CONNECTIVITY_SERVICE);
            NetworkInfo info = connectivityManager.getActiveNetworkInfo();
            if (info != null && info.isAvailable()) {
                String name = info.getTypeName();
                Log.i(TAG, "MQTT当前网络名称:" + name);
                return true;
            } else {
                Log.i(TAG, "MQTT 没有可用网络");
                return false;
            }
        }
    
        @Nullable
        @Override
        public IBinder onBind(Intent intent) {
            return null;
        }
    }

    MainActivity 界面方法

    /**
     * Demo screen: starts {@link MQTTService}, publishes a fixed message when the button is
     * tapped, and shows every {@code MQTTMessage} delivered through EventBus as a toast.
     */
    public class MainActivity extends AppCompatActivity {

        /** Click handler for the publish button; sends a fixed demo payload. */
        private final View.OnClickListener publishClickListener = new View.OnClickListener() {
            @Override
            public void onClick(View clicked) {
                MQTTService.publish("CSDN 一口仨馍");
            }
        };

        /** Sets up the layout, registers for EventBus events, starts the service and wires the button. */
        @Override
        protected void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.activity_main);

            EventBus.getDefault().register(this);

            Intent mqttServiceIntent = new Intent(this, MQTTService.class);
            startService(mqttServiceIntent);

            View publishButton = findViewById(R.id.publishBtn);
            publishButton.setOnClickListener(publishClickListener);
        }

        /**
         * EventBus subscriber, invoked on the main thread for every posted {@code MQTTMessage}.
         *
         * @param mqttMessage the message posted by the service
         */
        @Subscribe(threadMode = ThreadMode.MAIN)
        public void getMqttMessage(MQTTMessage mqttMessage) {
            Log.i(MQTTService.TAG, "get message:" + mqttMessage.getMessage());
            Toast toast = Toast.makeText(this, mqttMessage.getMessage(), Toast.LENGTH_SHORT);
            toast.show();
        }

        /** Unregisters from EventBus before the activity is torn down. */
        @Override
        protected void onDestroy() {
            EventBus.getDefault().unregister(this);
            super.onDestroy();
        }
    }

    三、Java服务端

    pom文件

        <!-- https://mvnrepository.com/artifact/org.eclipse.paho/org.eclipse.paho.client.mqttv3 -->
    <dependency>
        <groupId>org.eclipse.paho</groupId>
        <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
        <version>1.1.0</version>
    </dependency>

    Server 程序入口

    package com.zyp.mqtt;
    
    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
    import org.eclipse.paho.client.mqttv3.MqttTopic;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    
    /**
     * Minimal MQTT publisher: connects to the broker at {@link #HOST} and publishes to
     * {@link #TOPIC}. {@link #main(String[])} sends a single retained test message.
     */
    public class Server {

        public static final String HOST = "tcp://localhost:61613";

        public static final String TOPIC = "topic";
        private static final String CLIENT_ID = "zhaoyazhou_server";

        private final MqttClient client;
        private MqttTopic topic;
        private final String userName = "admin";
        private final String passWord = "password";

        /**
         * Creates the client and connects it to the broker.
         *
         * @throws MqttException if the client cannot be created or the connection fails
         */
        public Server() throws MqttException {
            // MemoryPersistence keeps the client's persistence data in memory.
            client = new MqttClient(HOST, CLIENT_ID, new MemoryPersistence());
            connect();
        }

        /**
         * Connects with a clean session and resolves the publish topic.
         *
         * <p>Failures are propagated instead of being printed and ignored: a swallowed failure
         * would leave {@code topic} unset and make the next {@link #publish(MqttMessage)} fail
         * with a {@code NullPointerException} far from the real cause.
         *
         * @throws MqttException if the connection attempt fails
         */
        private void connect() throws MqttException {
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(true);
            options.setUserName(userName);
            options.setPassword(passWord.toCharArray());
            // Connection timeout.
            options.setConnectionTimeout(10);
            // Keep-alive (heartbeat) interval.
            options.setKeepAliveInterval(20);

            client.setCallback(new PushCallback());
            client.connect(options);
            topic = client.getTopic(TOPIC);
        }

        /**
         * Publishes {@code message} to {@link #TOPIC} and blocks until delivery completes.
         *
         * @param message the message to publish
         * @throws MqttPersistenceException if the message cannot be persisted
         * @throws MqttException if publishing fails
         */
        public void publish(MqttMessage message) throws MqttPersistenceException, MqttException {
            MqttDeliveryToken token = topic.publish(message);
            token.waitForCompletion();
            System.out.println(token.isComplete() + "========");
        }

        /**
         * Disconnects from the broker if currently connected.
         *
         * @throws MqttException if the disconnect fails
         */
        public void disconnect() throws MqttException {
            if (client.isConnected()) {
                client.disconnect();
            }
        }

        /** Publishes one retained QoS 1 test message, then disconnects. */
        public static void main(String[] args) throws MqttException {
            Server server = new Server();
            try {
                MqttMessage message = new MqttMessage();
                message.setQos(1);
                message.setRetained(true);
                // Encode explicitly as UTF-8 so the bytes do not depend on this machine's default charset.
                message.setPayload("Server测试MQTT推送消息".getBytes(java.nio.charset.StandardCharsets.UTF_8));
                server.publish(message);
                System.out.println(message.isRetained() + "------ratained状态");
            } finally {
                // Release the connection once the message has been sent.
                server.disconnect();
            }
        }

    }

    回调函数 PushCallback

    package com.zyp.mqtt;
    
    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttCallback;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    
    /**
     * {@link MqttCallback} implementation that reports every client event on standard output.
     */
    public class PushCallback implements MqttCallback {

        /** Called when the connection is lost; reconnection logic would normally go here. */
        @Override
        public void connectionLost(Throwable cause) {
            System.out.println("连接断开,可以做重连");
        }

        /** Called after a publish; prints whether the delivery token reports completion. */
        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            boolean completed = token.isComplete();
            System.out.println("deliveryComplete---------" + completed);
        }

        /** Called for each message received on a subscribed topic; prints the topic and the message. */
        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            System.out.println("订阅的字符串:" + topic);
            String content = message.toString();
            System.out.println("消息内容:" + content);
        }
    }

    四、调试

    启动Apollo服务

    将Android App启动,链接上服务器,如图:

    启动服务端程序,发送信息,如图:

    手机端接收到信息,如图:

    参考博客:Android APP必备高级功能,消息推送之MQTT

    参考博客:MQTT JAVA发送、订阅、收集消息

    参考博客:MQTT协议之 Apache Apollo服务

    参考文章:MQTT Part 4 发布,订阅和退订

    参考文章:MQTT基础入门第四部分:MQTT 发布,订阅以及退订

  • 相关阅读:
    Javascript异步编程的4种方法(阮一峰)
    vue 要点
    npm指南
    http请求状态及其含义表
    IOS 7层协议
    js模块化
    UITouch触摸事件
    UIGestureRecognizer手势识别
    UISegmentControl 、UIStepper
    UINavigationController
  • 原文地址:https://www.cnblogs.com/bmbh/p/7865182.html
Copyright © 2011-2022 走看看