zoukankan      html  css  js  c++  java
  • Android Mqtt 消息推送使用

    初始化SDK:

    /**
         * 初始化SDK
         *
         * @param context context
         */
        public void initSDK(Context context) {
            String clientId = String.valueOf(System.currentTimeMillis()+userId);
            mqttAndroidClient = new MqttAndroidClient(mContext, serverUri, clientId);
            subscriptionTopics = new ArrayList<>();
            mqttAndroidClient.setCallback(new MqttCallbackExtended() {
                @Override
                public void connectComplete(boolean reconnect, String serverURI) {
    
                    if (reconnect) {
    
                        Log.d(TAG, "Reconnected to : " + serverURI);
                        // Because Clean Session is true, we need to re-subscribe
    //                    subscribeToTopic();
                        //publishMessage();
                    } else {
                        Log.d(TAG, "Connected to: " + serverURI);
    
                    }
                    connectSuccess = true;
    
                    subscribeToTopic();
                }
    
                @Override
                public void connectionLost(Throwable cause) {
                    connectSuccess = false;
                    Log.e(TAG, "The Connection was lost." + cause.getLocalizedMessage());
    
                }
    
                // THIS DOES NOT WORK!
                @Override
                public void messageArrived(String topic, MqttMessage message) throws Exception {
                    Log.d(TAG, "Incoming message: " +topic+ new String(message.getPayload()));
    
                }
    
                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
    
                }
            });
    
    
        }
    

    连接远程服务:

    /**
         * 连接远程服务
         */
        public void connectServer() {
            MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
            mqttConnectOptions.setAutomaticReconnect(true);
            mqttConnectOptions.setCleanSession(false);
    
            try {
                //addToHistory("Connecting to " + serverUri);
    
                mqttAndroidClient.connect(mqttConnectOptions, null, new IMqttActionListener() {
                    @Override
                    public void onSuccess(IMqttToken asyncActionToken) {
                        connectSuccess = true;
                        DisconnectedBufferOptions disconnectedBufferOptions = new DisconnectedBufferOptions();
                        disconnectedBufferOptions.setBufferEnabled(true);
                        disconnectedBufferOptions.setBufferSize(100);
                        disconnectedBufferOptions.setPersistBuffer(false);
                        disconnectedBufferOptions.setDeleteOldestMessages(false);
                        mqttAndroidClient.setBufferOpts(disconnectedBufferOptions);
                    }
    
                    @Override
                    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                        Log.e(TAG, "Failed to connect to: " + serverUri);
                        exception.printStackTrace();
                        Log.d(TAG, "onFailure: " + exception.getCause());
                        connectSuccess = false;
    
                    }
                });
    
    
            } catch (MqttException ex) {
                ex.printStackTrace();
            }
        }
    

    获取订阅信息:

      /**
         *获取订阅信息
       */

    public void connectGateway(String gatewayId, String userId) {
    //获取订阅信息
            if (!subscriptionTopics.contains(gatewayId)) {
                subscriptionTopics.add(gatewayId);
            }
            Log.d(TAG, "pre sub topic: connect status=" + connectSuccess);
            Log.d(TAG, "subtopic " + subscriptionTopics);
            subscribeToTopic();
        }
    

      

    订阅mqtt消息:

    /**
         * 订阅mqtt消息
         */
        private void subscribeToTopic() {
            try {
                if(subscriptionTopics.size()==0)
                    return;
                String[] topics = new String[subscriptionTopics.size()];
                subscriptionTopics.toArray(topics);
                int[] qoc = new int[topics.length];
                IMqttMessageListener[] mqttMessageListeners = new IMqttMessageListener[topics.length];
                for (int i = 0; i < topics.length; i++) {
                    IMqttMessageListener mqttMessageListener = new IMqttMessageListener() {
                        @Override
                        public void messageArrived(String topic, MqttMessage message) throws Exception {
                            // message Arrived!消息送达后做出的处理
                            Log.d(TAG, topic + " : " + new String(message.getPayload()));
                            handleReceivedMessage(new String(message.getPayload()), topic);
                        }
                    };
                    mqttMessageListeners[i] = mqttMessageListener;
                    Log.d(TAG, "subscribeToTopic: qoc= " + qoc[i]);
                }
                mqttAndroidClient.subscribe(topics, qoc, null, new IMqttActionListener() {
                    @Override
                    public void onSuccess(IMqttToken iMqttToken) {
                        Log.d(TAG, "Subscribed!");
                    }
    
                    @Override
                    public void onFailure(IMqttToken iMqttToken, Throwable throwable) {
                        Log.d(TAG, "Failed to subscribe");
                    }
                }, mqttMessageListeners);
    
            } catch (MqttException ex) {
                System.err.println("Exception whilst subscribing");
                ex.printStackTrace();
            }
    
        }

    处理收到的消息:

    private void handleReceivedMessage(String message, String gatewayId) {
    //可以发送一条广播通知程序
    }    
    

      

    发送mqtt消息:

    /**
         * 发送 mqtt 消息
         *
         * @param publishMessage 要发送的信息的 字符串
         */
        private void publishMessage(String publishMessage, String publishTopic) {
                try {
                    publishTopic = userId + "/" + publishTopic;
                    MqttMessage message = new MqttMessage();
                    message.setPayload(publishMessage.getBytes());
                    mqttAndroidClient.publish(publishTopic, message);
                    Log.d(TAG, "publishMessage:Message Published 
    " + publishTopic + ":" + message);
                    if (!mqttAndroidClient.isConnected()) {
                        Log.d(TAG, mqttAndroidClient.getBufferedMessageCount() + " messages in buffer.");
                    }
                } catch (MqttException e) {
                    System.err.println("Error Publishing: " + e.getMessage());
                    e.printStackTrace();
                }
        }
    

      

    注意:当时调用initSdk方法是在application中,最后发现跟信鸽推送里的mqtt消息有冲突,导致信鸽推送手机收不到,还弹出程序运行停止的崩溃弹窗,看log是信鸽报的错,曾以为是信鸽的问题,困扰了好久,最后才发现是自己的mqtt消息初始化的问题,只好把mqtt消息的初始化放到welcomeActivity里。

    没有封装的类:

    public class SubscribeClient {
        private final static String CONNECTION_STRING = "tcp://mqtt地址:mqtt端口";
        private final static boolean CLEAN_START = true;
        private final static short KEEP_ALIVE = 30;//低耗网络,但是又需要及时获取数据,心跳30s
        private final static String CLIENT_ID = "client1";
        private final static String[] TOPICS = {
                //订阅信息
        };
        private final static int[] QOS_VALUES = {0, 0, 2, 0};
    
        private MqttClient mqttClient = null;
    
        public SubscribeClient(String i) {
            try {
                mqttClient = new MqttClient(CONNECTION_STRING);
                SimpleCallbackHandler simpleCallbackHandler = new SimpleCallbackHandler();
                mqttClient.registerSimpleHandler(simpleCallbackHandler);//注册接收消息方法
                mqttClient.connect(CLIENT_ID + i, CLEAN_START, KEEP_ALIVE);
                mqttClient.subscribe(TOPICS, QOS_VALUES);//订阅接主题
    
                /**
                 * 完成订阅后,可以增加心跳,保持网络通畅,也可以发布自己的消息
                 */
    
                mqttClient.publish(PUBLISH_TOPICS, "keepalive".getBytes(), QOS_VALUES[0], true);
    
            } catch (MqttException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    
        /**
         * 简单回调函数,处理client接收到的主题消息
         *
         * @author pig
         */
        class SimpleCallbackHandler implements MqttSimpleCallback {
    
            /**
             * 当客户机和broker意外断开时触发
             * 可以再此处理重新订阅
             */
            @Override
            public void connectionLost() throws Exception {
                // TODO Auto-generated method stub
                System.out.println("客户机和broker已经断开");
            }
    
            /**
             * 客户端订阅消息后,该方法负责回调接收处理消息
             */
            @Override
            public void publishArrived(String topicName, byte[] payload, int Qos, boolean retained) throws Exception {
                // TODO Auto-generated method stub
                System.out.println("订阅主题: " + topicName);
                System.out.println("消息数据: " + new String(payload));
                System.out.println("消息级别(0,1,2): " + Qos);
                System.out.println("是否是实时发送的消息(false=实时,true=服务器上保留的最后消息): " + retained);
            }
    
        }
    
        /**
         * 高级回调
         *
         * @author pig
         */
        class AdvancedCallbackHandler implements MqttSimpleCallback {
    
            @Override
            public void connectionLost() throws Exception {
                // TODO Auto-generated method stub
    
            }
    
            @Override
            public void publishArrived(String arg0, byte[] arg1, int arg2,
                                       boolean arg3) throws Exception {
                // TODO Auto-generated method stub
    
            }
    
        }
    
        /**
         * @param args
         */
        public static void main(String[] args) {
            // TODO Auto-generated method stub
            new SubscribeClient("" + i);
    
        }
    
    }
    

      

  • 相关阅读:
    鸟哥的Linux私房菜学习笔记(1)
    Linux下搭建Oracle11g RAC(4)----配置oracle,grid用户SSH对等性
    解决升级windows8.1 Oracle服务被刷新
    Linux下搭建Oracle11g RAC(3)----创建用户及配置相关文件
    Linux下搭建Oracle11g RAC(2)----配置DNS服务器,确认SCAN IP可以被解析
    Linux下搭建Oracle11g RAC(1)----IP分配与配置IP
    Oracle11g新特性导致空表不能导出问题
    svn is already locked 最终解决方案
    .cur 图片加载提示 You may need an appropriate loader to handle this file type
    Request header field userRole is not allowed by Access-Control-Allow-Headers in preflight response.
  • 原文地址:https://www.cnblogs.com/IT-lss/p/8004749.html
Copyright © 2011-2022 走看看