  • MQTT protocol: PINGREQ and PINGRESP

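      The ping exchange (the heartbeat packet) is probably the simplest part of the MQTT protocol. A connected client sends a ping to the server to tell it "I am still alive".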

    PINGREQ - PING request

    fixed header format.

    bit    | 7  6  5  4        | 3        | 2  1      | 0
    byte 1 | Message Type (12) | DUP flag | QoS level | RETAIN
           | 1  1  0  0        | x        | x  x      | x
    byte 2 | Remaining Length (0)
           | 0  0  0  0          0          0  0        0

    no variable header

    no payload 

    Response: the response to a PINGREQ message is a PINGRESP message.

    PINGRESP - PING response

    fixed header

    bit    | 7  6  5  4        | 3        | 2  1      | 0
    byte 1 | Message Type (13) | DUP flag | QoS level | RETAIN
           | 1  1  0  1        | x        | x  x      | x
    byte 2 | Remaining Length (0)
           | 0  0  0  0          0          0  0        0

    no variable header

    no payload
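
    The two layouts above can be sketched in a few lines of C#. This is only an illustration: the `PingPackets` class and its members are hypothetical names, not part of the article's code, and the four flag bits marked "x" are assumed to be written as zero.

```csharp
// Hypothetical helper for illustration; not part of the article's classes.
static class PingPackets
{
    public const byte PingReqType = 12;   // 1100 in bits 7-4
    public const byte PingRespType = 13;  // 1101 in bits 7-4

    // Builds the two-byte packet: message type in the upper nibble,
    // flag bits assumed zero, Remaining Length of 0 in byte 2.
    public static byte[] Build(byte messageType)
    {
        return new byte[] { (byte)(messageType << 4), 0x00 };
    }

    // Extracts the message type from byte 1, ignoring the four flag bits.
    public static int TypeOf(byte firstByte)
    {
        return (firstByte & 0xF0) >> 4;
    }
}
```

    Under these assumptions `PingPackets.Build(PingPackets.PingReqType)` gives the bytes 0xC0 0x00 and `PingPackets.Build(PingPackets.PingRespType)` gives 0xD0 0x00, while `PingPackets.TypeOf(0xCF)` still reports 12 because the flag bits are masked out.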

    ------------------------------------------------------------------------

    Within each heartbeat period the client sends one PINGREQ message to the server. The message is two bytes long and its value is fixed.

    When the server receives a PINGREQ it immediately answers with a PINGRESP message, which is also two bytes in a fixed format.
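
    The request/response rule can be sketched on the server side roughly as follows. `PingResponder` and `TryAnswerPing` are hypothetical names used only for this illustration, and the sketch assumes that exactly one two-byte packet is waiting on the input stream.

```csharp
using System.IO;

// Hypothetical server-side sketch; not part of the article's classes.
static class PingResponder
{
    // Reads two bytes from input. If they form a PINGREQ (type 12,
    // Remaining Length 0), writes a PINGRESP to output and returns true.
    public static bool TryAnswerPing(Stream input, Stream output)
    {
        int first = input.ReadByte();
        int second = input.ReadByte();
        if (first < 0 || second < 0)
            return false;                      // stream ended early

        bool isPingReq = ((first & 0xF0) >> 4) == 12 && second == 0;
        if (!isPingReq)
            return false;                      // some other packet

        output.WriteByte(0xD0);                // PINGRESP: type 13, flags 0
        output.WriteByte(0x00);                // Remaining Length 0
        return true;
    }
}
```

    A real broker would dispatch on the message type for every packet; this sketch only covers the ping case.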

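    The heartbeat period is set by the "Keep Alive timer" field in the variable header of the CONNECT packet. It is measured in seconds and stored as an unsigned 16-bit integer.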
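
    As a small illustration of that 16-bit field, the sketch below encodes and decodes a Keep Alive value. MQTT writes 16-bit integers most significant byte first; the `KeepAlive` class itself is a hypothetical helper, not something from the article's code.

```csharp
// Hypothetical helper for illustration; not part of the article's classes.
static class KeepAlive
{
    // Encodes the Keep Alive value (in seconds) as two bytes,
    // most significant byte first.
    public static byte[] Encode(ushort seconds)
    {
        return new byte[] { (byte)(seconds >> 8), (byte)(seconds & 0xFF) };
    }

    // Rebuilds the value from the two bytes read out of the CONNECT packet.
    public static ushort Decode(byte msb, byte lsb)
    {
        return (ushort)((msb << 8) | lsb);
    }
}
```

    For example, a Keep Alive of 60 seconds is encoded as 0x00 0x3C, and 300 seconds as 0x01 0x2C.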

    OK, on to the code:

    Fixed header: FixedHeader

        using System;
        using System.Collections.Generic;
        using System.IO;
    
        /// <summary>
        /// Fixed header
        /// </summary>
        internal class FixedHeader
        {
            /// <summary>
            /// Message type
            /// </summary>
            public MessageType MessageType { get; set; }
    
            /// <summary>
            /// DUP flag
            /// </summary>
            public bool Dup { get; set; }
    
            /// <summary>
            /// QoS flags
            /// </summary>
            public Qos Qos { get; set; }
    
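            /// <summary>
            /// RETAIN flag
            /// </summary>
            public bool Retain { get; set; }
    
            /// <summary>
            /// Remaining Length.
            /// Each byte carries 7 bits of the value, so a single byte holds at most 01111111 (0x7F, decimal 127).
            /// If the top bit (bit 7) of a byte is 1, at least one more length byte follows.
            /// MQTT allows at most 4 bytes for the Remaining Length.
            /// The maximum encoding is 0xFF, 0xFF, 0xFF, 0x7F
            /// (binary 11111111, 11111111, 11111111, 01111111), which is 268435455 in decimal.
            /// </summary>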
            public int RemaingLength { get; set; }
    
            public FixedHeader() { }
    
            public FixedHeader(Stream stream)
            {
                if (stream.Length < 2)
                    throw new Exception("The supplied header is invalid. Header must be at least 2 bytes long.");
    
                var byte1 = stream.ReadByte();
                MessageType = (MessageType)((byte1 & 0xf0) >> 4);
                Dup = ((byte1 & 0x08) >> 3) > 0;
                Qos = (Qos)((byte1 & 0x06) >> 1);
                Retain = (byte1 & 0x01) > 0;
    
                // Remaining Length: variable-length encoding, 1 to 4 bytes
                var lengthBytes = ReadLengthBytes(stream);
                RemaingLength = CalculateLength(lengthBytes);
            }
    
            public void WriteTo(Stream stream)
            {
                var flags = (byte)MessageType << 4;
                flags |= (Dup ? 1 : 0) << 3;
                flags |= (byte)Qos << 1;
                flags |= Retain ? 1 : 0;
    
                stream.WriteByte((byte)flags);     // byte 1
    
                // Byte 2 onward: Remaining Length, 7 bits per byte, least significant group first.
                // Work on a local copy so that writing does not reset the RemaingLength property.
                int remaining = RemaingLength;
                do
                {
                    int digit = remaining & 0x7f;
                    remaining >>= 7;
                    if (remaining > 0)
                        digit |= 0x80;             // more length bytes follow
                    stream.WriteByte((byte)digit);
                } while (remaining > 0);
            }
    
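            internal static byte[] ReadLengthBytes(Stream stream)
            {
                var lengthBytes = new List<byte>();
    
                // Read until a byte without the continuation bit (0x80) arrives,
                // or the 4-byte limit is reached.
                byte sizeByte;
                int byteCount = 0;
                do
                {
                    int value = stream.ReadByte();
                    if (value < 0)   // ReadByte returns -1 at the end of the stream
                        throw new Exception("The stream ended inside the Remaining Length field.");
                    sizeByte = (byte)value;
                    lengthBytes.Add(sizeByte);
                } while (++byteCount < 4 && (sizeByte & 0x80) == 0x80);
    
                return lengthBytes.ToArray();
            }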
    
            internal static int CalculateLength(byte[] lengthBytes)
            {
                var remainingLength = 0;
                var multiplier = 1;
    
                foreach (var currentByte in lengthBytes)
                {
                    remainingLength += (currentByte & 0x7f) * multiplier;
                    multiplier *= 0x80;
                }
    
                return remainingLength;
            }
        }
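
    To make the Remaining Length encoding easier to follow, here is a self-contained sketch of the same scheme with a few worked values. The `RemainingLength` class is a hypothetical stand-in written for this illustration; it mirrors the logic of WriteTo and CalculateLength above but is not the article's class.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-alone version of the length encoding, for illustration.
static class RemainingLength
{
    public const int Max = 268435455;   // 0xFF, 0xFF, 0xFF, 0x7F

    // Splits the value into 7-bit groups, least significant group first,
    // and sets bit 7 on every byte except the last.
    public static byte[] Encode(int length)
    {
        if (length < 0 || length > Max)
            throw new ArgumentOutOfRangeException(nameof(length));

        var bytes = new List<byte>();
        do
        {
            int digit = length & 0x7F;
            length >>= 7;
            if (length > 0)
                digit |= 0x80;          // continuation bit
            bytes.Add((byte)digit);
        } while (length > 0);
        return bytes.ToArray();
    }

    // Adds up the 7-bit groups, each weighted by a growing power of 128.
    public static int Decode(byte[] bytes)
    {
        int value = 0;
        int multiplier = 1;
        foreach (byte b in bytes)
        {
            value += (b & 0x7F) * multiplier;
            multiplier *= 128;
        }
        return value;
    }
}
```

    Worked values: 0 encodes as 0x00 (the PINGREQ/PINGRESP case), 127 as 0x7F, 128 as 0x80 0x01, 321 as 0xC1 0x02, and 268435455 as 0xFF 0xFF 0xFF 0x7F.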

    Message base class: Message

        internal class Message
        {
            public FixedHeader FixedHeader { get; protected set; }
    
            public Message()
            {
            }
    
            public Message(MessageType messageType)
            {
                FixedHeader = new FixedHeader
                {
                    MessageType = messageType
                };
            }
    
            public virtual void WriteTo(Stream stream)
            {
            }
    
            public static Message CreateFrom(byte[] buffer)
            {
                using (var stream = new MemoryStream(buffer))
                {
                    return CreateFrom(stream);
                }
            }
    
            public static Message CreateFrom(Stream stream)
            {
                var header = new FixedHeader(stream);
                return CreateMessage(header, stream);
            }
    
            public static Message CreateMessage(FixedHeader header, Stream stream)
            {
                switch (header.MessageType)
                {
                    case MessageType.CONNACK:
                        return new ConnAckMessage(header, stream);
                    case MessageType.DISCONNECT:
                        return null;
                    case MessageType.PINGREQ:
                        return new PingReqMessage();
                    case MessageType.PINGRESP:
                        return new PingRespMessage(header, stream);
                    case MessageType.PUBACK:
                        return new PublishAckMessage(header, stream);
                    // Not implemented yet. These labels are grouped with default so that
                    // they throw instead of falling through into another case's return.
                    case MessageType.PUBCOMP:
                    case MessageType.PUBLISH:
                    case MessageType.PUBREC:
                    case MessageType.PUBREL:
                    case MessageType.SUBACK:
                    case MessageType.UNSUBACK:
                    case MessageType.UNSUBSCRIBE:
                    case MessageType.CONNECT:
                    case MessageType.SUBSCRIBE:
                    default:
                        throw new Exception("Unsupported Message Type");
                }
            }
        }

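    Two enums:

    MessageType (message type)

    Qos (quality-of-service level)

        /// <summary>
        /// MQTT message types. The value occupies bits 7-4 of byte 1 of the fixed header.
        /// </summary>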
        public enum MessageType : byte
        {
            CONNECT     = 1,
            CONNACK     = 2,
            PUBLISH     = 3,
            PUBACK      = 4,
            PUBREC      = 5,
            PUBREL      = 6,
            PUBCOMP     = 7,
            SUBSCRIBE   = 8,
            SUBACK      = 9,
            UNSUBSCRIBE = 10,
            UNSUBACK    = 11,
            PINGREQ     = 12,
            PINGRESP    = 13,
            DISCONNECT  = 14
        }
    
        /// <summary>
        /// Quality-of-service level
        /// </summary>
        public enum Qos : byte
        {
            /// <summary>
            ///     QoS level 0 - Delivery is not guaranteed. No retries are made to ensure delivery is successful.
            /// </summary>
            AtMostOnce = 0,
    
            /// <summary>
            ///     QoS level 1 - Delivery is guaranteed. The message is delivered at least once, but may be delivered
            ///     more than once if network errors occur.
            /// </summary>
            AtLeastOnce = 1,
    
            /// <summary>
            ///     QoS level 2 - The message is delivered once, and only once. Sending is retried until
            ///     it succeeds.
            /// </summary>
            ExactlyOnce = 2,
        }
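
    To show how these two enums share byte 1 of the fixed header with the DUP and RETAIN flags, here is a small pack/unpack sketch. `HeaderByte` is a hypothetical helper written for this illustration; it uses plain integers rather than the enums so that it stands on its own.

```csharp
// Hypothetical helper for illustration; not part of the article's classes.
static class HeaderByte
{
    // Packs byte 1: message type in bits 7-4, DUP in bit 3,
    // QoS in bits 2-1, RETAIN in bit 0.
    public static byte Pack(int messageType, bool dup, int qos, bool retain)
    {
        int flags = (messageType & 0x0F) << 4;
        flags |= (dup ? 1 : 0) << 3;
        flags |= (qos & 0x03) << 1;
        flags |= retain ? 1 : 0;
        return (byte)flags;
    }

    // Reverses Pack using the same masks as the FixedHeader constructor.
    public static (int MessageType, bool Dup, int Qos, bool Retain) Unpack(byte b)
    {
        return ((b & 0xF0) >> 4, (b & 0x08) != 0, (b & 0x06) >> 1, (b & 0x01) != 0);
    }
}
```

    For example, a PUBLISH (3) with DUP set, QoS 1 and RETAIN set packs to 0x3B, while a PINGREQ (12) with all flags clear packs to 0xC0.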

    Ping request packet: PingReqMessage

    Ping response packet: PingRespMessage

        internal sealed class PingReqMessage : Message
        {
            public PingReqMessage()
                : base(MessageType.PINGREQ)
            {
            }
    
            public override void WriteTo(Stream stream)
            {
                FixedHeader.WriteTo(stream);
            }
        }
    
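        internal class PingRespMessage : Message
        {
            public PingRespMessage()
                : base(MessageType.PINGRESP)
            {
            }
    
            public PingRespMessage(FixedHeader header, Stream stream)
            {
                // PINGRESP has no variable header and no payload,
                // so nothing more is read from the stream.
                FixedHeader = header;
            }
    
            // As with PINGREQ, the whole packet is just the fixed header.
            public override void WriteTo(Stream stream)
            {
                FixedHeader.WriteTo(stream);
            }
        }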

     OK.

  • Original article: https://www.cnblogs.com/linsongbin/p/4736227.html