zoukankan      html  css  js  c++  java
  • 高性能NIO通信框架之Netty(4)ChannelHandler分析

    一.ChannelHandler的功能说明

       ChannelHandler类似ServletFilter过滤器,负责对I/O事件或者I/O操作进行拦截和处理,它可以选择性地拦截和处理自己感兴趣的事件,也可以透传和终止事件的传递。

    ChannelHandler支持注解,目前就支持两种注解:

    1@Sharable :多个ChannelPipeline公用一个ChannelHandler

    2@Skip:被Skip注解的方法不会被调用,直接被忽略

        @Target({ElementType.METHOD})
        @Retention(RetentionPolicy.RUNTIME)
        public @interface Skip {
        }
    
        @Inherited
        @Documented
        @Target({ElementType.TYPE})
        @Retention(RetentionPolicy.RUNTIME)
        public @interface Sharable {
        }

     

       大多数的ChannelHandler会选择性地拦截和处理某个或者某些事件,其他的事件会忽略,由下一个ChannelHandler进行拦截和处理。这会导致另一个问题:用户ChannelHandler必须实现ChannelHandler的所有接口,包括它不关心的那些事件处理接口,这会导致用户代码的臃肿,代码的可维护性也会变差。

       为了解决这个问题,Netty提供了ChannelHandlerAdapter基类,它的所有接口实现都是事件透传。透传的方法一般用注解@Skip    

     

    二.ChannelHandler源码分析

    2.1  ByteToMessageDecoder源码分析

         ByteToMessageDecoder用于将ByteBuf解码成POJO对象。但是ByteToMessageDecoder并没有考虑到TCP粘包和组包等场景,读半包需要用户解码器自己负责处理。正式因为这个,对于大多数场景不会直接继承ByteToMessageDecoder。接下来看看channelRead的源码,如下:

     

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            if(msg instanceof ByteBuf) {
                RecyclableArrayList out = RecyclableArrayList.newInstance();
                boolean var12 = false;
    
                try {
                    var12 = true;
                    ByteBuf data = (ByteBuf)msg;
                    this.first = this.cumulation == null;
                    if(this.first) {
                        this.cumulation = data;
                    } else {
                        this.cumulation = this.cumulator.cumulate(ctx.alloc(), this.cumulation, data);
                    }
    
                    this.callDecode(ctx, this.cumulation, out);
                    var12 = false;
                } catch (DecoderException var13) {
                    throw var13;
                } catch (Throwable var14) {
                    throw new DecoderException(var14);
                } finally {
                    if(var12) {
                        if(this.cumulation != null && !this.cumulation.isReadable()) {
                            this.cumulation.release();
                            this.cumulation = null;
                        }
    
                        int size = out.size();
    
                        for(int i = 0; i < size; ++i) {
                            ctx.fireChannelRead(out.get(i));
                        }
    
                        out.recycle();
                    }
                }
    
                if(this.cumulation != null && !this.cumulation.isReadable()) {
                    this.cumulation.release();
                    this.cumulation = null;
                }
    
                int size = out.size();
    
                for(int i = 0; i < size; ++i) {
                    ctx.fireChannelRead(out.get(i));
                }
    
                out.recycle();
            } else {
                ctx.fireChannelRead(msg);
            }
    
        }

    从源码可以看出来,首先判断需要解码msg对象是否是ByteBuf,如果是ByteBuf才需要进行解码,是否与直接透传。通过cumulation是否为空判断解码器是否缓存了没有解码完成的半包消息,如果为空,说明是首次解码或者最近一次已经处理完了半包消息,没有缓存的半包消息需要处理直接将需要解码的ByteBuf赋值给cumulation;如果cumulation缓存有上次没有解码完成的ByteBuf,则进行赋值操作,将需要解码的ByteBuf复制到cumulation中。

         在复制之前需要对cumulation的可写缓冲区进行判断,如果不足则需要动态扩展,扩展的源码如下:

     

        static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) {
            ByteBuf oldCumulation = cumulation;
            cumulation = alloc.buffer(cumulation.readableBytes() + readable);
            cumulation.writeBytes(oldCumulation);
            oldCumulation.release();
            return cumulation;
        }

    复制操作完成之后释放需要解码的ByteBuf对象,调用callDecode方法进行解码。

     

    protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
            try {
                while(true) {
                    if(in.isReadable()) {
                        int outSize = out.size();
                        int oldInputLength = in.readableBytes();
                        this.decode(ctx, in, out);
                        if(!ctx.isRemoved()) {
                            if(outSize == out.size()) {
                                if(oldInputLength != in.readableBytes()) {
                                    continue;
                                }
                            } else {
                                if(oldInputLength == in.readableBytes()) {
                                    throw new DecoderException(StringUtil.simpleClassName(this.getClass()) + ".decode() did not read anything but decoded a message.");
                                }
    
                                if(!this.isSingleDecode()) {
                                    continue;
                                }
                            }
                        }
                    }
    
                    return;
                }
            } catch (DecoderException var6) {
                throw var6;
            } catch (Throwable var7) {
                throw new DecoderException(var7);
            }
        }

    源码分析:对ByteBuf进行循环解码,循环的条件是解码缓冲区中有可读的字节,调用抽象decode方法,由用户的子类解码器进行解码,解码后需要对当前的pipeline状态和解码结果进行判断。

    如果当前的ChannelHandlerContext已经被移除,则不能继续进行解码,直接退出循环;如果输出的out列表长度没有变化,说明解码没有成功,需要针对以下不同场景进行判断:

    (1)如果用户解码器没有消费ByteBuf,则说明是个半包消息,需要由I/O线程继续读取后续的数据报,在这种场景下要退出循环。

    (2)如果yoghurt解码器消费了ByteBuf,说明可以解码可以继续进行。

    (3)如果用户解码器没有消费ByteBuf,但是却解码出了一个或者多个对象,这种行为被认为是非法的,需要抛出DecoderException异常。

    (4)最后通过isSingleDecoder进行判断,如果是单条消息解码器,第一次解码完成之后就退出循环

     

    2.2 MessageToMessageDecoder源码分析

      MessageToMessageDecoder负责将一个POJO对象解码成另一个POJO对象,接下来看下channelRead的源码:

     

     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            RecyclableArrayList out = RecyclableArrayList.newInstance();
            boolean var13 = false;
    
            try {
                var13 = true;
                if(this.acceptInboundMessage(msg)) {
                    Object cast = msg;
    
                    try {
                        this.decode(ctx, cast, out);
                    } finally {
                        ReferenceCountUtil.release(msg);
                    }
    
                    var13 = false;
                } else {
                    out.add(msg);
                    var13 = false;
                }
            } catch (DecoderException var19) {
                throw var19;
            } catch (Exception var20) {
                throw new DecoderException(var20);
            } finally {
                if(var13) {
                    int size = out.size();
    
                    for(int i = 0; i < size; ++i) {
                        ctx.fireChannelRead(out.get(i));
                    }
    
                    out.recycle();
                }
            }
    
            int size = out.size();
    
            for(int i = 0; i < size; ++i) {
                ctx.fireChannelRead(out.get(i));
            }
    
            out.recycle();
        }

    第一通过RecyclableArrayList创建一个新的可循环利用的RecyclableArrayList,然后对解码的消息类型进行判断,通过类型参数校验器看是否是可接受的类型,如果是则校验通过,校验通过之后,直接调用decode抽象方法,有具体实现子类进行消息解码,如果需要解码的对象不是当前解码器 可以接收和处理的类型,则将它加入到RecyclableArrayList中不进行解码。最后对RecyclableArrayList进行遍历,循环调用ChannelHandlerContextfireChannelRead方法,通知后续的ChannelHandler继续进行处理。循环通知完成之后,通过recycle方法释放RecyclableArrayList

     

    2.3  LengthFieldBasedFrameDecoder 源码分析

    LengthFieldBasedFrameDecoder  基于消息长度的半包解码器。

     

     

    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
            if(this.discardingTooLongFrame) {
                long bytesToDiscard = this.bytesToDiscard;
                int localBytesToDiscard = (int)Math.min(bytesToDiscard, (long)in.readableBytes());
                in.skipBytes(localBytesToDiscard);
                bytesToDiscard -= (long)localBytesToDiscard;
                this.bytesToDiscard = bytesToDiscard;
                this.failIfNecessary(false);
            }
    
            if(in.readableBytes() < this.lengthFieldEndOffset) {
                return null;
            } else {
                int actualLengthFieldOffset = in.readerIndex() + this.lengthFieldOffset;
                long frameLength = this.getUnadjustedFrameLength(in, actualLengthFieldOffset, this.lengthFieldLength, this.byteOrder);
                if(frameLength < 0L) {
                    in.skipBytes(this.lengthFieldEndOffset);
                    throw new CorruptedFrameException("negative pre-adjustment length field: " + frameLength);
                } else {
                    frameLength += (long)(this.lengthAdjustment + this.lengthFieldEndOffset);
                    if(frameLength < (long)this.lengthFieldEndOffset) {
                        in.skipBytes(this.lengthFieldEndOffset);
                        throw new CorruptedFrameException("Adjusted frame length (" + frameLength + ") is less " + "than lengthFieldEndOffset: " + this.lengthFieldEndOffset);
                    } else if(frameLength > (long)this.maxFrameLength) {
                        long discard = frameLength - (long)in.readableBytes();
                        this.tooLongFrameLength = frameLength;
                        if(discard < 0L) {
                            in.skipBytes((int)frameLength);
                        } else {
                            this.discardingTooLongFrame = true;
                            this.bytesToDiscard = discard;
                            in.skipBytes(in.readableBytes());
                        }
    
                        this.failIfNecessary(true);
                        return null;
                    } else {
                        int frameLengthInt = (int)frameLength;
                        if(in.readableBytes() < frameLengthInt) {
                            return null;
                        } else if(this.initialBytesToStrip > frameLengthInt) {
                            in.skipBytes(frameLengthInt);
                            throw new CorruptedFrameException("Adjusted frame length (" + frameLength + ") is less " + "than initialBytesToStrip: " + this.initialBytesToStrip);
                        } else {
                            in.skipBytes(this.initialBytesToStrip);
                            int readerIndex = in.readerIndex();
                            int actualFrameLength = frameLengthInt - this.initialBytesToStrip;
                            ByteBuf frame = this.extractFrame(ctx, in, readerIndex, actualFrameLength);
                            in.readerIndex(readerIndex + actualFrameLength);
                            return frame;
                        }
                    }
                }
            }
        }

    源码可以看出:如果解码成功,将其加入到输出的List<Object> out 列表中,然后判断discardingTooLongFrame标识,看是否需要丢弃当前可读的字节缓冲区,如果为真,则执行丢弃操作,具体操作如下:

     

       private void failIfNecessary(boolean firstDetectionOfTooLongFrame) {
            if(this.bytesToDiscard == 0L) {
                long tooLongFrameLength = this.tooLongFrameLength;
                this.tooLongFrameLength = 0L;
                this.discardingTooLongFrame = false;
                if(!this.failFast || this.failFast && firstDetectionOfTooLongFrame) {
                    this.fail(tooLongFrameLength);
                }
            } else if(this.failFast && firstDetectionOfTooLongFrame) {
                this.fail(this.tooLongFrameLength);
            }
    
        }

    判断需要丢弃的字节长度,由于丢弃的字节数 不能大于当前缓冲区可读的字节数,通过Max.min函数进行选择,取byteToDiscard和缓冲区可读直接之中的最小值。计算获取需要要丢弃的字节数之后,调用ByteBufskipBytes方法跳过需要忽略的字节长度,然后bytesToDiscard减去已经忽略的字节长度。最后判断是否已经达到需要忽略的字节数,达到的话对discardingTooLongFrame等进行置位。

     

       对当前缓冲区的可读字节数和长度偏移量进行对比,如果小于长度偏移量,则说明当前缓冲区的数据报不够,需要返回空,由I/O线程继续读取后续的数据报。

       然后通过读索引和lengthFieldOffset计算获取实际的长度字段索引,然后通过索引值获取消息报文的长度字段。然后根据长度字段自身的字节长度进行判断,获取长度之后,就需要对长度仅需合法性判断,同时根据其他解码器参数进行长度调整。如果长度小于0,说明报文非法,跳过lengthFieldEndOffset个字节,抛出CorruptedFrameException异常。根据lengthFieldEndOffsetlengthAdjustmet字段进行长度修正,如果修正后的报文长度小于lengthFieldEndOffset,则说明是非法数据报,需要抛出CorruptedFrameException 异常。

       如果修正后报文长度大于ByteBuf的最大容量,说明接收到的消息长度大于系统允许的最大长度上限,需要设置discardingTooLongFrame,计算需要丢弃的字节数,根据情况选择是否需要抛出解码异常。

       如果当前的可读字节数小于frameLength,说明是个半包消息,需要返回空,由I/O线程继续读取后续的数据报,等待下次解码。

       对需要忽略的消息头字段进行判断,如果大于消息长度frameLength,说明码流非法,需要忽略当前的数据报,抛出CorruptedFrameException异常,通过ByteBufskipBytes方法忽略消息头中不需要的字段,得到整包ByteBuf。通过extractFrame方法获取解码后的整包消息缓冲区

       根据消息的实际长度分配一个新的ByteBuf对象,将需要解码的ByteBuf可写缓冲区复制到新创建的ByteBuf中并返回,返回之后更新原解码缓冲区ByteBuf为原读索引+消息报文的实际长度。

     

    2.4  MessageToByteEncoder源码分析

         MessageToByteEncoder 负责将用户的POJO对象编码成ByteBuf,以便通过网络进行传输。

     

    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            ByteBuf buf = null;
    
            try {
                if(this.acceptOutboundMessage(msg)) {
                    I cast = msg;
                    buf = this.allocateBuffer(ctx, msg, this.preferDirect);
    
                    try {
                        this.encode(ctx, cast, buf);
                    } finally {
                        ReferenceCountUtil.release(msg);
                    }
    
                    if(buf.isReadable()) {
                        ctx.write(buf, promise);
                    } else {
                        buf.release();
                        ctx.write(Unpooled.EMPTY_BUFFER, promise);
                    }
    
                    buf = null;
                } else {
                    ctx.write(msg, promise);
                }
            } catch (EncoderException var17) {
                throw var17;
            } catch (Throwable var18) {
                throw new EncoderException(var18);
            } finally {
                if(buf != null) {
                    buf.release();
                }
    
            }
    
        }

    首先判断当前编码器是否支持需要发送的消息,如果不支持则直接透传;如果支持则判断缓冲区的类型,对于直接内存分配ioBuffer(堆外内存),对于堆内存通过heapBuffer方法分配。编码使用的缓冲区分配完成之后,调用encode抽象方法进行编码,编码完成之后,调用ReferenceCountUtilrelease方法释放编码对象msg。对编码后的ByteBuf进行以下判断:

    1)如果缓冲区包含可发送的字节,则调用ChannelHandlerContextwrite方法发送ByteBuf

    2)如果缓冲区没有包含可写的字节,则需要释放编码后的ByteBuf,写入一个空的发送操作完成之后,在方法退出之前释放编码缓冲区ByteBuf对象。

     

    2.5  LengthFieldPrepender 源码分析

    LengthFieldPrepender 负责在待发送的ByteBuf消息头中增加一个长度字段来标识消息的长度,它简化了用户的编码开发,使用户不需要额外去设置这个长度字段。

     

    protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
            int length = msg.readableBytes() + this.lengthAdjustment;
            if(this.lengthIncludesLengthFieldLength) {
                length += this.lengthFieldLength;
            }
    
            if(length < 0) {
                throw new IllegalArgumentException("Adjusted frame length (" + length + ") is less than zero");
            } else {
                switch(this.lengthFieldLength) {
                case 1:
                    if(length >= 256) {
                        throw new IllegalArgumentException("length does not fit into a byte: " + length);
                    }
    
                    out.add(ctx.alloc().buffer(1).writeByte((byte)length));
                    break;
                case 2:
                    if(length >= 65536) {
                        throw new IllegalArgumentException("length does not fit into a short integer: " + length);
                    }
    
                    out.add(ctx.alloc().buffer(2).writeShort((short)length));
                    break;
                case 3:
                    if(length >= 16777216) {
                        throw new IllegalArgumentException("length does not fit into a medium integer: " + length);
                    }
    
                    out.add(ctx.alloc().buffer(3).writeMedium(length));
                    break;
                case 4:
                    out.add(ctx.alloc().buffer(4).writeInt(length));
                    break;
                case 5:
                case 6:
                case 7:
                default:
                    throw new Error("should not reach here");
                case 8:
                    out.add(ctx.alloc().buffer(8).writeLong((long)length));
                }
    
                out.add(msg.retain());
            }
        }

    源码分析如下:

    首先对长度字段进行设置,如果需要包含消息长度自身,则在原来长度的基础之上再加上lengthFieldLength的长度。

    如果调整后的消息长度小于0,则抛出参数非法异常。对消息长度自身所占的字节数进行判断,以便采用正确的方法将长度字段写入到ByteBuf中,共有6种可能:

    1)长度字段所占字节为1:如果使用1Byte字节代表消息长度,则最大长度需要小于256个字节。对长度进行校验,如果校验失败,则抛出参数非法异常;若校验通过,则创建新的ByteBuf并通过writeByte将长度值写入到ByteBuf中。

    2)长度字段所占字节为2,:如果使用2Byte字节代表消息长度,则最大长度需要小于65536个字段,对长度进行校验,如果检验失败,则抛出参数非法异常;若校验通过,则创建新的ByteBuf并通过writeShort将长度值写入到ByteBuf中。

    3)长度字段所占字节为3:如果使用3Byte字节代表消息长度,则最大长度需要小于16777216个字节,对长度进行校验,如果校验失败,则抛出参数非法异常;若校验通过,则创建新的ByteBuf并通过writeMedium将长度值写入到ByteBuf中。

    4)长度字段所占字节为4:创建新的ByteBuf,并通过writeInt将长度值写入到ByteBuf中。

    5)长度字段所占字节为8:创建新的ByteBuf,并通过writeLong将长度值写入到ByteBuf中。

    6)其他长度值:直接抛出Error

  • 相关阅读:
    Java I/O的典型使用方式
    搜索--hiho 骑士问题
    编程之美--水王(找出出现超过1/2的数)
    深入理解java虚拟机之类文件结构以及加载
    【转载】Java JVM 运行机制及基本原理
    整数的划分总结(转)
    java静态方法和非静态方法
    mongodb 运行错误总结
    MongoDb windows环境安装,附百度云链接
    JAVA解析Json数据
  • 原文地址:https://www.cnblogs.com/lovegrace/p/11184879.html
Copyright © 2011-2022 走看看