  • Netty source code analysis: how NioEventLoop accepts client connections

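    The previous section covered channel registration. This section walks through how a new connection is registered, assuming an EventLoop that monitors the NioServerSocketChannel is already running.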

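    The entry point for this section is therefore the processSelectedKeys() call in the code below (highlighted in red in the original post).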

        @Override
        protected void run() {
            for (;;) {
                try {
                    switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                        case SelectStrategy.CONTINUE:
                            continue;
                        case SelectStrategy.SELECT:
                            select(wakenUp.getAndSet(false));

                            if (wakenUp.get()) {
                                selector.wakeup();
                            }
                        default:
                            // fallthrough
                    }

                    cancelledKeys = 0;
                    needsToSelectAgain = false;
                    final int ioRatio = this.ioRatio;
                    if (ioRatio == 100) {
                        try {
                            processSelectedKeys();
                        } finally {
                            // Ensure we always run tasks.
                            runAllTasks();
                        }
                    } else {
                        final long ioStartTime = System.nanoTime();
                        try {
                            processSelectedKeys();
                        } finally {
                            // Ensure we always run tasks.
                            final long ioTime = System.nanoTime() - ioStartTime;
                            runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                        }
                    }
                } catch (Throwable t) {
                    handleLoopException(t);
                }
                // Always handle shutdown even if the loop processing threw an exception.
                try {
                    if (isShuttingDown()) {
                        closeAll();
                        if (confirmShutdown()) {
                            return;
                        }
                    }
                } catch (Throwable t) {
                    handleLoopException(t);
                }
            }
        }
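
    The run() loop splits each iteration between I/O (processSelectedKeys) and queued tasks (runAllTasks) using ioRatio. The expression ioTime * (100 - ioRatio) / ioRatio is easier to read with numbers plugged in. The snippet below is only worked arithmetic under an assumed I/O time of 8 ms; the class and method names are made up for illustration and are not part of Netty.

```java
// Worked arithmetic only; not Netty code. Class and method names are hypothetical.
class IoRatioBudget {
    // Mirrors the expression ioTime * (100 - ioRatio) / ioRatio used in run().
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long ioTime = 8_000_000L; // assumption: processSelectedKeys() took 8 ms
        System.out.println(taskBudgetNanos(ioTime, 50)); // 8000000  (equal time for tasks)
        System.out.println(taskBudgetNanos(ioTime, 80)); // 2000000  (I/O gets most of the time)
        System.out.println(taskBudgetNanos(ioTime, 20)); // 32000000 (tasks get most of the time)
    }
}
```

    With ioRatio at 100 the time budget is skipped entirely and runAllTasks() runs without a limit, which is the first branch in the code above.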

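    When debugging, execution enters processSelectedKeysOptimized. At this point a client has to connect a socket to the server; a network debugging tool can be used for that.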

        private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
            for (int i = 0;; i ++) {
                final SelectionKey k = selectedKeys[i];
                if (k == null) {   // a null entry means there are no more ready keys (e.g. no new connection); exit the loop
                    break;
                }
                // null out entry in the array to allow to have it GC'ed once the Channel close
                // See https://github.com/netty/netty/issues/2363
                selectedKeys[i] = null;
    
                final Object a = k.attachment();
    
                if (a instanceof AbstractNioChannel) {
                    processSelectedKey(k, (AbstractNioChannel) a);   // a new connection ends up here
                } else {
                    @SuppressWarnings("unchecked")
                    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                    processSelectedKey(k, task);
                }
    
                if (needsToSelectAgain) {
                    // null out entries in the array to allow to have it GC'ed once the Channel close
                    // See https://github.com/netty/netty/issues/2363
                    for (;;) {
                        i++;
                        if (selectedKeys[i] == null) {
                            break;
                        }
                        selectedKeys[i] = null;
                    }
    
                    selectAgain();
                    // Need to flip the optimized selectedKeys to get the right reference to the array
                    // and reset the index to -1 which will then set to 0 on the for loop
                    // to start over again.
                    //
                    // See https://github.com/netty/netty/issues/1523
                    selectedKeys = this.selectedKeys.flip();
                    i = -1;
                }
            }
        }

    When a connection arrives, execution enters processSelectedKey, which handles the new connection.

        private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
            final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
            if (!k.isValid()) {
                final EventLoop eventLoop;
                try {
                    eventLoop = ch.eventLoop();
                } catch (Throwable ignored) {
                    // If the channel implementation throws an exception because there is no event loop, we ignore this
                    // because we are only trying to determine if ch is registered to this event loop and thus has authority
                    // to close ch.
                    return;
                }
                // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
                // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
                // still healthy and should not be closed.
                // See https://github.com/netty/netty/issues/5125
                if (eventLoop != this || eventLoop == null) {
                    return;
                }
                // close the channel if the key is not valid anymore
                unsafe.close(unsafe.voidPromise());
                return;
            }
    
            try {
                int readyOps = k.readyOps();  // should be 16 (SelectionKey.OP_ACCEPT) here: a connection is ready to be accepted
                // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
                // the NIO JDK channel implementation may throw a NotYetConnectedException.
                if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                    // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                    // See https://github.com/netty/netty/issues/924
                    int ops = k.interestOps();
                    ops &= ~SelectionKey.OP_CONNECT;
                    k.interestOps(ops);
    
                    unsafe.finishConnect();
                }
    
                // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
                if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                    // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                    ch.unsafe().forceFlush();
                }
    
                // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
                // to a spin loop
                if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                    unsafe.read();  // step into this method to handle the event
                    if (!ch.isOpen()) {
                        // Connection already closed - no need to handle write.
                        return;
                    }
                }
            } catch (CancelledKeyException ignored) {
                unsafe.close(unsafe.voidPromise());
            }
        }
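
    The value 16 mentioned in the comment is the JDK constant SelectionKey.OP_ACCEPT, and processSelectedKey tests the readiness bits with bitwise AND. The following sketch prints the four JDK constants and repeats the same read/accept test in isolation. ReadyOpsDemo and triggersRead are hypothetical names used only for this illustration.

```java
import java.nio.channels.SelectionKey;

// Illustration of the bit tests in processSelectedKey; names are hypothetical, not Netty's.
class ReadyOpsDemo {
    // Same condition as in processSelectedKey: read or accept readiness,
    // plus the readyOps == 0 workaround.
    static boolean triggersRead(int readyOps) {
        return (readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0;
    }

    public static void main(String[] args) {
        System.out.println(SelectionKey.OP_READ);    // 1
        System.out.println(SelectionKey.OP_WRITE);   // 4
        System.out.println(SelectionKey.OP_CONNECT); // 8
        System.out.println(SelectionKey.OP_ACCEPT);  // 16
        System.out.println(triggersRead(SelectionKey.OP_ACCEPT)); // true
        System.out.println(triggersRead(SelectionKey.OP_WRITE));  // false
        System.out.println(triggersRead(0));                      // true
    }
}
```

    Because OP_READ and OP_ACCEPT share one branch, the same unsafe.read() call serves both a server channel accepting connections and a client channel reading bytes; which implementation runs depends on the channel type.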

    Next, step into unsafe.read(). For a NioServerSocketChannel this is the NioMessageUnsafe implementation:

            public void read() {
                assert eventLoop().inEventLoop();
                final ChannelConfig config = config();
                final ChannelPipeline pipeline = pipeline();
                final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
                allocHandle.reset(config);
    
                boolean closed = false;
                Throwable exception = null;
                try {
                    try {
                        do {
                            int localRead = doReadMessages(readBuf);  // the new connection is accepted here
                            if (localRead == 0) {
                                break;
                            }
                            if (localRead < 0) {
                                closed = true;
                                break;
                            }
    
                            allocHandle.incMessagesRead(localRead);
                        } while (allocHandle.continueReading());
                    } catch (Throwable t) {
                        exception = t;
                    }
    
                    int size = readBuf.size();
                    for (int i = 0; i < size; i ++) {
                        readPending = false;
                        pipeline.fireChannelRead(readBuf.get(i));  // the accepted connection is handled here
                    }
                    readBuf.clear();
                    allocHandle.readComplete();
                    pipeline.fireChannelReadComplete();
    
                    if (exception != null) {
                        closed = closeOnReadError(exception);
    
                        pipeline.fireExceptionCaught(exception);
                    }
    
                    if (closed) {
                        inputShutdown = true;
                        if (isOpen()) {
                            close(voidPromise());
                        }
                    }
                } finally {
                    // Check if there is a readPending which was not processed yet.
                    // This could be for two reasons:
                    // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                    // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                    //
                    // See https://github.com/netty/netty/issues/2254
                    if (!readPending && !config.isAutoRead()) {
                        removeReadOp();
                    }
                }
            }

    Next, step into doReadMessages:

        @Override
        protected int doReadMessages(List<Object> buf) throws Exception {
            SocketChannel ch = javaChannel().accept();   // obtains the newly accepted connection
    
            try {
                if (ch != null) {
                    buf.add(new NioSocketChannel(this, ch)); // wraps it in a NioSocketChannel and adds it to the list
                    return 1;
                }
            } catch (Throwable t) {
                logger.warn("Failed to create a new channel from an accepted socket.", t);
    
                try {
                    ch.close();
                } catch (Throwable t2) {
                    logger.warn("Failed to close a socket.", t2);
                }
            }
    
            return 0;
        }
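
    The accept() call at the top of doReadMessages is ordinary JDK NIO. As a minimal sketch of the underlying behavior, the program below uses only the JDK: it registers a non-blocking ServerSocketChannel for OP_ACCEPT, shows that accept() returns null while nothing is pending, then connects a loopback client and accepts it once the selector reports the key. AcceptDemo and acceptedReadyOps are hypothetical names, and the 5-second select timeout is an arbitrary choice for the demo.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Plain JDK NIO sketch; the class and method names are hypothetical, not Netty's.
class AcceptDemo {
    // Connects one loopback client and returns the readyOps the selector reported for the server key.
    static int acceptedReadyOps() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            // Port 0 lets the OS pick any free port.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);

            // No client yet: a non-blocking accept() returns null (doReadMessages would return 0).
            if (server.accept() != null) {
                throw new IllegalStateException("unexpected pending connection");
            }

            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                // Wait up to 5 seconds for the selector to report the pending connection.
                if (selector.select(5000) == 0) {
                    throw new IllegalStateException("selector did not report the connection");
                }
                SelectionKey key = selector.selectedKeys().iterator().next();
                try (SocketChannel accepted = server.accept()) {
                    if (accepted == null) {
                        throw new IllegalStateException("accept() returned null after OP_ACCEPT");
                    }
                    return key.readyOps();
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("readyOps = " + acceptedReadyOps());
    }
}
```

    Since the server key is only interested in OP_ACCEPT, the reported readyOps should be 16, the value noted in processSelectedKey. Netty adds one step on top of this: it wraps the accepted SocketChannel in a NioSocketChannel before passing it down the pipeline.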

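    Next, pipeline.fireChannelRead(readBuf.get(i)) is executed. As mentioned in the previous section, the current ChannelPipeline contains an inbound handler, ServerBootstrapAcceptor, that handles connections. The new connection is processed in this handler by the code below.

    How the call gets from fireChannelRead to this code, that is, the call chain inside the ChannelPipeline, will be covered in detail later.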

            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                final Channel child = (Channel) msg;
    
                // Adds the user-defined ChannelHandler to the child's ChannelPipeline; it handles the
                // business data being sent or received.
                child.pipeline().addLast(childHandler);
    
                for (Entry<ChannelOption<?>, Object> e: childOptions) {
                    try {
                        if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                            logger.warn("Unknown channel option: " + e);
                        }
                    } catch (Throwable t) {
                        logger.warn("Failed to set a channel option: " + child, t);
                    }
                }
    
                for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                    child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
                }
    
                try {
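                    // Registers the newly accepted connection with one EventLoop thread of the worker
                    // event loop group. The steps are the same as registering NioServerSocketChannel
                    // with an EventLoop, so they are not repeated here.
                    childGroup.register(child).addListener(new ChannelFutureListener() {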
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (!future.isSuccess()) {
                                forceClose(child, future.cause());
                            }
                        }
                    });
                } catch (Throwable t) {
                    forceClose(child, t);
                }
            }
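
    The call childGroup.register(child) hands the accepted channel to one EventLoop of the worker group. With Netty's default chooser that selection is round-robin. The sketch below is a simplified model of that handoff, not Netty code: WorkerGroupSketch and its register method are hypothetical, channels are represented by plain strings, and each worker is just a list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model, not Netty code: all names here are hypothetical.
class WorkerGroupSketch {
    // One list of assigned channels per worker loop.
    private final List<List<String>> workers = new ArrayList<>();
    private final AtomicInteger next = new AtomicInteger();

    WorkerGroupSketch(int nWorkers) {
        for (int i = 0; i < nWorkers; i++) {
            workers.add(new ArrayList<>());
        }
    }

    // Stand-in for childGroup.register(child): pick the next worker in turn and bind the channel to it.
    int register(String childChannel) {
        int index = Math.floorMod(next.getAndIncrement(), workers.size());
        workers.get(index).add(childChannel);
        return index;
    }

    public static void main(String[] args) {
        WorkerGroupSketch group = new WorkerGroupSketch(3);
        for (int i = 0; i < 5; i++) {
            // Prints worker indices 0, 1, 2, 0, 1 for conn-0 .. conn-4.
            System.out.println("conn-" + i + " -> worker " + group.register("conn-" + i));
        }
    }
}
```

    In the real code the registration is asynchronous, which is why the acceptor attaches a ChannelFutureListener and force-closes the child if registration fails.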

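    This completes the walkthrough of how the bossGroup picks up a new connection. The workerGroup's handling of read and write events on a NioSocketChannel follows essentially the same flow; readers can work through that part of the source code on their own.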

  • Original article: https://www.cnblogs.com/chirsli/p/12541431.html