mqtt 协议里最简单的是 ping 协议吧 (心跳包), ping 协议是已连接的客户端发往服务端, 告诉服务端,我还"活着"
PINGREQ - PING request
fixed header format.
bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
---|---|---|---|---|---|---|---|---|
byte 1 | Message Type (12) | DUP flag | QoS level | RETAIN | ||||
1 | 1 | 0 | 0 | x | x | x | x | |
byte 2 | Remaining Length (0) | |||||||
0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
no variable header
no payload
response: The response to a PINGREQ message is a PINGRESP message.
PINGRESP - PING response
fixed header
bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
---|---|---|---|---|---|---|---|---|
byte 1 | Message Type (13) | DUP flag | QoS level | RETAIN | ||||
1 | 1 | 0 | 1 | x | x | x | x | |
byte 2 | Remaining Length (0) | |||||||
0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
no variable header
no payload
------------------------------------------------------------------------ 华丽的分界线 ---------------------------------------
客户端会在一个心跳周期内发送一条PINGREQ消息到服务器端。两个字节,固定值。
服务器收到PINGREQ请求之后,会立即响应一个两个字节固定格式的PINGRESP消息。
周期定义在 心跳频率在CONNECT(连接包)可变头部“Keep Alive timer”中定义时间,单位为秒,无符号16位short表示。
ok ,上代码 :
固定头部 FinedHeader
/// <summary> /// Fixed header /// </summary> internal class FixedHeader { /// <summary> /// Message type /// </summary> public MessageType MessageType { get; set; } /// <summary> /// DUP flag /// </summary> public bool Dup { get; set; } /// <summary> /// QoS flags /// </summary> public Qos Qos { get; set; } /// <summary> /// RETAIN 保持 /// </summary> public bool Retain { get; set; } /// <summary> /// Remaining Length 剩余长度 /// 单个字节最大值:01111111,16进制:0x7F,10进制为127。 /// MQTT协议规定,第八位(最高位)若为1,则表示还有后续字节存在。 /// MQTT协议最多允许4个字节表示剩余长度。 /// 最大长度为:0xFF,0xFF,0xFF,0x7F, /// 二进制表示为:11111111,11111111,11111111,01111111,十进制:268435455 /// </summary> public int RemaingLength { get; set; } public FixedHeader() { } public FixedHeader(Stream stream) { if (stream.Length < 2) throw new Exception("The supplied header is invalid. Header must be at least 2 bytes long."); var byte1 = stream.ReadByte(); MessageType = (MessageType)((byte1 & 0xf0) >> 4); Dup = ((byte1 & 0x08) >> 3) > 0; Qos = (Qos)((byte1 & 0x06) >> 1); Retain = (byte1 & 0x01) > 0; //Remaining Length //var byte2 = stream.ReadByte(); var lengthBytes = ReadLengthBytes(stream); RemaingLength = CalculateLength(lengthBytes); } public void WriteTo(Stream stream) { var flags = (byte)MessageType << 4; flags |= (Dup ? 1 : 0) << 3; flags |= (byte)Qos << 1; flags |= Retain ? 1 : 0; stream.WriteByte((byte)flags); //byte 1 if (RemaingLength == 0) //byte 2 stream.WriteByte(0); else { do { int digit = RemaingLength & 0x7f; RemaingLength = RemaingLength >> 7; if (RemaingLength > 0) digit = digit | 0x80; stream.WriteByte((byte)digit); } while (RemaingLength > 0); } } internal static byte[] ReadLengthBytes(Stream stream) { var lengthBytes = new List<byte>(); // read until we've got the entire size, or the 4 byte limit is reached byte sizeByte; int byteCount = 0; do { sizeByte = (byte)stream.ReadByte(); lengthBytes.Add(sizeByte); } while (++byteCount <= 4 && (sizeByte & 0x80) == 0x80); return lengthBytes.ToArray(); } internal static int CalculateLength(byte[] lengthBytes) { var remainingLength = 0; var multiplier = 1; foreach (var currentByte in lengthBytes) { remainingLength += (currentByte & 0x7f) * multiplier; multiplier *= 0x80; } return remainingLength; } }
消息父类: Message
internal class Message { public FixedHeader FixedHeader { get; protected set; } public Message() { } public Message(MessageType messageType) { FixedHeader = new FixedHeader { MessageType = messageType }; } public virtual void WriteTo(Stream stream) { } public static Message CreateFrom(byte[] buffer) { using (var stream = new MemoryStream(buffer)) { return CreateFrom(stream); } } public static Message CreateFrom(Stream stream) { var header = new FixedHeader(stream); return CreateMessage(header, stream); } public static Message CreateMessage(FixedHeader header, Stream stream) { switch (header.MessageType) { case MessageType.CONNACK: return new ConnAckMessage(header, stream); case MessageType.DISCONNECT: return null; case MessageType.PINGREQ: return new PingReqMessage(); case MessageType.PUBACK: return new PublishAckMessage(header, stream); case MessageType.PUBCOMP: //return new MqttPubcompMessage(str, header); case MessageType.PUBLISH: //return new MqttPublishMessage(str, header); case MessageType.PUBREC: //return new MqttPubrecMessage(str, header); case MessageType.PUBREL: //return new MqttPubrelMessage(str, header); case MessageType.SUBACK: //return new MqttSubackMessage(str, header); case MessageType.UNSUBACK: //return new MqttUnsubackMessage(str, header); case MessageType.PINGRESP: return new PingRespMessage(header, stream); case MessageType.UNSUBSCRIBE: case MessageType.CONNECT: case MessageType.SUBSCRIBE: default: throw new Exception("Unsupported Message Type"); } } }
两个枚举:
MessageType (消息类型)
Qos (服务质量等级)
[Flags] public enum MessageType : byte { CONNECT = 1, CONNACK = 2, PUBLISH = 3, PUBACK = 4, PUBREC = 5, PUBREL = 6, PUBCOMP = 7, SUBSCRIBE = 8, SUBACK = 9, UNSUBSCRIBE = 10, UNSUBACK = 11, PINGREQ = 12, PINGRESP = 13, DISCONNECT = 14 } /// <summary> /// 服务质量等级 /// </summary> [Flags] public enum Qos : byte { /// <summary> /// QOS Level 0 - Message is not guaranteed delivery. No retries are made to ensure delivery is successful. /// </summary> AtMostOnce = 0, /// <summary> /// QOS Level 1 - Message is guaranteed delivery. It will be delivered at least one time, but may be delivered /// more than once if network errors occur. /// </summary> AtLeastOnce = 1, /// <summary> /// QOS Level 2 - Message will be delivered once, and only once. Message will be retried until /// it is successfully sent.. /// </summary> ExactlyOnce = 2, }
ping 请求包: PingReqMessage
响应包: PingRespMessage
internal sealed class PingReqMessage : Message { public PingReqMessage() : base(MessageType.PINGREQ) { } public override void WriteTo(Stream stream) { FixedHeader.WriteTo(stream); } } internal class PingRespMessage : Message { public PingRespMessage() : base(MessageType.PINGRESP) { } public PingRespMessage(FixedHeader header, Stream stream) { FixedHeader = header; } }
OK.