zoukankan      html  css  js  c++  java
  • mqtt 协议之 PINGREQ, PINGRESP

      mqtt 协议里最简单的是 ping 协议吧 (心跳包), ping 协议是已连接的客户端发往服务端, 告诉服务端,我还"活着"

    PINGREQ - PING request

    fixed header format.

    bit76543210
    byte 1 Message Type (12) DUP flag QoS level RETAIN
      1 1 0 0 x x x x
    byte 2 Remaining Length (0)
      0 0 0 0 0 0 0 0

    no variable header

    no payload 

    response:   The response to a PINGREQ message is a PINGRESP message.

    PINGRESP - PING response

    fixed header

    bit76543210
    byte 1 Message Type (13) DUP flag QoS level RETAIN
      1 1 0 1 x x x x
    byte 2 Remaining Length (0)
      0 0 0 0 0 0 0 0

    no variable header

    no payload

    ------------------------------------------------------------------------ 华丽的分界线 ---------------------------------------

    客户端会在一个心跳周期内发送一条PINGREQ消息到服务器端。两个字节,固定值。

    服务器收到PINGREQ请求之后,会立即响应一个两个字节固定格式的PINGRESP消息。

    周期定义在 心跳频率在CONNECT(连接包)可变头部“Keep Alive timer”中定义时间,单位为秒,无符号16位short表示。

    ok ,上代码 :

    固定头部 FinedHeader

        /// <summary>
        /// Fixed header
        /// </summary>
        internal class FixedHeader
        {
            /// <summary>
            /// Message type
            /// </summary>
            public MessageType MessageType { get; set; }
    
            /// <summary>
            /// DUP flag
            /// </summary>
            public bool Dup { get; set; }
    
            /// <summary>
            /// QoS flags
            /// </summary>
            public Qos Qos { get; set; }
    
            /// <summary>
            /// RETAIN 保持
            /// </summary>
            public bool Retain { get; set; }
    
            /// <summary>
            /// Remaining Length 剩余长度
            /// 单个字节最大值:01111111,16进制:0x7F,10进制为127。
            /// MQTT协议规定,第八位(最高位)若为1,则表示还有后续字节存在。
            /// MQTT协议最多允许4个字节表示剩余长度。
            /// 最大长度为:0xFF,0xFF,0xFF,0x7F,
            /// 二进制表示为:11111111,11111111,11111111,01111111,十进制:268435455
            /// </summary>
            public int RemaingLength { get; set; }
    
            public FixedHeader() { }
    
            public FixedHeader(Stream stream)
            {
                if (stream.Length < 2)
                    throw new Exception("The supplied header is invalid. Header must be at least 2 bytes long.");
    
                var byte1 = stream.ReadByte();
                MessageType = (MessageType)((byte1 & 0xf0) >> 4);
                Dup = ((byte1 & 0x08) >> 3) > 0;
                Qos = (Qos)((byte1 & 0x06) >> 1);
                Retain = (byte1 & 0x01) > 0;
    
                //Remaining Length
                //var byte2 = stream.ReadByte();
                var lengthBytes = ReadLengthBytes(stream);
                RemaingLength = CalculateLength(lengthBytes);
            }
    
            public void WriteTo(Stream stream)
            {
                var flags = (byte)MessageType << 4;
                flags |= (Dup ? 1 : 0) << 3;
                flags |= (byte)Qos << 1;
                flags |= Retain ? 1 : 0;
    
                stream.WriteByte((byte)flags);     //byte 1
                if (RemaingLength == 0)         //byte 2
                    stream.WriteByte(0);
                else
                {
                    do
                    {
                        int digit = RemaingLength & 0x7f;
                        RemaingLength = RemaingLength >> 7;
                        if (RemaingLength > 0)
                            digit = digit | 0x80;
                        stream.WriteByte((byte)digit);
                    } while (RemaingLength > 0);
                }
            }
    
            internal static byte[] ReadLengthBytes(Stream stream)
            {
                var lengthBytes = new List<byte>();
    
                // read until we've got the entire size, or the 4 byte limit is reached
                byte sizeByte;
                int byteCount = 0;
                do
                {
                    sizeByte = (byte)stream.ReadByte();
                    lengthBytes.Add(sizeByte);
                } while (++byteCount <= 4 && (sizeByte & 0x80) == 0x80);
    
                return lengthBytes.ToArray();
            }
    
            internal static int CalculateLength(byte[] lengthBytes)
            {
                var remainingLength = 0;
                var multiplier = 1;
    
                foreach (var currentByte in lengthBytes)
                {
                    remainingLength += (currentByte & 0x7f) * multiplier;
                    multiplier *= 0x80;
                }
    
                return remainingLength;
            }
        }

    消息父类: Message

        internal class Message
        {
            public FixedHeader FixedHeader { get; protected set; }
    
            public Message()
            {
            }
    
            public Message(MessageType messageType)
            {
                FixedHeader = new FixedHeader
                {
                    MessageType = messageType
                };
            }
    
            public virtual void WriteTo(Stream stream)
            {
            }
    
            public static Message CreateFrom(byte[] buffer)
            {
                using (var stream = new MemoryStream(buffer))
                {
                    return CreateFrom(stream);
                }
            }
    
            public static Message CreateFrom(Stream stream)
            {
                var header = new FixedHeader(stream);
                return CreateMessage(header, stream);
            }
    
            public static Message CreateMessage(FixedHeader header, Stream stream)
            {
                switch (header.MessageType)
                {
                    case MessageType.CONNACK:
                        return new ConnAckMessage(header, stream);
                    case MessageType.DISCONNECT:
                        return null;
                    case MessageType.PINGREQ:
                        return new PingReqMessage();
                    case MessageType.PUBACK:
                        return new PublishAckMessage(header, stream);
                    case MessageType.PUBCOMP:
                        //return new MqttPubcompMessage(str, header);
                    case MessageType.PUBLISH:
                        //return new MqttPublishMessage(str, header);
                    case MessageType.PUBREC:
                        //return new MqttPubrecMessage(str, header);
                    case MessageType.PUBREL:
                        //return new MqttPubrelMessage(str, header);
                    case MessageType.SUBACK:
                        //return new MqttSubackMessage(str, header);
                    case MessageType.UNSUBACK:
                        //return new MqttUnsubackMessage(str, header);
                    case MessageType.PINGRESP:
                        return new PingRespMessage(header, stream);
                    case MessageType.UNSUBSCRIBE:
                    case MessageType.CONNECT:
                    case MessageType.SUBSCRIBE:
                    default:
                        throw new Exception("Unsupported Message Type");
                }
            }
        }

    两个枚举:

    MessageType  (消息类型)

    Qos (服务质量等级)

        [Flags]
        public enum MessageType : byte
        {
            CONNECT     = 1,
            CONNACK     = 2,
            PUBLISH     = 3,
            PUBACK      = 4,
            PUBREC      = 5,
            PUBREL      = 6,
            PUBCOMP     = 7,
            SUBSCRIBE   = 8,
            SUBACK      = 9,
            UNSUBSCRIBE = 10,
            UNSUBACK    = 11,
            PINGREQ     = 12,
            PINGRESP    = 13,
            DISCONNECT  = 14
        }
    
        /// <summary>
        /// 服务质量等级
        /// </summary>
        [Flags]
        public enum Qos : byte
        {
            /// <summary>
            ///     QOS Level 0 - Message is not guaranteed delivery. No retries are made to ensure delivery is successful.
            /// </summary>
            AtMostOnce = 0,
    
            /// <summary>
            ///     QOS Level 1 - Message is guaranteed delivery. It will be delivered at least one time, but may be delivered
            ///     more than once if network errors occur.
            /// </summary>
            AtLeastOnce = 1,
    
            /// <summary>
            ///     QOS Level 2 - Message will be delivered once, and only once. Message will be retried until
            ///     it is successfully sent..
            /// </summary>
            ExactlyOnce = 2,
        }

    ping 请求包:  PingReqMessage

    响应包:         PingRespMessage

        internal sealed class PingReqMessage : Message
        {
            public PingReqMessage()
                : base(MessageType.PINGREQ)
            {
            }
    
            public override void WriteTo(Stream stream)
            {
                FixedHeader.WriteTo(stream);
            }
        }
    
        internal class PingRespMessage : Message
        {
            public PingRespMessage()
                : base(MessageType.PINGRESP)
            {
            }
    
            public PingRespMessage(FixedHeader header, Stream stream)
            {
                FixedHeader = header;
            }
        }

     OK.

  • 相关阅读:
    [Web 前端] CSS 盒子模型,绝对定位和相对定位
    [Android Pro] 跨平台反编译工具 jadx (ubuntu亲测 可用)
    [Web 前端] Jquery 复制元素,并修改属性, 追加到另一个元素后面
    [Web 前端 ] Jquery attr()方法 获取或修改 对象的属性值
    [Web 前端] Jquery实现可直接编辑的表格
    [Web 前端] td长度固定,内容过长,超过部分用省略号代替
    [Network] okhttp3与旧版本okhttp的区别分析
    [Android Pro] AndroidStudio IDE界面插件开发(进阶篇之Editor)
    graph embedding 使用方法
    win10 'make' 不是内部或外部命令
  • 原文地址:https://www.cnblogs.com/linsongbin/p/4736227.html
Copyright © 2011-2022 走看看