zoukankan      html  css  js  c++  java
  • MQTT3.1.1协议阅读笔记1

    本文主要是记录阅读 MQTT3.1.1协议中文版 时的心得感悟。

    环境信息

    1. 使用Docker运行emqx,作为MQTT的服务端
    2. 使用mqtt-spy.jar作为MQTT的客户端
    3. 使用Paho写一个简单的Java-MQTT客户端
    4. 使用WireShark进行协议抓包

    MQTT 简介

    MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于 TCP/IP 协议之上,由IBM在1999年发布。。

    一个 MQTT 控制报文包含三个部分:

    组成部分 长度
    固定报头 2-5个字节 存在于所有MQTT控制包
    可变报头 存在于某些MQTT控制包
    载荷 存在于某些MQTT控制包
    • 我们借助 MQTT 协议发送的消息内容保存在载荷中

    1.固定报头

    固定报头由两部分组成:控制包类型和剩余长度

    • 控制包类型目前有 14 种;
    • 剩余长度表示的是“可变报头+载荷”的总长度

    如上图所示,这是一条控制包类型为 CONNECT 的 MQTT 报文,固定报头的中 剩余长度用16进制表示为 0x1e,用10进制表示为 30。

    从 1e 的后一个字节 00 到末尾刚好是 30 个字节。

    1.1 剩余长度与控制包最大长度256M

    剩余长度使用了一种可变长度的结构来编码,这种结构使用单一字节表示0-127的值。大于127的值如下处理。每个字节的低7位用来编码数据,最高位用来表示是否还有后续字节。剩余长度最多可以用四个字节来表示。

    用n个字节表示剩余长度 剩余长度范围起始值 剩余长度范围结束值
    1 0 (0x00) 127 (0x7F)
    2 128 (0x80, 0x01) 16 383 (0xFF, 0x7F)
    3 16 384 (0x80, 0x80, 0x01) 2 097 151 (0xFF, 0xFF, 0x7F)
    4 2 097 152 (0x80, 0x80, 0x80, 0x01) 268 435 455 (0xFF, 0xFF, 0xFF, 0x7F)

    这将允许应用发送可变报头和载荷总长度为255M大小的控制包。这个数字用16进制表示为:0xFF,0xFF,0xFF,0x7F。

    换句话说,这将允许应用发送最多256M大小的控制包。

    2.可变报头

    以 CONNECT 报文的可变报头为例,主要包含协议名称(MQTT)和协议版本号(v3.1.1对应4);

    2.1 MSB 和 LSB

    至于 Length MSB(Most Significant Bit,最高有效位) 和 Length LSB (Last/Least Significant Bit,译作最低有效位),

    把 MSB 和 LSB 用大端字节/网络字节序来读取,读取的值可以表示协议名称的长度。

    以下是 Java 写成的 Demo:

    import java.io.*;
    import java.util.Arrays;
    
    public class Utf8Characters {
    
        public static void main(String[] args) throws IOException {
            // 模拟写入
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream dataOut = new DataOutputStream(baos);
    
            dataOut.writeUTF("MQTT");
    
            byte[] bytes = baos.toByteArray();
            System.out.println(Arrays.toString(bytes)); // 打印 [0, 4, 77, 81, 84, 84]
    
            // 模拟读取
            ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
            DataInputStream dataIn = new DataInputStream(bais);
    
            int len = dataIn.readUnsignedShort(); // 2 bytes
            byte[] decodedString = new byte[len]; // 4 bytes
            dataIn.read(decodedString);
            String target = new String(decodedString, "UTF-8");
            System.out.println(target); // 打印 MQTT
    
            // 重置一下,重新读取
            dataIn.reset();
            // 等同于
            String result = dataIn.readUTF();
            System.out.println(result); // 打印 MQTT
    
        }
    }
    

    3. MQTT的特别之处

    我们在学习TCP/IP协议的时候,就知道 ACK 这个概念,其实许多构建在TCP/IP协议之上的应用层协议也都会使用 XXXACK 包来表示已经成功接收 XXX 信息。
    MQTT也不能“免俗”:

    • 连接报文 CONNECT 对应连接确认报文 CONNACK;
    • 订阅报文 SUBSCRIBE 对应订阅确认报文 SUBACK;
    • 取消订阅报文 UNSUBSCRIBE 对应取消订阅确认报文 UNSUBACK;
    • 发布报文 PUBLISH 对应发布确认报文 PUBACK。

    还有就是名字中没有使用 ACK,但是实际上也是“一问一答”式的 PINGREQ 和 PINGRESP。但是 MQTT 的控制类型中还是有两处“怪异之处”

    • 唯独 DISCONNECT 没有对应的确认报文;
    • PUBLISH 除了有 PUBACK 之外,还有 PUBREC,PUBREL,PUBCOMP

    3.1 遗言/遗嘱Will

    对于一般IoT设备而言,就是一个大循环while不断接收消息,不存在正常退出的逻辑,一般都是断电断网导致的异常退出。DISCONNECT 并不常用,也不用确认。

    但是,如果客户端正常发出了 DISCONNECT 报文,那么服务端收到 DISCONNECT 后必须丢弃所有和当前连接有关的Will Message,不发布。

    我们通常都有判断IoT设备是否在线的需求,使用遗言机制就很好实现。

    • 遗言/遗嘱是CONNECT类型报文中,伴随客户端连接服务端的请求一并发出的;
    • 可变报头中包含 Will Flag,Will QoS,Will Retain;其中 QoS 和 Retain 效果同 publish 报文中的 QoS 和 Retain;
    • 如果可变报头连接标识位 Will Flag 等于1,那么载荷中将包含 Will Topic 和 Will Message 字段;

    模拟遗嘱发送和接收

    由于 mqtt-spy.jar 的无论是点击x关闭,还是杀死进程,都是正常的Disconnect退出,所以只好写一个 Java 客户端来模拟异常退出的场景。

    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    import java.util.UUID;
    
    public class Main {
    
        public static void main(String[] args) throws MqttException {
            String clientId = Arrays.stream(args).findFirst().orElse(UUID.randomUUID().toString());
            MqttClientPersistence persistence = new MemoryPersistence();
            MqttClient client = new MqttClient("tcp://localhost:1883", clientId, persistence);
    
            MqttConnectOptions options = new MqttConnectOptions();
            // 2 表示 EXACTLY_ONCE
            options.setWill("DeviceStatus", ("{"device":""+ clientId + "","state":"offline"}").getBytes(StandardCharsets.UTF_8),
                    2, false);
            client.connect(options);
        }
    }
    

    运行这个程序,然后再用 mqtt-spy.jar 模拟一个WEB服务器上的MQTT客户端:

    Connections -> New Connection 打开如下图所示的页面,输入Client ID为web,其他都默认,然后 点击Open Connection

    在 Subscriptions and received messages 这一栏点击 New,弹出如下图所示对话框,输入主题DeviceStatus,然后点击 Subscribe

    然后,我们就可以去关闭 Java 的 MQTT客户端了。接着,就收到了遗言:

    然后,我还找到了 WireShark 抓取的Java的MQTT客户端发出的Connect报文:

    3.1.1 WILL MESSAGE长度限制65535个字节

    从理论上来说,MQTT 中的字符串符合以下形式:

    用两个字节表示内容长度,因此内容长度可以是0到65535个字节。

    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    import java.util.UUID;
    import java.util.stream.IntStream;
    
    public class Main {
    
        public static void main(String[] args) throws MqttException, IOException {
            String clientId = Arrays.stream(args).findFirst().orElse(UUID.randomUUID().toString());
            MqttClientPersistence persistence = new MemoryPersistence();
            MqttClient client = new MqttClient("tcp://localhost:1883", clientId, persistence);
    
            MqttConnectOptions options = new MqttConnectOptions();
            StringBuilder sb = new StringBuilder();
            IntStream.range(0, 65536).forEach(i -> {
                    int result = i % 10;
                    sb.append(result);
            });
            // 2 表示 EXACTLY_ONCE
            byte[] payload = sb.toString().getBytes(StandardCharsets.UTF_8);
            options.setWill("DeviceStatus", payload,
                    2, false);
            client.connect(options);
        }
    }
    

    如上面这段代码模拟了 Will Message 为 65536 个字节,服务器直接断开了客户端的连接:

    从图中可以看出,Will Message 超长了,导致长度为0。具体可以看这段代码:org.eclipse.paho.client.mqttv3.internal.wire.MqttConnect#getPayload

    if (willMessage != null) {
      encodeUTF8(dos, willDestination);
      dos.writeShort(willMessage.getPayload().length); // 这段代码再跟进去看,(v >>> 8) & 0xFF 计算等于 0,(v >>> 0) & 0xFF 计算也等于 0
      dos.write(willMessage.getPayload());
    }
    

    3.1.2 Will Retain只保持最新一条

    RETAIN(保持)
    1:表示发送的消息需要一直持久保存(不受服务器重启影响),不但要发送给当前的订阅者,并且以后新来的订阅了此Topic name的订阅者会马上得到推送。
    备注:新来乍到的订阅者,只会取出最新的一个RETAIN flag = 1的消息推送。

    ※ 实验如下:
    修改 3.1 中 Main 的代码:

    // retain 由 false 改为 true
    options.setWill("DeviceStatus", ("{"device":""+ clientId + "","state":"offline"}").getBytes(StandardCharsets.UTF_8),
                    2, true);
    

    然后,在 Run Configuration 中拷贝三份 Main,并分别命名为 iot_1,iot_2,iot_3,并且 Program arguments 也分别为 iot_1,iot_2,iot_3:

    分别运行 iot_1,iot_2,iot_3,然后再依次结束他们。

    然后,再启动 mqtt-spy.jar,并订阅主题 DeviceStatus:

    如图所示,我们观察到新订阅者只获取到主题中的最新一条消息!

    3.2 QoS

    这个又有很多内容,还是另开一篇 阅读

    参考文档

    MQTT协议笔记之头部信息 阅读

    这篇文章主要解答了我对 Length MSB 和 Length LSB 的疑惑

    Java MQTT 客户端之 Paho 阅读

    如果你对 Java 实现 MQTT 客户端感兴趣,可以读一下这篇

  • 相关阅读:
    Oracle简介
    Python 新建程序
    HTML 标记 3 —— CSS
    Dreamweaver 2
    Dreamweaver 1 网页制作
    has-a关系——多重私有继承
    has-a关系——包含对象成员的类
    《使用wxWidgets进行跨平台程序开发》chap02——一个简单的应用程序
    抽象基类(ABC),纯虚函数
    普通类继承
  • 原文地址:https://www.cnblogs.com/kendoziyu/p/15100021.html
Copyright © 2011-2022 走看看