  • Netty Channel and autoread

    Netty 4 adds an autoread option to Channel. When it is enabled, the channel automatically calls read() once more after certain events have fired (for example channelActive and channelReadComplete). The code:

    DefaultChannelPipeline.java

        @Override
        public ChannelPipeline fireChannelActive() {
            head.fireChannelActive();
    
            if (channel.config().isAutoRead()) {
                channel.read();
            }
    
            return this;
        }
    
    -----------------------------------
    
        @Override
        public ChannelPipeline fireChannelReadComplete() {
            head.fireChannelReadComplete();
            if (channel.config().isAutoRead()) {
                read();
            }
            return this;
        }
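
    The re-arm pattern in the two methods above can be modelled without Netty. The following is only a toy sketch: ToyChannel and all of its members are hypothetical names invented for illustration, and read() merely counts requests instead of touching a socket. It shows that with autoread on, every event re-issues a read, while with autoread off the application has to call read() itself.

```java
// Toy model of the auto-read re-arm logic; this is NOT Netty code.
// ToyChannel and its methods are hypothetical names used only for illustration.
class ToyChannel {
    private boolean autoRead = true;
    private int readRequests = 0;

    void setAutoRead(boolean autoRead) {
        this.autoRead = autoRead;
    }

    // Stands in for Channel.read(): a request to deliver the next batch of inbound data.
    void read() {
        readRequests++;
    }

    // Same shape as fireChannelActive() above: after the event, re-arm if auto-read is on.
    void fireChannelActive() {
        if (autoRead) {
            read();
        }
    }

    // Same shape as fireChannelReadComplete() above.
    void fireChannelReadComplete() {
        if (autoRead) {
            read();
        }
    }

    int readRequests() {
        return readRequests;
    }

    public static void main(String[] args) {
        ToyChannel auto = new ToyChannel();
        auto.fireChannelActive();
        auto.fireChannelReadComplete();
        System.out.println("autoRead=true:  " + auto.readRequests());   // 2

        ToyChannel manual = new ToyChannel();
        manual.setAutoRead(false);
        manual.fireChannelActive();
        manual.fireChannelReadComplete();
        System.out.println("autoRead=false: " + manual.readRequests()); // 0

        manual.read(); // with auto-read off, the application requests each read itself
        System.out.println("after explicit read(): " + manual.readRequests()); // 1
    }
}
```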

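    Autoread has a second, equally important effect. When an event such as a socket close makes the key's OP_READ bit ready in NioEventLoop, autoread decides whether Netty actually reads from the channel. That read matters beyond fetching data: it is also how the channel's state gets updated, for example its active state. The code: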

    NioEventLoop.java

        private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
            final NioUnsafe unsafe = ch.unsafe();
            if (!k.isValid()) {
                // close the channel if the key is not valid anymore
                unsafe.close(unsafe.voidPromise());
                return;
            }
    
            try {
                int readyOps = k.readyOps();
                // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
                // to a spin loop
                if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                    unsafe.read();
                    if (!ch.isOpen()) {
                        // Connection already closed - no need to handle write.
                        return;
                    }
                }
                if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                    // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                    ch.unsafe().forceFlush();
                }
                if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                    // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                    // See https://github.com/netty/netty/issues/924
                    int ops = k.interestOps();
                    ops &= ~SelectionKey.OP_CONNECT;
                    k.interestOps(ops);
    
                    unsafe.finishConnect();
                }
            } catch (CancelledKeyException e) {
                unsafe.close(unsafe.voidPromise());
            }
        }
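
    The bit tests in processSelectedKey() can be tried in isolation with the JDK's SelectionKey constants. This is a minimal sketch: ReadyOpsDemo, shouldRead and clear are hypothetical helper names, not part of Netty. They reproduce the read condition and the way the OP_CONNECT bit is cleared from the interest set.

```java
import java.nio.channels.SelectionKey;

// Illustration of the bit tests used in processSelectedKey().
// ReadyOpsDemo and its methods are hypothetical helpers, not part of Netty.
class ReadyOpsDemo {
    // True when the event loop would call unsafe.read(): OP_READ or OP_ACCEPT is ready,
    // or readyOps is 0 (the spin-loop workaround mentioned in the source comment).
    static boolean shouldRead(int readyOps) {
        return (readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0;
    }

    // Clears a single operation bit from an interest set, as done for OP_CONNECT.
    static int clear(int interestOps, int op) {
        return interestOps & ~op;
    }

    public static void main(String[] args) {
        System.out.println(shouldRead(SelectionKey.OP_READ));   // true
        System.out.println(shouldRead(SelectionKey.OP_ACCEPT)); // true
        System.out.println(shouldRead(SelectionKey.OP_WRITE));  // false
        System.out.println(shouldRead(0));                      // true

        int ops = SelectionKey.OP_READ | SelectionKey.OP_CONNECT;
        int cleared = clear(ops, SelectionKey.OP_CONNECT);
        System.out.println(cleared == SelectionKey.OP_READ);    // true
    }
}
```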

    AbstractNioByteChannel.java (unsafe.read())

        public void read() {
            final ChannelConfig config = config();
            // Check autoread: if it is off and no read is pending,
            // clear the OP_READ interest bit and return immediately.
            if (!config.isAutoRead() && !isReadPending()) {
                // ChannelConfig.setAutoRead(false) was called in the meantime
                removeReadOp();
                return;
            }

            final ChannelPipeline pipeline = pipeline();
            final ByteBufAllocator allocator = config.getAllocator();
            final int maxMessagesPerRead = config.getMaxMessagesPerRead();
            RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
            if (allocHandle == null) {
                this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
            }

            ByteBuf byteBuf = null;
            int messages = 0;
            boolean close = false;
            try {
                int totalReadAmount = 0;
                boolean readPendingReset = false;
                do {
                    byteBuf = allocHandle.allocate(allocator);
                    int writable = byteBuf.writableBytes();
                    // This reads from the channel once; a return value of -1 means the channel was closed.
                    // If autoread is false (and no read is pending), execution never gets here when the
                    // socket closes, so the channel's active state is never updated.
                    int localReadAmount = doReadBytes(byteBuf);
                    if (localReadAmount <= 0) {
                        // nothing was read, release the buffer
                        byteBuf.release();
                        close = localReadAmount < 0;
                        break;
                    }
                    if (!readPendingReset) {
                        readPendingReset = true;
                        setReadPending(false);
                    }
                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;

                    if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
                        // Avoid overflow.
                        totalReadAmount = Integer.MAX_VALUE;
                        break;
                    }

                    totalReadAmount += localReadAmount;

                    // stop reading
                    if (!config.isAutoRead()) {
                        break;
                    }

                    if (localReadAmount < writable) {
                        // Read less than what the buffer can hold,
                        // which might mean we drained the recv buffer completely.
                        break;
                    }
                } while (++ messages < maxMessagesPerRead);

                pipeline.fireChannelReadComplete();
                allocHandle.record(totalReadAmount);

                // Close the channel, which also changes its active state.
                if (close) {
                    closeOnRead(pipeline);
                    close = false;
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close);
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com/netty/netty/issues/2254
                if (!config.isAutoRead() && !isReadPending()) {
                    removeReadOp();
                }
            }
        }
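
    The point that a close only becomes visible through a read can be reproduced with plain JDK NIO, with no Netty involved. The sketch below assumes a loopback connection in blocking mode; EofDemo and readAfterPeerClose are hypothetical names. After the peer closes, the accepted channel learns about it only when read() returns -1, which is the same signal doReadBytes() turns into close = true above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Plain JDK NIO, not Netty. EofDemo and readAfterPeerClose are hypothetical names.
class EofDemo {
    // Opens a loopback connection, closes the client side, and returns
    // what a read on the accepted (server-side) channel reports.
    static int readAfterPeerClose() {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            // Port 0 lets the OS pick a free port.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel accepted = server.accept()) {
                // The peer closes. accepted.isOpen() still reports true at this point,
                // because it only reflects whether this side has closed the channel.
                client.close();
                // Blocking read: returns -1 once the peer's end-of-stream is seen.
                return accepted.read(ByteBuffer.allocate(16));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("read() after peer close returned " + readAfterPeerClose()); // -1
    }
}
```

    If nothing ever issues that read, as happens when autoread is off and the application does not call read() itself, the end-of-stream goes unnoticed until the next read is requested.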
  • Original article: https://www.cnblogs.com/zemliu/p/4072646.html