zoukankan      html  css  js  c++  java
  • netty之handler read

    有时候会有一系列的处理in的handler,使用fireChannelRead处理传递

    转载自https://blog.csdn.net/u011702633/article/details/82051329

    Netty源码解析(八) —— channel的read操作

    客户端channel在建立连接之后会关注read事件,那么read事件在哪触发的呢? 
    NioEventLoop中

                /**
                 * 读事件和 accept事件都会经过这里,但是拿到的unsafe对象不同  所以后续执行的read操作也不一样
                 * NioServerChannel进行accept操作
                 * NioChannel进行read操作
                 */
                if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                    unsafe.read();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read

            @Override
            public final void read() {
                final ChannelConfig config = config();
                //若 inputClosedSeenErrorOnRead = true ,移除对 SelectionKey.OP_READ 事件的感兴趣。
                if (shouldBreakReadReady(config)) {
                    //清楚读事件
                    clearReadPending();
                    return;
                }
    
                final ChannelPipeline pipeline = pipeline();
                final ByteBufAllocator allocator = config.getAllocator();
                //获取并重置allocHandle对象
                final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
                allocHandle.reset(config);
    
                ByteBuf byteBuf = null;
                boolean close = false;//是否关闭
                try {
                    do {
                        //申请bytebuf
                        byteBuf = allocHandle.allocate(allocator);
                        //读取数据,设置最后读取字节数
                        allocHandle.lastBytesRead(doReadBytes(byteBuf));
                        //没读到数据
                        if (allocHandle.lastBytesRead() <= 0) {
                            // 梅毒到数据  释放buf
                            byteBuf.release();
                            byteBuf = null;
                            //如果最后读取的字节为小于 0 ,说明对端已经关闭
                            close = allocHandle.lastBytesRead() < 0;
                            if (close) {
                                // There is nothing left to read as we received an EOF.
                                readPending = false;
                            }
                            break;
                        }
    
                        //读到了数据
                        allocHandle.incMessagesRead(1);
                        readPending = false;
                        //通知pipline read事件
                        pipeline.fireChannelRead(byteBuf);
                        byteBuf = null;
                    } while (allocHandle.continueReading());//循环判断是否继续读取
    
                    //读取完成
                    allocHandle.readComplete();
                    //通知pipline读取完成
                    pipeline.fireChannelReadComplete();
    
                    if (close) {//关闭连接
                        closeOnRead(pipeline);
                    }
                } catch (Throwable t) {
                    handleReadException(pipeline, byteBuf, t, close, allocHandle);
                } finally {
                    .....
                    if (!readPending && !config.isAutoRead()) {
                        removeReadOp();
                    }
                }
            }
        }
    
    1. 若 inputClosedSeenErrorOnRead = true ,移除对 SelectionKey.OP_READ 事件的感兴趣。
    2. 获取并重置allocHandle对象(代理alloctor的一些功能)
    3. 读取数据,设置最后读取字节数
    4. 通知pipline read事件
    5. 通知pipline读取完成 
      这次先跟着主线走,然后再回头看细节,直接定位到事件通知 
      io.netty.channel.DefaultChannelPipeline#fireChannelRead
        /**
         * 有数据读入的时候会调用(InBound)  也可以手动调用
         * @param msg 客户端连接的channel
         * @return
         */
        @Override
        public final ChannelPipeline fireChannelRead(Object msg) {
            //pipline节点类的静态方法 穿进去的head
            AbstractChannelHandlerContext.invokeChannelRead(head, msg);
            return this;
        }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    最后调用到io.netty.channel.DefaultChannelPipeline.HeadContext#channelRead

            /**
             * 被调用read  可以向下传递
             * @param ctx
             * @param msg
             * @throws Exception
             */
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                ctx.fireChannelRead(msg);
            }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    ctx.fireChannelRead(msg);向下个节点传递,如果自定义了handler来处理就可以拦截channelRead的bytebuf数据来进行处理,负责一直向下传递到TailContext节点处理 
    io.netty.channel.DefaultChannelPipeline.TailContext#channelRead

            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    
                //如果不去自定义handler处理byteBuf  最终会到TailContext 来处理
                onUnhandledInboundMessage(msg);
            }
        protected void onUnhandledInboundMessage(Object msg) {
            try {
                //日志提醒没有处理
                logger.debug(
                        "Discarded inbound message {} that reached at the tail of the pipeline. " +
                                "Please check your pipeline configuration.", msg);
            } finally {
                //释放byteBuf内存
                ReferenceCountUtil.release(msg);
            }
        }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    数据到达TailContext节点之后,再onUnhandledInboundMessage方法中打印数据未处理日志,然后释放bytebuf内存 
    io.netty.channel.nio.AbstractNioByteChannel#doReadBytes读取操作的方法

        @Override
        /**
         * 读取数据
         */
        protected int doReadBytes(ByteBuf byteBuf) throws Exception {
            //获取handle对象
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            //设置读索引=写索引
            allocHandle.attemptedBytesRead(byteBuf.writableBytes());
            //读取无数据到buf
            return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
        }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    1. allocHandle.lastBytesRead()小于0证明没有读取到数据,要释放bytebuf
    2. 读取到数据,allocHandle.incMessagesRead(1);
    3. allocHandle.continueReading() 循环判断是否继续读取

            @Override
            public boolean continueReading() {
                return continueReading(defaultMaybeMoreSupplier);
            }
    
            @Override
            public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
                // Keep reading if we are allowed to read more bytes, and our last read filled up the buffer we provided.
                //最后读取的字节数大于0  并且等于最大可写入的字节数
                return bytesToRead > 0 && maybeMoreDataSupplier.get();
            }
            private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() {
                @Override
                public boolean get() {
                    //最后读取的字节数 是否等于最大可写入字节数
                    return attemptedBytesRead == lastBytesRead;
                }
            };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    最后读取的字节数大于0,并且最后读取的数据==尝试可写入的大小,即证明可以继续读取 
    出现异常时候调用io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#handleReadException

            private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
                    RecvByteBufAllocator.Handle allocHandle) {
                if (byteBuf != null) {
                    if (byteBuf.isReadable()) {
                        readPending = false;
                        //把已经读取到的数据  通知到pipline中
                        pipeline.fireChannelRead(byteBuf);
                    } else {
                        //释放bytebuf
                        byteBuf.release();
                    }
                }
                //读取完成
                allocHandle.readComplete();
                //通知pipline读取完成
                pipeline.fireChannelReadComplete();
                //通知pipline异常
                pipeline.fireExceptionCaught(cause);
                if (close || cause instanceof IOException) {
                    closeOnRead(pipeline);
                }
            }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    io.netty.channel.DefaultChannelPipeline#fireExceptionCaught

        @Override
        public final ChannelPipeline fireExceptionCaught(Throwable cause) {
            AbstractChannelHandlerContext.invokeExceptionCaught(head, cause);
            return this;
        }

    最终调用到TailContext的io.netty.channel.DefaultChannelPipeline.TailContext#exceptionCaught

            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                onUnhandledInboundException(cause);
            }
        protected void onUnhandledInboundException(Throwable cause) {
            try {
                logger.warn(
                        "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
                                "It usually means the last handler in the pipeline did not handle the exception.",
                        cause);
            } finally {
                ReferenceCountUtil.release(cause);
            }
        }

    和读取操作一样,最终也是要再发生异常的时候释放buf的内存

  • 相关阅读:
    Java 引用类型
    Mysql-5.7.14使用常见问题汇总
    CountBoard 是一个基于Tkinter简单的,开源的桌面日程倒计时应用
    HashMap的源码分析
    redis-cluster源码分析
    redis集群方案
    redis集群命令
    redis集群删除master节点
    redis集群添加master节点
    redis集群部署
  • 原文地址:https://www.cnblogs.com/heroinss/p/10243185.html
Copyright © 2011-2022 走看看