zoukankan      html  css  js  c++  java
  • Netty关闭连接流程分析

    在实际场景中,使用Netty4来实现RPC框架,服务端一般会验证协议,最简单的方法的协议探测,判断魔数是否正确。如果服务端无法识别协议会立即抛出异常,并主动关闭连接,此时客户端会收到read信号,在发现是一个关闭连接请求后会关闭本地连接,这其中用户可控的是InboundHandle收到 channelInactive方法。
    今天想搞清楚的是:
    假设consumer端 的InboundHandle收到 channelInactive 事件,是否可以立即结束这次请求并返回请求失败,而不必等到timeout才结束。
    单纯的这个协议探测失败场景中,答案是可以立即结束,因为服务端不会响应任何数据。其他场景呢,在第1个InboundHandle中可以直接响应操作失败吗?

    服务端主动关闭连接时,Netty的处理流程
    在netty4中channel都被绑定到特定I/O EventLoop线程中,I/O线程收到信号为SelectionKey.OP_READ的就绪信号,如果allocHandle.lastBytesRead() = -1,则表示这是连接关闭的请求。
    单独卡read的处理逻辑在AbstractNioByteChannel.read()中

            public final void read() {
                final ChannelConfig config = config();
                if (shouldBreakReadReady(config)) {
                    clearReadPending();
                    return;
                }
                final ChannelPipeline pipeline = pipeline();
                final ByteBufAllocator allocator = config.getAllocator();
                final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
                allocHandle.reset(config);
    
                ByteBuf byteBuf = null;
                boolean close = false;
                try {
                    do {
                        byteBuf = allocHandle.allocate(allocator);
                        allocHandle.lastBytesRead(doReadBytes(byteBuf));
                        if (allocHandle.lastBytesRead() <= 0) {
                            // nothing was read. release the buffer.
                            byteBuf.release();
                            byteBuf = null;
                            close = allocHandle.lastBytesRead() < 0;  // -1表示关闭连接
                            if (close) {
                                // There is nothing left to read as we received an EOF.
                                readPending = false;
                            }
                            break;
                        }
    
                        allocHandle.incMessagesRead(1);
                        readPending = false;
                        pipeline.fireChannelRead(byteBuf);
                        byteBuf = null;
                    } while (allocHandle.continueReading());
    
                    allocHandle.readComplete();
                    pipeline.fireChannelReadComplete();
    
                    if (close) {
                        closeOnRead(pipeline);  //执行关连接逻辑
                    }
                } catch (Throwable t) {
                    handleReadException(pipeline, byteBuf, t, close, allocHandle);
                } finally {
                    // Check if there is a readPending which was not processed yet.
                    // This could be for two reasons:
                    // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                    // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                    //
                    // See https://github.com/netty/netty/issues/2254
                    if (!readPending && !config.isAutoRead()) {
                        removeReadOp();
                    }
                }
            }
        }
    

    close = allocHandle.lastBytesRead() < 0;
    根据最后可读的bytes为-1,即为关闭

    接下来是关闭连接的流程
    检查此连接之前是否为active状态,并且当前是否为active,进入fireChannelInactiveAndDeregister流程,注销channel
    这里不会立即注销channel,而是以一个任务的形式放到eventloop中稍后执行,文档中解释了不立即执行的原因:

            private void invokeLater(Runnable task) {
                try {
                    // This method is used by outbound operation implementations to trigger an inbound event later.
                    // They do not trigger an inbound event immediately because an outbound operation might have been
                    // triggered by another inbound event handler method.  If fired immediately, the call stack
                    // will look like this for example:
                    //
                    //   handlerA.inboundBufferUpdated() - (1) an inbound handler method closes a connection.
                    //   -> handlerA.ctx.close()
                    //      -> channel.unsafe.close()
                    //         -> handlerA.channelInactive() - (2) another inbound handler method called while in (1) yet
                    //
                    // which means the execution of two inbound handler methods of the same handler overlap undesirably.
                    eventLoop().execute(task);
                } catch (RejectedExecutionException e) {
                    logger.warn("Can't invoke task later as EventLoop rejected it", e);
                }
            }
    

    大致的意思是 这个方法被用于出港操作来延迟触发一个到达的事件, 主要针对的场景是InboundHandler中产生了出港操作,比如主动断开连接,可能出现多个InboundHandler都会触发出港操作,这些操作可能是相同的如果立即执行,可能会重叠执行。
    如果客户端从未收到任何数据,直接被服务端主动关闭是没有这个顾虑的,因为还没有InboundHandler处理数据。

                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            doDeregister();
                        } catch (Throwable t) {
                            logger.warn("Unexpected exception occurred while deregistering a channel.", t);
                        } finally {
                            if (fireChannelInactive) {
                                pipeline.fireChannelInactive();
                            }
                            // Some transports like local and AIO does not allow the deregistration of
                            // an open channel.  Their doDeregister() calls close(). Consequently,
                            // close() calls deregister() again - no need to fire channelUnregistered, so check
                            // if it was registered.
                            if (registered) {
                                registered = false;
                                pipeline.fireChannelUnregistered();
                            }
                            safeSetSuccess(promise);
                        }
                    }
                });
    

    在从eventloop移除当前channel后(doDeregister()方法),进入finally代码块,如果之前连接是可用的,则fireChannelInactive为true
    接下来进入pipeline.fireChannelInactive();流程

    DefaultChannelPipreline 中有一个 AbstractChannelHandlerContext 链表,他按用户配置的顺序被写入Pipiline中,表头是Netty自定义的HeadContext,先执行头部的AbstractChannelHandlerContext

        @Override
        public final ChannelPipeline fireChannelInactive() {
            AbstractChannelHandlerContext.invokeChannelInactive(head);
            return this;
        }
    

    AbstractChannelHandlerContext持有 ChannelHandler对象,此处实际为 ChannelInboundHandler

        private void invokeChannelInactive() {
            if (invokeHandler()) {
                try {
                    ((ChannelInboundHandler) handler()).channelInactive(this);
                } catch (Throwable t) {
                    notifyHandlerException(t);
                }
            } else {
                fireChannelInactive();
            }
        }
    

    这里会先进入netty预置的DefaultChannelPipeline$HeadContext实现中,然后走到了我们配置的第一个InboundHandle中来
    默认的channelInactive实现在ByteToMessageDecoder中

        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            channelInputClosed(ctx, true);
        }
    

    Netty对关闭连接容忍度非常高,关闭连接前如果连接仍然有可读的数据,会尝试把他读出来

        private void channelInputClosed(ChannelHandlerContext ctx, boolean callChannelInactive) throws Exception {
            CodecOutputList out = CodecOutputList.newInstance();
            try {
                channelInputClosed(ctx, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Exception e) {
                throw new DecoderException(e);
            } finally {
                try {
                    if (cumulation != null) {
                        cumulation.release();
                        cumulation = null;
                    }
                    int size = out.size();
                    fireChannelRead(ctx, out, size);
                    if (size > 0) {
                        // Something was read, call fireChannelReadComplete()
                        ctx.fireChannelReadComplete();
                    }
                    if (callChannelInactive) {
                        ctx.fireChannelInactive();
                    }
                } finally {
                    // Recycle in all cases
                    out.recycle();
                }
            }
        }
    	
        /**
         * Called when the input of the channel was closed which may be because it changed to inactive or because of
         * {@link ChannelInputShutdownEvent}.
         */
        void channelInputClosed(ChannelHandlerContext ctx, List<Object> out) throws Exception {
            if (cumulation != null) {
                callDecode(ctx, cumulation, out);
                decodeLast(ctx, cumulation, out);
            } else {
                decodeLast(ctx, Unpooled.EMPTY_BUFFER, out);
            }
        }	
        /**
         * Get {@code numElements} out of the {@link CodecOutputList} and forward these through the pipeline.
         */
        static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
            for (int i = 0; i < numElements; i ++) {
                ctx.fireChannelRead(msgs.getUnsafe(i));
            }
        }	
    

    callDecode会尝试去decode已经在channel还为取完的数据,如果取到了,则外部方法中的size就会大于0,会走完fireChannelRead的流程

        /**
         * A {@link Channel} received a message.
         *
         * This will result in having the {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)}
         * method  called of the next {@link ChannelInboundHandler} contained in the  {@link ChannelPipeline} of the
         * {@link Channel}.
         */
        ChannelInboundInvoker fireChannelRead(Object msg);
    

    fireChannelRead方法会调用ChannelPipeline中的下一个ChannelInboundHandler执行channelRead,我们常用的MessageToMessageDecoder执行channelRead时,会调用decode方法,也就是我们实现来处理请求的方法。
    然后设置channelReadComplete()
    最后调用ctx.fireChannelInactive();将事件传给下一个handler。

    Netty4中I/O EventLoop是单线程执行的,对一个channel来说是线程安全的,而channel上的数据必然是先于close信号到达的,那么,当我们收到close新号时,之前已经发送过来的数据,一定已经至少被尝试处理过了吗?如果是这样,还有必要执行一下callDecode逻辑来fireChannelRead吗?

    回到最初的代码块中
    AbstractNioByteChannel.read()中

            public final void read() {
                final ChannelConfig config = config();
                if (shouldBreakReadReady(config)) {
                    clearReadPending();
                    return;
                }
                final ChannelPipeline pipeline = pipeline();
                final ByteBufAllocator allocator = config.getAllocator();
                final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
                allocHandle.reset(config);
    
                ByteBuf byteBuf = null;
                boolean close = false;
                try {
                    do {
                        byteBuf = allocHandle.allocate(allocator);
                        allocHandle.lastBytesRead(doReadBytes(byteBuf));
                        if (allocHandle.lastBytesRead() <= 0) {
                            // nothing was read. release the buffer.
                            byteBuf.release();
                            byteBuf = null;
                            close = allocHandle.lastBytesRead() < 0;
                            if (close) {
                                // There is nothing left to read as we received an EOF.
                                readPending = false;
                            }
                            break;
                        }
    
                        allocHandle.incMessagesRead(1);
                        readPending = false;
                        pipeline.fireChannelRead(byteBuf); //如果是正常的read,立即 fireChannelRead
                        byteBuf = null;
                    } while (allocHandle.continueReading());
    
                    allocHandle.readComplete();
                    pipeline.fireChannelReadComplete();
    
                    if (close) {
                        closeOnRead(pipeline);
                    }
                } catch (Throwable t) {
                    handleReadException(pipeline, byteBuf, t, close, allocHandle);
                } finally {
                    // Check if there is a readPending which was not processed yet.
                    // This could be for two reasons:
                    // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                    // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                    //
                    // See https://github.com/netty/netty/issues/2254
                    if (!readPending && !config.isAutoRead()) {
                        removeReadOp();
                    }
                }
            }
        }
    

    可以发现,如果是普通的读操作,会立即触发fireChannelRead,经过前面的分析,可以知道该方法将会invokeChannelRead,并且当前的executor是在eventLoop中的,那么channelRead会被立即执行,最终触发我们配置的InboundHandle
    而close新号必然晚于正常的数据流,因此数据一定是先被InboundHandle处理后才接受到close信号的。

    结论:
    单从服务端主动断开连接的场景,如果收到close新号,则在第一个inboundHandle的 channelInactive 方法中,直接通知业务任务已失败是安全的,因为数据流如果返回完全,则必然被处理过。
    如果客户端inbountHandle过程中主动close掉channel,也是安全的,因为他会稍后执行,并且尝试将最新的Inbound数据再次decode,如果有可处理的数据就走完所有InboundHandle(channel已经关闭,写响应会失败),没有则结束

  • 相关阅读:
    Lodash JS实用类库 数组操作 延时执行 功能强大
    7.【nuxt起步】-Nuxt与后端数据交互
    vue图片懒加载
    猎鹰与龙飞船基于 Linux,采用 C++、Chromium 与 JS 开发
    | 和 ||,& 和 && 的区别
    Linux安装.net core
    Linux下程序后台运行:nohup和&
    vuejs如何调试代码
    全局安装 Vue cli3 和 继续使用 Vue-cli2.x
    导入sass文件
  • 原文地址:https://www.cnblogs.com/windliu/p/13163854.html
Copyright © 2011-2022 走看看