一、概要
本章主要内容就是讲解如何在dotnetty的框架中进行网络通讯以及编解码对象、数据包分包拆包的相关知识点。
后续会专门开一篇避坑的文章,主要会描述在使用dotnetty的框架时会遇到的哪些问题帮助各位开发者在使用过程当中出现问题,会不断的收集问题不断的更新肯定是附带问题的解决方案的。
希望有兴趣的小伙伴可以提供相关的“坑”一起更新一起解决困难,让dotnetty的框架更容易使用。
二、简介
1.什么是编码、解码
2.解码器Decoder讲解
3.编码器Encoder讲解
4.编解码器类Codec讲解
5.网络传输TCP粘包拆包
6.核心模块缓冲ByteBuffer
7.实战环节
8.Dotnetty所用到的设计模式
三、详细内容
1.什么是编码、解码
前面说的:高性能RPC框架的3个要素:IO模型、数据协议、线程模型
最开始接触的编码码:序列化/反序列化(就是编解码)、url编码、base64编解码
业界里面也有其他编码框架: google的 protobuf(PB)、Facebook的Trift、json等
DotNetty里面的编解码:
解码器:负责处理入站 InboundHandler”数据
编码器:负责出站 OutboundHandler” 数据
DotNetty里面提供默认的编解码器,也支持自定义编解码器
Encoder:编码器
Decoder:解码器
Codec:编解码器
2.解码器Decoder讲解
Decoder对应的就是ChannelInboundHandler,主要就是字节数组转换为消息对象
主要是两个方法 decode decodeLast
抽象解码器
- ByteToMessageDecoder用于将字节转为消息,需要检查缓冲区是否有足够的字节
- ReplayingDecoder继承ByteToMessageDecoder,不需要检查缓冲区是否有足够的字节,但是ReplayingDecoder速度略满于ByteToMessageDecoder,不是所有的ByteBuf都支持。
- 选择:项目复杂性高则使用ReplayingDecoder,否则使用 ByteToMessageDecoder
- MessageToMessageDecoder用于从一种消息解码为另外一种消息(例如POJO到POJO)
解码器具体的实现,用的比较多的是(更多是为了解决TCP底层的粘包和拆包问题)
- DelimiterBasedFrameDecoder: 指定消息分隔符的解码器
- LineBasedFrameDecoder: 以换行符为结束标志的解码器
- FixedLengthFrameDecoder:固定长度解码器
- LengthFieldBasedFrameDecoder:message = header+body, 基于长度解码的通用解码器
- StringDecoder:文本解码器,将接收到的对象转化为字符串,一般会与上面的进行配合,然后在后面添加业务handle
3.编码器Encoder讲解
Encoder对应的就是ChannelOutboundHandler,消息对象转换为字节数组
Netty本身未提供和解码一样的编码器,是因为场景不同,两者非对等的
- MessageToByteEncoder消息转为字节数组,调用write方法,会先判断当前编码器是否支持需要发送的消息类
型,如果不支持,则透传;
- MessageToMessageEncoder用于从一种消息编码为另外一种消息
4.编解码器类Codec讲解
组合解码器和编码器,以此提供对于字节和消息都相同的操作
优点:成对出现,编解码都是在一个类里面完成
缺点:耦合在一起,拓展性不佳
Codec:组合编解码
1)ByteToMessageCodec
2)MessageToMessageCodec
decoder:解码
1)ByteToMessageDecoder
2)MessageToMessageDecoder
encoder:编码
1)ByteToMessageEncoder
2)MessageToMessageEncoder
5.网络传输TCP粘包拆包
- TCP拆包: 一个完整的包可能会被TCP拆分为多个包进行发送
- TCP粘包: 把多个小的包封装成一个大的数据包发送, client发送的若干数据包 Server接收时粘成一包发送方和接收方都可能出现这个原因
- 发送方的原因:TCP默认会使用Nagle算法
- 接收方的原因: TCP接收到数据放置缓存中,应用程序从缓存中读取
- UDP: 是没有粘包和拆包的问题,有边界协议
应用层解决半包读写的办法:
1.设置定长消息 (10字符)
123456789 123456789 123456789 123456789
2.设置消息的边界 ( | | 切割)
123456789||123456789||123456789||
3.使用带消息头的协议,消息头存储消息开始标识及消息的长度信息
DelimiterBasedFrameDecoder: 指定消息分隔符的解码器 LineBasedFrameDecoder: 以换行符为结束标志的解码器
FixedLengthFrameDecoder:固定长度解码器 LengthFieldBasedFrameDecoder:message = header+body, 基于长
度解码的通用解码器。
使用解码器LineBasedFrameDecoder解决半包读写
1)LineBaseFrameDecoder 以换行符为结束标志的解码器 ,构造函数里面的数字表示最长遍历的帧数
2)StringDecoder解码器将对象转成字符串。
- 自定义分隔符解决TCP读写
MaxLength:表示一行最大的长度,如果超过这个长度依然没有检测自定义分隔符,将会抛出
TooLongFrameException
FailFast:如果为true,则超出maxLength后立即抛出TooLongFrameException,不进行继续解码.如果为
False,则等到完整的消息被解码后,再抛出TooLongFrameException异常
StripDelimiter:解码后的消息是否去除掉分隔符
Delimiters:分隔符,ByteBuf类型
- 自定义长度半包读写器LengthFieldBasedFrameDecoder
MaxFrameLength 数据包的最大长度
LengthFieldOffset 长度字段的偏移位,长度字段开始的地方,意思是跳过指定长度个字节之后的才是消息体字段
LengthFieldLength 长度字段占的字节数, 帧数据长度的字段本身的长度
LengthAdjustment
一般 Header + Body,添加到长度字段的补偿值,如果为负数,开发人员认为这个 Header的长度字段是整个消息
包的长度,则Netty应该减去对应的数字
InitialBytesToStrip 从解码帧中第一次去除的字节数, 获取完一个完整的数据包之后,忽略前面的指定位数的长度字节,
应用解码器拿到的就是不带长度域的数据包
6.核心模块缓冲ByteBuffer
ByteBuf:传递字节数据的容器
ByteBuf的创建方法
1)ByteBufAllocator
池化( PooledByteBufAllocator提高性能并且最大程度减少内存碎片
非池化UnpooledByteBufAllocator: 每次返回新的实例
2)Unpooled: 提供静态方法创建未池化的ByteBuf,可以创建堆内存和直接内存缓冲区
ByteBuf使用模式
堆缓存区HEAP BUFFER:
优点:存储在的堆空间中,可以快速的分配和释放
缺点:每次使用前会拷贝到直接缓存区(也叫堆外内存)
直接缓存区DIRECR BUFFER:
优点:存储在堆外内存上,堆外分配的直接内存,不会占用堆空间
缺点:内存的分配和释放,比在堆缓冲区更复杂
复合缓冲区COMPOSITE BUFFER:
可以创建多个不同的ByteBuf,然后放在一起,但是只是一个视图
选择:大量IO数据读写,用“直接缓存区”; 业务消息编解码用“堆缓存区”
四、实战环节
实战环节使用的编解码器是
- ByteToMessageDecoder
- MessageToByteEncoder
数据包结构定义(https://www.cnblogs.com/justzhuzhu/p/12129328.html)之前已经在其他文章里写过了,所以这里直接开始编解码的操作。
解码
/// <summary> /// Decoder Packet /// </summary> public class DecoderHandler : ByteToMessageDecoder { private readonly PacketParser packetParser = new PacketParser(); protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output) { var outputBufferList = new List<byte[]>(); var resultByte = new byte[input.ReadableBytes]; input.ReadBytes(resultByte); packetParser.TryParsing(ref resultByte,ref outputBufferList); output.AddRange(outputBufferList); input.Clear(); } }
编码
/// <summary> /// Encoder Packet /// </summary> public class EncoderHandler : MessageToByteEncoder<RpcResponse<Gimind.Infrastructure.Common.Packet.IMessage>> { protected override void Encode(IChannelHandlerContext context, RpcResponse<Gimind.Infrastructure.Common.Packet.IMessage> message, IByteBuffer output) { var arry = SerializePacket(message.Length, message.Header, message.Body); output.WriteBytes(arry); } public byte[] SerializeHeader(RespHeader header) { header.Checkbit = Header.Checkbit; var headerArry = new byte[25]; try { BytesWriter.Write(header.Checkbit, ref headerArry, 0); //4 BytesWriter.Write(header.RequestId, ref headerArry, 4); //8 BytesWriter.Write(header.Code, ref headerArry, 12); //4 BytesWriter.Write(header.IsEncrypt, ref headerArry, 16);//1 BytesWriter.Write(header.CommandId, ref headerArry, 17);//4 BytesWriter.Write(header.Ext1, ref headerArry, 21); //4 } catch (Exception ex) { NLogger.Error("SerializeHeader",ex.Message,ex); } return headerArry; } private byte[] SerializePacket(int length,RespHeader header,IMessage body) { try { var Header = SerializeHeader(header); length += Header.Length; byte[] Body = null; var protobytes = SerializerUtilitys.Serialize(body); if (protobytes != null) { Body = protobytes; length += Body.Length; } var packageArry = new byte[4 + length]; BytesWriter.Write(length, ref packageArry, 0); BytesWriter.Write(Header, ref packageArry, 4); if (body != null) { BytesWriter.Write(Body, ref packageArry, 4 + RespHeader.Length); } return packageArry; } catch (Exception ex) { NLogger.Error("SerializeHeader", ex.Message, ex); } return null; } }
分包拆包逻辑
public class PacketParser { private readonly List<byte[]> _bufferList = new List<byte[]>(); public void TryParsing(ref byte[] inBytes, ref List<byte[]> outBytes) { try { _bufferList.Add(inBytes); var tempBuffer = new byte[_bufferList.Sum(item => item.Length)]; var size = 0; foreach (var item in _bufferList) { item.CopyTo(tempBuffer, size); size += item.Length; } if (tempBuffer.Length < 4) return; var packetLen = BytesReader.ReadInt32(ref tempBuffer, 0); if (tempBuffer.Length < (4 + packetLen)) { return; } if (tempBuffer.Length == (4 + packetLen)) { _bufferList.Clear(); outBytes.Add(tempBuffer); } if (tempBuffer.Length > (4 + packetLen)) { var left = new byte[4 + packetLen]; Array.Copy(tempBuffer, 0, left, 0, left.Length); var right = new byte[tempBuffer.Length - left.Length]; Array.Copy(tempBuffer, left.Length, right, 0, right.Length); _bufferList.Clear(); outBytes.Add(left); TryParsing(ref right, ref outBytes); } } catch (Exception ex) { NLogger.Error("PacketParser Error", ex.Message, ex); } } }
Protobuffer
using ProtoBuf; using System; using System.IO; namespace Protobuffer.Utilities { public class SerializerUtilitys { /// <summary> /// 序列化 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="serializeObj">序列化对象</param> /// <returns></returns> public static byte[] Serialize<T>(T serializeObj) { try { using (var stream = new MemoryStream()) { ProtoBuf.Serializer.Serialize<T>(stream, serializeObj); var result = new byte[stream.Length]; stream.Position = 0L; stream.Read(result, 0, result.Length); return result; } } catch (Exception e) { return null; } } /// <summary> /// 反序列化 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="bytes">二进制对象数组</param> /// <returns></returns> public static T DeSerialize<T>(byte[] bytes) { try { using (var stream = new MemoryStream()) { stream.Write(bytes, 0, bytes.Length); stream.Position = 0L; return ProtoBuf.Serializer.Deserialize<T>(stream); } } catch (Exception e) { Console.WriteLine(e); return default(T); } } } }
如果你看到这里,可能会有意外的收获。在DotNetty里面的应用里用到如下设计模式:
- Builder构造器模式:ServerBootstap
- 责任链设计模式:pipeline的事件传播
- 工厂模式: 创建Channel
- 适配器模式:HandlerAdapter
- 推荐书籍:《大话设计模式》《Head First设计模式》《CLR VIA C#》《大型网站技术架构 核心原理与案例分析》《.net 框架设计》《.net 性能优化》《编写高性能的.net代码》
希望大家多多支持。不胜感激。
- E-Mail:zhuzhen723723@outlook.com
- QQ: 580749909(个人群)
- Blog: https://www.cnblogs.com/justzhuzhu/
- Git: https://github.com/JusterZhu
- 微信公众号