zoukankan      html  css  js  c++  java
  • MQTT Java Client开发

    1、客户端库下载

    下载地址:https://www.eclipse.org/paho/downloads.php

    如下图所示,有不用编程语言当前支持情况说明。

    image

    如下图所示,咱们此处已Java为例,下载正式发布的版本。

    image

    当前最新版本为Java最新版本为1.2.2。

    image

    下载到的jar包如下图所示:

    clip_image002

    将该jar包导入到我们的项目中,就可以使用了。

    2、登陆连接

    先创建MqttClinet对象。

    private volatile MqttClient mqttClient; 
    private volatile MqttMessage mqttMessage;
    private MqttServerEntity mqttServerEntity;
    // 初始化MQTTClient对象
    private void initClient() {
        try {
            mqttClient = new MqttClient(getHostUrl(), getClientId());
        } catch (MqttException e) {
            LogUtils.error(logger, e);
            mqttClient = null;
        }
    }

    封装连接参数。

    设置回调接口。

    准备工作做好后,执行连接即可。

    // 连接MQTT服务器
    public void startClient() {
        initClient();
        if (mqttClient == null) {
            LogUtils.info(logger, "mqttClient is null");
            return;
        }
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(mqttServerEntity.getUsername());
        options.setPassword(mqttServerEntity.getPassword().toCharArray());
        options.setConnectionTimeout(5); // 设置超时时间
        options.setCleanSession(getCleanSession());
        options.setKeepAliveInterval(getKeepAliveInterval());// 设置会话心跳时间
        options.setAutomaticReconnect(true); // 自动重连
        try {
            mqttClient.setCallback(new BtcMqttCallback());
            mqttClient.connect(options);
            subscribe();
        } catch (Exception e) {
            LogUtils.error(logger, e);
        }
        LogUtils.info(logger, "startClient() isConnected:" + mqttClient.isConnected());
    }

    3、订阅主题

    订阅主题发生在服务器连接登陆成功之后,这里主要有两点,发布消息的服务质量、以及订阅的主题信息。

    // 订阅主题
    private void subscribe() {
        try {
            int[] Qos = {getQos()};
            String[] topic1 = {mqttServerEntity.getSubscribeTopic()};
            mqttClient.subscribe(topic1, Qos);
        } catch (Exception e) {
            LogUtils.error(logger, e);
        }
    }

    4、发送消息

    发送消息时要保证当前客户端与服务器处于连接成功的状态。将主题及消息封装好后,调用发送接口即可。

    // 发送消息
    public void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic) {
        try {
            if (mqttMessage == null) {
                mqttMessage = new MqttMessage();
                mqttMessage.setQos(getQos());
                mqttMessage.setRetained(true);
            }
            mqttMessage.setPayload(data.getBytes("UTF-8")); 
            mqttClient.publish(topic, mqttMessage);
        } catch (Exception e) {
            LogUtils.error(logger, e);
        }
    }

    5、消息接收

    消息接收是采用回调接口的形式,是建立连接之前设置的,连接成功之后,只有有消息就会回调到下面的方法。

    public class BtcMqttCallback implements MqttCallbackExtended {
        public void connectionLost(Throwable cause) {
            LogUtils.info(logger, "connection lost");
        }
        public void deliveryComplete(IMqttDeliveryToken token) {
            LogUtils.info(logger, "delivery Complete:" + token.isComplete());
        }
        public void messageArrived(String topic, MqttMessage message) {
            String msg = new String(message.getPayload(), Charset.forName("UTF-8"));
            LogUtils.info(logger, "messageArrived() topic:" + topic);
            LogUtils.info(logger, msg);
            MessageCache.getInstance().putMessage(msg);
        }
        @Override
        public void connectComplete(boolean reconnect, String serverURI) {
            LogUtils.info(logger, "connectComplete() reconnect:" + reconnect + " serverURI:" + serverURI);
            subscribe();
        }
    }

     

     

    【参考资料】

    https://www.eclipse.org/paho/clients/java/

  • 相关阅读:
    2429: [HAOI2006]聪明的猴子
    1789: [Ahoi2008]Necklace Y型项链
    3399: [Usaco2009 Mar]Sand Castle城堡
    3713: [PA2014]Iloczyn
    1907: 树的路径覆盖
    2751: [HAOI2012]容易题(easy)
    算法模板——计算几何2(二维凸包——Andrew算法)
    算法模板——splay区间反转 2
    算法模板——Dinic网络最大流 2
    算法模板——Dinic最小费用最大流
  • 原文地址:https://www.cnblogs.com/shidian/p/11778270.html
Copyright © 2011-2022 走看看