zoukankan      html  css  js  c++  java
  • 从 LengthFieldBasedFrameDecoder 看 netty 处理拆包

    LengthFieldBasedFrameDecoder 继承自 ByteToMessageDecoder

    public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter

    ByteToMessageDecoder 本身是一个 ChannelInboundHandler

    ByteToMessageDecoder 中有 2 种数据积累器,一种拷贝式,一种组合式,默认使用拷贝式,组合式更省内存,更复杂,会慢点。

    // io.netty.handler.codec.ByteToMessageDecoder#cumulator
    private Cumulator cumulator = MERGE_CUMULATOR;
    public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
        @Override
        public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
            try {
                final int required = in.readableBytes();
                // 需要扩容
                if (required > cumulation.maxWritableBytes() ||
                        (required > cumulation.maxFastWritableBytes() && cumulation.refCnt() > 1) ||
                        cumulation.isReadOnly()) {
                    // Expand cumulation (by replacing it) under the following conditions:
                    // - cumulation cannot be resized to accommodate the additional data
                    // - cumulation can be expanded with a reallocation operation to accommodate but the buffer is
                    //   assumed to be shared (e.g. refCnt() > 1) and the reallocation may not be safe.
                    return expandCumulation(alloc, cumulation, in);
                }
                // 把新读到的数据写入积累器
                return cumulation.writeBytes(in);
            } finally {
                // We must release in in all cases as otherwise it may produce a leak if writeBytes(...) throw
                // for whatever release (for example because of OutOfMemoryError)
                in.release();
            }
        }
    };

    入口在

    // io.netty.handler.codec.ByteToMessageDecoder#channelRead
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            // 从对象池获取 CodecOutputList
            CodecOutputList out = CodecOutputList.newInstance();
            try {
                // 从 socket 读到的数据
                ByteBuf data = (ByteBuf) msg;
                // 是否第一次接收数据
                first = cumulation == null;
                if (first) {
                    cumulation = data;
                } else {
                    // 数据积累器是拷贝式
                    // 如果上一次由于拆包,没有解析出消息,则数据会存留在积累器中
                    // 积累器接收新的数据
                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                }
                // 对积累器的数据进行解码
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Exception e) {
                throw new DecoderException(e);
            } finally {
                if (cumulation != null && !cumulation.isReadable()) {
                    numReads = 0;
                    cumulation.release();
                    cumulation = null;
                } else if (++ numReads >= discardAfterReads) {
                    // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                    // See https://github.com/netty/netty/issues/4275
                    numReads = 0;
                    discardSomeReadBytes();
                }
    
                int size = out.size();
                firedChannelRead |= out.insertSinceRecycled();
                fireChannelRead(ctx, out, size);
                out.recycle();
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

    解码

    // io.netty.handler.codec.ByteToMessageDecoder#callDecode
    protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        try {
            while (in.isReadable()) {
                // 已解码的消息个数
                int outSize = out.size();
    
                if (outSize > 0) {
                    fireChannelRead(ctx, out, outSize);
                    out.clear();
    
                    // Check if this handler was removed before continuing with decoding.
                    // If it was removed, it is not safe to continue to operate on the buffer.
                    //
                    // See:
                    // - https://github.com/netty/netty/issues/4635
                    if (ctx.isRemoved()) {
                        break;
                    }
                    outSize = 0;
                }
    
                int oldInputLength = in.readableBytes();
                // 调用子类的 decode 方法
                decodeRemovalReentryProtection(ctx, in, out);
    
                // Check if this handler was removed before continuing the loop.
                // If it was removed, it is not safe to continue to operate on the buffer.
                //
                // See https://github.com/netty/netty/issues/1664
                if (ctx.isRemoved()) {
                    break;
                }
                // 前后的消息个数相等,表示没有解码新的消息
                if (outSize == out.size()) {
                    if (oldInputLength == in.readableBytes()) {
                        // 拆包情况,走这个分支,跳出循环,读取更多的数据
                        break;
                    } else {
                        continue;
                    }
                }
    
                if (oldInputLength == in.readableBytes()) {
                    throw new DecoderException(
                            StringUtil.simpleClassName(getClass()) +
                                    ".decode() did not read anything but decoded a message.");
                }
    
                if (isSingleDecode()) {
                    break;
                }
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Exception cause) {
            throw new DecoderException(cause);
        }
    }
    // io.netty.handler.codec.LengthFieldBasedFrameDecoder#decode
    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        Object decoded = decode(ctx, in);
        if (decoded != null) {
            out.add(decoded);
        }
    }
    
    
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        if (discardingTooLongFrame) {
            discardingTooLongFrame(in);
        }
    
        if (in.readableBytes() < lengthFieldEndOffset) {
            return null;
        }
    
        int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
        // 读取报文的长度字段,解析出报文长度
        // ByteBuf.getUnsignedByte 并不会移动 readerIndex
        long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);
    
        if (frameLength < 0) {
            failOnNegativeLengthField(in, frameLength, lengthFieldEndOffset);
        }
    
        frameLength += lengthAdjustment + lengthFieldEndOffset;
    
        if (frameLength < lengthFieldEndOffset) {
            failOnFrameLengthLessThanLengthFieldEndOffset(in, frameLength, lengthFieldEndOffset);
        }
    
        if (frameLength > maxFrameLength) {
            exceededFrameLength(in, frameLength);
            return null;
        }
    
        // never overflows because it's less than maxFrameLength
        int frameLengthInt = (int) frameLength;
        // 拆包情况,读到的数据小于报文的长度,无法解析,需要继续接收数据,进行第二次解析
        if (in.readableBytes() < frameLengthInt) {
            return null;
        }
    
        if (initialBytesToStrip > frameLengthInt) {
            failOnFrameLengthLessThanInitialBytesToStrip(in, frameLength, initialBytesToStrip);
        }
        in.skipBytes(initialBytesToStrip);
    
        // extract frame
        int readerIndex = in.readerIndex();
        int actualFrameLength = frameLengthInt - initialBytesToStrip;
        ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength);
        in.readerIndex(readerIndex + actualFrameLength);
        return frame;
    }

    处理拆包的流程:
    1. 从 channel 读取的数据会先放到数据积累器中
    2. 使用解码器对数据进行解码,拆包情况则是接收的数据小于报文的实际长度,因此解码失败
    3. 重新从 channel 读取消息,放入积累器中
    4. 再次解码消息,若成功则传播到 pipeline 中,失败则继续接收消息

  • 相关阅读:
    C#判断一个字符串是否是数字或者含有某个数字
    SQL多字段排序
    对于过长字符串的大小比对
    WebFrom页面绑定数据过于冗长的处理方法
    webform的导出
    SQL数据库Truncate的相关用法
    SQL的CharIndex用法
    近期总结
    每周一水(4-1)
    Codeforces Round #238 (Div. 2) 解题报告
  • 原文地址:https://www.cnblogs.com/allenwas3/p/12167939.html
Copyright © 2011-2022 走看看