zoukankan      html  css  js  c++  java
  • [编织消息框架][设计协议]解决粘包半包(下)

    接下来介绍netty如何切割分包

    学习目的,了解处理业务,方便以后脱离依赖

    读者如果不感兴趣或看不懂可以先忽略,难度比较大

    LengthFieldBasedFrameDecoder.class

        public LengthFieldBasedFrameDecoder(
                ByteOrder byteOrder,     //大小端模式 默认大端 ByteOrder BIG_ENDIAN
                int maxFrameLength,      //包Frame netty叫帧概念 最大上限
                int lengthFieldOffset,     //包长度信息偏移多少bytes
                int lengthFieldLength,    //包长度单位为bytes
                int lengthAdjustment,     //包附加信息占多少位,如包尾部checksum 也可以不用设置
                int initialBytesToStrip,//忽悠包头信息,返给上层时会去掉这部份bytes
                boolean failFast)        //解包出错,控制抛异常,默认为true 无需关心这选项

    如示例

    int maxFrameLength = Short.MAX_VALUE;
    int lengthFieldOffset =1;
    int lengthFieldLength =2;
    new LengthFieldBasedFrameDecoder(ByteOrder.BIG_ENDIAN,maxFrameLength,lengthFieldOffset,lengthFieldLength,0,0,true);

    如图:

    数据包由低到高是从左到右

    红色部份lengthFieldOffset 1byte=8bits 

    蓝色部份lengthFieldLength 2byte=16bits = short

    粉色部份messageLength 2byte 读取 蓝色部份由于大端模式 高位 0000 0010 等于2 

     图2

    消息长度为1byte 

    解读netty源码

    分四部份

    1.netty解码介绍

    2.边界判断

    3.计算逻辑

    4.切割包

    第一部份

    先看下LengthFieldBasedFrameDecoder继承类之间的关系

    ByteToMessageDecoder处理比较复杂,先不考虑

    其实解码只要工作方法是

    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception
    //公开调用方法 out 是解码后保存返回,为什么是个数组?原因有可能出现粘包情况多次解码,合并结果一次返回上层业务
    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        //调用实现解码方法
        Object decoded = decode(ctx, in);
        if (decoded != null) {
            out.add(decoded);
        }
    }

    提示:如果自己实现解码 继承ByteToMessageDecoder类,逻辑写在decode(ChannelHandlerContext ctx, ByteBuf in)方法即可

    第三部份

     1     //计算包长度偏移坐标 已读坐标+lengthFieldOffset参数
     2     int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
     3     //算出包长度
     4     long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);
     5     
     6     //包实际长度  加上开始忽悠的 lengthFieldEndOffset参数 加上 lengthAdjustment 参数
     7     frameLength += lengthAdjustment + lengthFieldEndOffset;
     8     
     9     //跳过忽悠头部 initialBytesToStrip参数
    10     in.skipBytes(initialBytesToStrip);
    11         
    12     //length 单位是byte 如2 是占一个short长度 4占int长度 8占long长度
    13     protected long getUnadjustedFrameLength(ByteBuf buf, int offset, int length, ByteOrder order) {
    14         //转换大小端模式
    15         buf = buf.order(order);
    16         long frameLength;
    17         switch (length) {
    18         case 1:
    19             frameLength = buf.getUnsignedByte(offset);
    20             break;
    21         case 2:
    22             frameLength = buf.getUnsignedShort(offset);
    23             break;
    24         case 3:
    25             frameLength = buf.getUnsignedMedium(offset);
    26             break;
    27         case 4:
    28             frameLength = buf.getUnsignedInt(offset);
    29             break;
    30         case 8:
    31             frameLength = buf.getLong(offset);
    32             break;
    33         default:
    34             throw new DecoderException(
    35                     "unsupported lengthFieldLength: " + lengthFieldLength + " (expected: 1, 2, 3, 4, or 8)");
    36         }
    37         return frameLength;
    38     }
    计算逻辑

    第四部份

     1 //切割实现,直接调用ByteBuf 方法切割
     2         /**
     3      * Extract the sub-region of the specified buffer.
     4      * <p>
     5      * If you are sure that the frame and its content are not accessed after
     6      * the current {@link #decode(ChannelHandlerContext, ByteBuf)}
     7      * call returns, you can even avoid memory copy by returning the sliced
     8      * sub-region (i.e. <tt>return buffer.slice(index, length)</tt>).
     9      **/
    10     protected ByteBuf extractFrame(ChannelHandlerContext ctx, ByteBuf buffer, int index, int length) {
    11         return buffer.retainedSlice(index, length);
    12     }
    13     
    14     AbstractUnpooledSlicedByteBuf.class
    15     //深追ByteBuf切割实现,发现返回的ByteBuf只是保留引用,只是简单记录了下offsetStart adjustment writerIndex
    16       AbstractUnpooledSlicedByteBuf(ByteBuf buffer, int index, int length) {
    17         super(length);
    18         checkSliceOutOfBounds(index, length, buffer);
    19 
    20         if (buffer instanceof AbstractUnpooledSlicedByteBuf) {
    21             this.buffer = ((AbstractUnpooledSlicedByteBuf) buffer).buffer;
    22             adjustment = ((AbstractUnpooledSlicedByteBuf) buffer).adjustment + index;
    23         } else if (buffer instanceof DuplicatedByteBuf) {
    24             this.buffer = buffer.unwrap();
    25             adjustment = index;
    26         } else {
    27             this.buffer = buffer;
    28             adjustment = index;
    29         }
    30 
    31         initLength(length);
    32         writerIndex(length);
    33     }
    切割包

    第二部份

    现在回过头来看边界判断,主要是看处理出错逻辑部份

     1 long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);
     2         //解码出错
     3         if (frameLength < 0) {
     4             //无法计算包内容时netty策略选择跳过 包头信息跟包长度大小
     5             in.skipBytes(lengthFieldEndOffset);
     6             throw new CorruptedFrameException(
     7                     "negative pre-adjustment length field: " + frameLength);
     8         }
     9 
    10          frameLength += lengthAdjustment + lengthFieldEndOffset;
    11         //奇怪的逻辑,上面已加上 lengthFieldEndOffset 除非 lengthAdjustment 参数或 lengthFieldEndOffset参数设置为负值
    12         //一般没什么复杂情况最好不要设置负值
    13         if (frameLength < lengthFieldEndOffset) {
    14             in.skipBytes(lengthFieldEndOffset);
    15             throw new CorruptedFrameException(
    16                     "Adjusted frame length (" + frameLength + ") is less " +
    17                     "than lengthFieldEndOffset: " + lengthFieldEndOffset);
    18         }
    19         
    20         // never overflows because it's less than maxFrameLength
    21         int frameLengthInt = (int) frameLength;
    22         //缓冲剩余可读bytes少于包长度,说明出现半包情况,忽略处理
    23         if (in.readableBytes() < frameLengthInt) {
    24             return null;
    25         }
    26         //如果包长度少于 initialBytesToStrip 参数会抛异常并且跳过包内容
    27         //没什么特殊情况最好不要设置initialBytesToStrip参数
    28         if (initialBytesToStrip > frameLengthInt) {
    29             in.skipBytes(frameLengthInt);
    30             throw new CorruptedFrameException(
    31                     "Adjusted frame length (" + frameLength + ") is less " +
    32                     "than initialBytesToStrip: " + initialBytesToStrip);
    33         }
    基本处理
     1         //包长度大于 maxFrameLength
     2         //看下netty处理策略
     3         if (frameLength > maxFrameLength) {
     4             //算出超出范围
     5             long discard = frameLength - in.readableBytes();
     6             //记录包长度
     7             tooLongFrameLength = frameLength;
     8 
     9             //有两种情况处理
    10             
    11             //存在粘包情况,直接 in.skipBytes((int) frameLength);跳过当前包 什么也不用记录
    12             if (discard < 0) {
    13                 // buffer contains more bytes then the frameLength so we can discard all now
    14                 in.skipBytes((int) frameLength);
    15             } else {
    16                 //当 discard>=0 
    17                 //说明没存在粘包情况,有可能出现半包情况,in.skipBytes(in.readableBytes()); 清空所有接收数据
    18                 //设置discardingTooLongFrame,bytesToDiscard记录超出范围,下次解码时用上
    19                 // Enter the discard mode and discard everything received so far.
    20                 discardingTooLongFrame = true;
    21                 bytesToDiscard = discard;
    22                 in.skipBytes(in.readableBytes());
    23             }
    24             failIfNecessary(true);
    25             return null;
    26         }
    27         
    28         //当上次包出现超过最大包限制,要忽悠上个包半包情况
    29         if (discardingTooLongFrame) {
    30             long bytesToDiscard = this.bytesToDiscard;
    31             //剩余半包长度
    32             int localBytesToDiscard = (int) Math.min(bytesToDiscard, in.readableBytes());
    33             //忽略剩余半包长度
    34             in.skipBytes(localBytesToDiscard);
    35             //记录忽略已忽略多少bytes
    36             bytesToDiscard -= localBytesToDiscard;
    37             this.bytesToDiscard = bytesToDiscard;
    38             //如果this.bytesToDiscard==0 说明已全部忽略上一个包 会重置 discardingTooLongFrame
    39             failIfNecessary(false);
    40         }
    41         
    42     private void failIfNecessary(boolean firstDetectionOfTooLongFrame) {
    43         if (bytesToDiscard == 0) {
    44             // Reset to the initial state and tell the handlers that
    45             // the frame was too large.
    46             long tooLongFrameLength = this.tooLongFrameLength;
    47             this.tooLongFrameLength = 0;
    48             discardingTooLongFrame = false;
    49             if (!failFast ||
    50                 failFast && firstDetectionOfTooLongFrame) {
    51                 fail(tooLongFrameLength);
    52             }
    53         } else {
    54             // Keep discarding and notify handlers if necessary.
    55             if (failFast && firstDetectionOfTooLongFrame) {
    56                 fail(tooLongFrameLength);
    57             }
    58         }
    59     }
    大于maxFrameLength策略

    当大于maxFrameLength 处理核心忽略当前包,如果存在粘包直接忽略,如果存在半包记录剩余包大小bytesToDiscard,下次循环解码时不停跳过bytesToDiscard

    关于分包处理

    为何需要分包?比如要传送10G大小的文件,如果把数据全读进内存再发送出去,明显是不合理的,这时要把10G大小的数据切割分批读,分批发送出去

    为什么不做在应用协议里?因业太多业务上的逻辑,底层是无法控制的,再说是特定的功能,没必要添加支持,看作成业务上一个小功能实现就行了

  • 相关阅读:
    取消PHPCMS V9后台新版本升级提示信息
    phpcmsv9全站搜索,不限模型
    jq瀑布流代码
    phpcms v9模版调用代码
    angular.js添加自定义服务依赖项方法
    angular多页面切换传递参数
    angular路由最基本的实例---简单易懂
    作用域事件传播
    利用angular控制元素的显示和隐藏
    利用angular给节点添加样式
  • 原文地址:https://www.cnblogs.com/solq111/p/6542239.html
Copyright © 2011-2022 走看看