zoukankan      html  css  js  c++  java
  • android 实现mqtt消息推送,以及不停断线重连的问题解决

    前段时间项目用到mqtt的消息推送,整理一下代码,代码的原型是网上找的,具体哪个地址已经忘记了。

    代码的实现是新建了一个MyMqttService,全部功能都在里面实现,包括连服务器,断线重连,订阅消息,处理消息,发布消息等基本操作。

    首先添加依赖:

    dependencies {
        implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
        implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
    }
    然后编辑AndroidManifest.xml,先添加权限:

    <uses-permission android:name="android.permission.INTERNET" />
    <uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
    <uses-permission android:name="android.permission.WAKE_LOCK" />
    再注册service:

    <service android:name="org.eclipse.paho.android.service.MqttService" />
    <service
        android:name=".service.MyMqttService"
        android:enabled="true"
        android:exported="true"/>
    接着进入正文MyMqttService.java,功能见注释吧:

    package com.example.nan.mqtt.service;

    import android.app.Notification;
    import android.app.NotificationManager;
    import android.app.PendingIntent;
    import android.app.Service;
    import android.content.Context;
    import android.content.Intent;
    import android.graphics.BitmapFactory;
    import android.os.Bundle;
    import android.os.IBinder;
    import android.support.v4.app.NotificationCompat;
    import android.util.Log;

    import com.google.gson.Gson;
    import com.google.gson.reflect.TypeToken;

    import org.eclipse.paho.android.service.MqttAndroidClient;
    import org.eclipse.paho.client.mqttv3.DisconnectedBufferOptions;
    import org.eclipse.paho.client.mqttv3.IMqttActionListener;
    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.IMqttToken;
    import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.json.JSONObject;

    /**
    * @author nan
    */
    public class MyMqttService extends Service {

    private static final String TAG = "nlgMqttService";
    private static final String TOPIC_TO_QA = "/s2c/task_quality/";

    private static final String publishTopic = "exampleAndroidPublishTopic";

    private MqttAndroidClient mqttAndroidClient;
    private NotificationManager mNotificationManager;


    public MyMqttService() {
    }

    @Override
    public IBinder onBind(Intent intent) {
    // TODO: Return the communication channel to the service.
    throw new UnsupportedOperationException("Not yet implemented");
    }

    @Override
    public void onCreate() {
    super.onCreate();
    Log.d(TAG, "MqttService onCreate executed");
    //mqtt服务器的地址
    final String serverUri = "tcp://192.168.10.10:1883";
    //新建Client,以设备ID作为client ID
    mqttAndroidClient = new MqttAndroidClient(MyMqttService.this, serverUri, getIMEI());
    mqttAndroidClient.setCallback(new MqttCallbackExtended() {
    @Override
    public void connectComplete(boolean reconnect, String serverURI) {
    //连接成功
    if (reconnect) {
    Log.d(TAG, "connectComplete: " + serverURI);
    // Because Clean Session is true, we need to re-subscribe
    subscribeAllTopics();
    } else {
    Log.d(TAG, "connectComplete: " + serverURI);
    }
    }

    @Override
    public void connectionLost(Throwable cause) {
    //连接断开
    Log.d(TAG, "connectionLost: connection was lost");
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) {
    //订阅的消息送达,推送notify
    String payload = new String(message.getPayload());
    Log.d(TAG, "Topic: " + topic + " ==> Payload: " + payload);
    if(mNotificationManager == null) {
    mNotificationManager = (NotificationManager) getSystemService(Context.NOTIFICATION_SERVICE);
    }
    int roleId = SinSimApp.getApp().getRole();
    Gson gson = new Gson();
    ServerToClientMsg msg = gson.fromJson(payload, new TypeToken<ServerToClientMsg>(){}.getType());
    if(msg != null) {
        //接受消息
        if(topic != null) {
    if(topic.equals(TOPIC_TO_QA)) {
    Intent intent = new Intent(MyMqttService.this, ProcessToCheckoutActivity.class);
    PendingIntent pi = PendingIntent.getActivity(MyMqttService.this, 0, intent, 0);
    NotificationCompat.Builder builder = new NotificationCompat.Builder(MyMqttService.this, TOPIC_TO_QA);
    Notification notify = builder.setSmallIcon(R.mipmap.to_quality)
    .setLargeIcon(BitmapFactory.decodeResource(getResources(), R.mipmap.to_quality))
    .setDefaults(Notification.DEFAULT_SOUND|Notification.DEFAULT_VIBRATE)//响铃震动
    .setContentTitle("快递来了")
    .setAutoCancel(true)
    .setContentIntent(pi)
    .setVisibility(Notification.VISIBILITY_PUBLIC)
    .setContentText("你的快递单号:" + msg.getOrderNum())
    //不设置此项不会悬挂,false 不会出现悬挂
    .build();
    mNotificationManager.notify(2,notify);
    }
    }
    }
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
    //即服务器成功delivery消息
    }
    });
    //新建连接设置
    MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
    //断开后,是否自动连接
    mqttConnectOptions.setAutomaticReconnect(true);
    //是否清空客户端的连接记录。若为true,则断开后,broker将自动清除该客户端连接信息
    mqttConnectOptions.setCleanSession(false);
    //设置超时时间,单位为秒
    //mqttConnectOptions.setConnectionTimeout(2);
    //心跳时间,单位为秒。即多长时间确认一次Client端是否在线
    //mqttConnectOptions.setKeepAliveInterval(2);
    //允许同时发送几条消息(未收到broker确认信息)
    //mqttConnectOptions.setMaxInflight(10);
    //选择MQTT版本
    mqttConnectOptions.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
    try {
    Log.d(TAG, "onCreate: Connecting to " + serverUri);
        //开始连接
    mqttAndroidClient.connect(mqttConnectOptions, null, new IMqttActionListener() {
    @Override
    public void onSuccess(IMqttToken asyncActionToken) {
    Log.d(TAG, "onSuccess: Success to connect to " + serverUri);
    DisconnectedBufferOptions disconnectedBufferOptions = new DisconnectedBufferOptions();
    disconnectedBufferOptions.setBufferEnabled(true);
    disconnectedBufferOptions.setBufferSize(100);
    disconnectedBufferOptions.setPersistBuffer(false);
    disconnectedBufferOptions.setDeleteOldestMessages(false);
    mqttAndroidClient.setBufferOpts(disconnectedBufferOptions);
        //成功连接以后开始订阅
    subscribeAllTopics();
    }

    @Override
    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
    //连接失败
        Log.d(TAG, "onFailure: Failed to connect to " + serverUri);
    exception.printStackTrace();
    }
    });
    } catch (MqttException ex) {
    ex.printStackTrace();
    }

    //service绑定notification
    Intent intent = new Intent(this, SplashActivity.class);
    intent.putExtra(SinSimApp.FROM_NOTIFICATION, true);
    //这边设置“FLAG_UPDATE_CURRENT”是为了让后面的Activity接收pendingIntent中Extra的数据
    PendingIntent pi = PendingIntent.getActivity(this, 0, intent, PendingIntent.FLAG_UPDATE_CURRENT);
    Notification notification = new NotificationCompat.Builder(this)
    .setContentTitle("mqtt快递")
    .setContentText("mqtt快递管理系统")
    .setWhen(System.currentTimeMillis())
    .setSmallIcon(R.mipmap.ic_launcher)
    .setLargeIcon(BitmapFactory.decodeResource(getResources(), R.mipmap.ic_launcher))
    .setContentIntent(pi)
    .build();
    startForeground(1, notification);
    }

    //订阅所有消息
        private void subscribeAllTopics() {
    subscribeToTopic(TOPIC_TO_QA);
    }

    /**
    * 订阅消息
    */
    public void subscribeToTopic(String subscriptionTopic) {
    try {
    mqttAndroidClient.subscribe(subscriptionTopic, 2, null, new IMqttActionListener() {
    @Override
    public void onSuccess(IMqttToken asyncActionToken) {
    Log.d(TAG, "onSuccess: Success to Subscribed!");
    }

    @Override
    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
    Log.d(TAG, "onFailure: Failed to subscribe");
    }
    });
    } catch (MqttException ex) {
    Log.d(TAG, "subscribeToTopic: Exception whilst subscribing");
    ex.printStackTrace();
    }
    }

    /**
    * 发布消息
    */
    public void publishMessage(String msg) {
    try {
    MqttMessage message = new MqttMessage();
    message.setPayload(msg.getBytes());
    mqttAndroidClient.publish(publishTopic, message);
    Log.d(TAG, "publishMessage: Message Published: " + msg);
    } catch (MqttException e) {
    Log.d(TAG, "publishMessage: Error Publishing: " + e.getMessage());
    e.printStackTrace();
    }
    }

    @Override
    public int onStartCommand(Intent intent, int flags, int startId) {
    Log.d(TAG, "MqttService onStartCommand executed");
    return super.onStartCommand(intent, flags, startId);
    }

    @Override
    public void onDestroy() {
    super.onDestroy();
    try {
    if(mqttAndroidClient!=null){
    //服务退出时client断开连接
    mqttAndroidClient.disconnect();
    }
    } catch (MqttException e) {
    e.printStackTrace();
    }
    Log.d(TAG, "MqttService onDestroy executed");
    }
    }


    调试过程中出现过一个小插曲:服务在有些时候会不停的断线重连。断线重连的设置是开了的:
    mqttConnectOptions.setAutomaticReconnect(true);
    但是断开的原因找不到,当时还没有重写onDestory方法,就算退出应用也还在重连,一度怀疑service的开启与关闭的问题,还系统的重新学习了一下service的使用,学完以后也没有啥进展,然后重学mqtt的调用流程发挥了效果,在onDestory里面调用了disconnect()方法,完了以后在退出应用以后就不会重连了,但是重新开还是继续不停重连。到了晚上,奇怪的事情发生了,当夜深人静,独自加班的时候,居然再也复现不了了。为什么呢,心想可能平时给八阿哥上的香起了效果,那就开心的回家吧。下班的路上虽然开心的吃了块鸡排,但心里的结还是没有打开,为什么呢,是道德的沦丧还是人性的扭曲,让我独自加班还不饿给我复现问题。突然灵光一现,想到今天特么加班就我一个人,也就是一个人玩就是好的,玩的人多就会有问题,那么答案就来了,跟唯一性有关的只有clientID了,特么老子把clientID设置成用户id了,测试用的用户id就注册了3个,好几个人来回切着用,不出问题才怪。于是我默默的把clientID改成了设备id,困扰2天的问题就这么解决了。
    ---------------------
    作者:邦德总管
    来源:CSDN
    原文:https://blog.csdn.net/a5nan/article/details/79975488
    版权声明:本文为博主原创文章,转载请附上博文链接!

  • 相关阅读:
    大数据学习路线图 让你精准掌握大数据技术学习
    在AI人工智能中如何巧妙学习大数据编程,成为五十万年薪的佼佼者
    大数据学习之Hadoop快速入门
    大数据学习|小白学习大数据需要满足这六个条件你就能学好大数据
    大数据学习路线(自己制定,从零开始)
    大数据学习之路(跟着大神学习一波)
    为什么这么多人学习大数据?新手该如何上手大数据?
    大数据学习路线图 让你精准掌握大数据技术学习?
    [监督学习]GDA 高斯判别分析
    The Josephus problem
  • 原文地址:https://www.cnblogs.com/yelanggu/p/9877331.html
Copyright © 2011-2022 走看看