zoukankan      html  css  js  c++  java
  • Netty 源码 NioEventLoop(三)执行流程

    Netty 源码 NioEventLoop(三)执行流程

    Netty 系列目录(https://www.cnblogs.com/binarylei/p/10117436.html)

    相关文章:

    上文提到在启动 NioEventLoop 线程时会执行 SingleThreadEventExecutor#doStartThread(),在这个方法中调用 SingleThreadEventExecutor.this.run(),NioEventLoop 重写了 run() 方法。NioEventLoop#run() 代码如下:

    @Override
    protected void run() {
        for (;;) {
            try {
                // 1. select 策略选择
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    // 1.1 非阻塞的 select 策略。实际上,默认情况下,不会返回 CONTINUE 的策略
                    case SelectStrategy.CONTINUE:
                        continue;
                    // 1.2 阻塞的 select 策略
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    // 1.3 不需要 select,目前已经有可以执行的任务了
                    default:
                }
    
                // 2. 执行网络 IO 事件和任务调度
                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        // 2.1. 处理网络 IO 事件
                        processSelectedKeys();
                    } finally {
                        // 2.2. 处理系统 Task 和自定义 Task
                        runAllTasks();
                    }
                } else {
                    // 根据 ioRatio 计算非 IO 最多执行的时间 
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // 3. 关闭线程
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
    

    NioEventLoop#run() 做了记下事情:

    1. 根据 selectStrategy 执行不同的策略
    2. 执行网络 IO 事件和任务调度
    3. 关闭线程

    1. IO 轮询策略

    当 taskQueue 中没有任务时,那么 Netty 可以阻塞地等待 IO 就绪事件。而当 taskQueue 中有任务时,我们自然地希望所提交的任务可以尽快地执行 ,因此 Netty 会调用非阻塞的 selectNow() 方法,以保证 taskQueue 中的任务尽快可以执行。

    (1) hasTasks

    首先,在 run 方法中,第一步是调用 hasTasks() 方法来判断当前任务队列中是否有任务

    protected boolean hasTasks() {
        assert inEventLoop();
        return !taskQueue.isEmpty();
    }
    

    这个方法很简单,仅仅是检查了一下 taskQueue 是否为空。至于 taskQueue 是什么呢,其实它就是存放一系列的需要由此 EventLoop 所执行的任务列表。关于 taskQueue,我们这里暂时不表,等到后面再来详细分析它。

    (2) DefaultSelectStrategy

    // NioEventLoop#selectNowSupplier
    private final IntSupplier selectNowSupplier = new IntSupplier() {
        @Override
        public int get() throws Exception {
            return selectNow();
        }
    };
    
    // 非阻塞的 select 策略。实际上,默认情况下,不会返回 CONTINUE 的策略
    SelectStrategy.SELECT = -1;
    // 阻塞的 select 策略
    SelectStrategy.CONTINUE = -2;
    
    // DefaultSelectStrategy 
    public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
        return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
    }
    

    显然当 taskQueue 为空时,执行的是 select(oldWakenUp) 方法。那么 selectNow() 和 select(oldWakenUp) 之间有什么区别呢? 来看一下,selectNow() 的源码如下

    (3) selectNow

    int selectNow() throws IOException {
        try {
            return selector.selectNow();
        } finally {
            // restore wakeup state if needed
            if (wakenUp.get()) {
                selector.wakeup();
            }
        }
    }
    

    调用 JDK 底层的 selector.selectNow()。selectNow() 方法会检查当前是否有就绪的 IO 事件,如果有,则返回就绪 IO 事件的个数;如果没有,则返回 0。注意,selectNow() 是立即返回的,不会阻塞当前线程。当 selectNow() 调用后,finally 语句块中会检查 wakenUp 变量是否为 true,当为 true 时,调用 selector.wakeup() 唤醒 select() 的阻塞调用。

    (4) select(boolean oldWakenUp)

    private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;
        try {
            int selectedKeys = selector.select(timeoutMillis);
        } catch (CancelledKeyException e) {
        }
    }
    

    在这个 select 方法中,调用了 selector.select(timeoutMillis),而这个调用是会阻塞住当前线程的,timeoutMillis是阻塞的超时时间。到来这里,我们可以看到,当 hasTasks() 为真时,调用的的 selectNow() 方法是不会阻塞当前线程的,而当 hasTasks() 为假时,调用的 select(oldWakenUp) 是会阻塞当前线程的。

    2. IO 事件的处理

    在 NioEventLoop.run() 方法中,第一步是通过 select/selectNow 调用查询当前是否有就绪的 IO 事件,那么当有 IO 事件就绪时,第二步自然就是处理这些 IO 事件啦。首先让我们来看一下 NioEventLoop.run 中循环的剩余部分:

    final int ioRatio = this.ioRatio;
    if (ioRatio == 100) {
        try {
            // 2.1. 处理网络 IO 事件
            processSelectedKeys();
        } finally {
            // 2.2. 处理系统 Task 和自定义 Task
            runAllTasks();
        }
    } else {
        // 根据 ioRatio 计算非 IO 最多执行的时间 
        final long ioStartTime = System.nanoTime();
        try {
            processSelectedKeys();
        } finally {
            final long ioTime = System.nanoTime() - ioStartTime;
            runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
        }
    }
    

    上面列出的代码中,有两个关键的调用:

    • 第一个是 processSelectedKeys():处理准备就绪的 IO 事件;
    • 第二个是 runAllTasks():运行 taskQueue 中的任务。

    这里的代码还有一个十分有意思的地方,即 ioRatio。那什么是 ioRatio 呢?它表示的是此线程分配给 IO 操作所占的时间比(即运行 processSelectedKeys 耗时在整个循环中所占用的时间)。例如 ioRatio 默认是 50,则表示 IO 操作和执行 task 的所占用的线程执行时间比是 1 : 1。当知道了 IO 操作耗时和它所占用的时间比,那么执行 task 的时间就可以很方便的计算出来了。

    我们这里先分析一下 processSelectedKeys() 方法调用,runAllTasks() 留到下面再分析。processSelectedKeys() 方法的源码如下:

    private void processSelectedKeys() {
        if (selectedKeys != null) {
            processSelectedKeysOptimized();
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }
    

    由于默认未开启 selectedKeys 优化功能,所以会进入 processSelectedKeysPlain 分支执。下面继续分析 processSelectedKeysPlain 的代码实现。

    private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
        // https://github.com/netty/netty/issues/597
        if (selectedKeys.isEmpty()) {
            return;
        }
    
        Iterator<SelectionKey> i = selectedKeys.iterator();
        for (;;) {
            final SelectionKey k = i.next();
            final Object a = k.attachment();
            i.remove();
    
            if (a instanceof AbstractNioChannel) {
                // NioSocketChannel 或 NioServerSocketChannel 进行 IO 读写相关的操作
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                // 用户自行注册的 Task 任务,一般情况下不会执行
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }
    
            if (!i.hasNext()) {
                break;
            }
            // 省略...
        }
    }
    

    processSelectedKey 方法源码如下:

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        // 省略...
    
        try {
            int readyOps = k.readyOps();
            // 1. OP_CONNECT 读写前要先处理连接,否则可能抛 NotYetConnectedException 异常
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
    
                unsafe.finishConnect();
            }
    
            // 2. OP_WRITE
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }
    
            // 3. OP_READ 或 OP_ACCEPT
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }
    

    这个代码是不是很熟悉啊?完全是 Java NIO 的 Selector 的那一套处理流程嘛!processSelectedKey 中处理了三个
    事件,分别是:

    • OP_READ 可读事件,即 Channel 中收到了新数据可供上层读取.
    • OP_WRITE 可写事件,即上层可以向 Channel 写入数据.
    • OP_CONNECT 连接建立事件,即 TCP 连接已经建立,Channel 处于 active 状态.

    下面我们分别根据这三个事件来看一下 Netty 是怎么处理的吧。

    2.1 OP_READ

    当就绪的 IO 事件是 OP_READ,代码会调用 unsafe.read() 方法。unsafe 我们已见过多次,NioSocketChannel 的 Unsafe 是在 AbstractNioByteChannel 中实现的,而 NioServerSocketChannel 的 Unsafe 是在 NioMessageUnsafe 中实现。

    public final void read() {
        final ChannelConfig config = config();
        final ChannelPipeline pipeline = pipeline();
        final ByteBufAllocator allocator = config.getAllocator();
        final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
        allocHandle.reset(config);
    
        ByteBuf byteBuf = null;
        boolean close = false;
        try {
            do {
                // 1. 分配缓冲区 ByteBuf
                byteBuf = allocHandle.allocate(allocator);
                allocHandle.lastBytesRead(doReadBytes(byteBuf));
                // 2. 从 NioSocketChannel 中读取数据
                if (allocHandle.lastBytesRead() <= 0) {
                    // nothing was read. release the buffer.
                    byteBuf.release();
                    byteBuf = null;
                    close = allocHandle.lastBytesRead() < 0;
                    if (close) {
                        readPending = false;
                    }
                    break;
                }
    
                allocHandle.incMessagesRead(1);
                readPending = false;
                // 3. 调用 pipeline.fireChannelRead 发送一个 inbound 事件
                pipeline.fireChannelRead(byteBuf);
                byteBuf = null;
            } while (allocHandle.continueReading());
    
            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();
    
            if (close) {
                closeOnRead(pipeline);
            }
        } catch (Throwable t) {
            handleReadException(pipeline, byteBuf, t, close, allocHandle);
        } finally {
            // See https://github.com/netty/netty/issues/2254
            if (!readPending && !config.isAutoRead()) {
                removeReadOp();
            }
        }
    }
    

    上面 read 方法其实归纳起来,可以认为做了如下工作:

    1. 分配 ByteBuf
    2. 从 SocketChannel 中读取数据
    3. 调用 pipeline.fireChannelRead 发送一个 inbound 事件

    2.2 OP_WRITE

    OP_WRITE 可写事件代码如下。这里代码比较简单,没有详细分析的必要了。

    if ((readyOps & SelectionKey.OP_WRITE) != 0) {
        // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
        ch.unsafe().forceFlush();
    }
    

    2.3 OP_CONNECT

    最后一个事件是 OP_CONNECT,即 TCP 连接已建立事

    if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
        // 已连接后就需要注销 OP_CONNECT 事件 See https://github.com/netty/netty/issues/924
        int ops = k.interestOps();
        ops &= ~SelectionKey.OP_CONNECT;
        k.interestOps(ops);
    
        unsafe.finishConnect();
    }
    

    OP_CONNECT 事件的处理中,只做了两件事情:

    1. 正如代码中的注释所言, 我们需要将 OP_CONNECT 从就绪事件集中清除, 不然会一直有 OP_CONNECT 事件。

    2. 调用 unsafe.finishConnect() 通知上层连接已建立
      unsafe.finishConnect() 调用最后会调用到 pipeline().fireChannelActive(),产生一个 inbound 事件,通知 pipeline 中的各个 handler TCP 通道已建立(即 ChannelInboundHandler.channelActive 方法会被调用)

    到了这里,我们整个 NioEventLoop 的 IO 操作部分已经了解完了,接下来的一节我们要重点分析一下 Netty 的任务
    队列机制。

    3. 任务调度

    我们已经提到过,在 Netty 中,一个 NioEventLoop 通常需要肩负起两种任务,第一个是作为 IO 线程,处理 IO 操作;第二个就是作为任务线程,处理 taskQueue 中的任务。这一节的重点就是分析一下 NioEventLoop 的任务队列机制
    的。

    3.1 普通 Runnable 任务

    // SingleThreadEventExecutor
    private final Queue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>(maxPendingTasks);
    protected void addTask(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (!offerTask(task)) {
            reject(task);
        }
    }
    final boolean offerTask(Runnable task) {
        if (isShutdown()) {
            reject();
        }
        return taskQueue.offer(task);
    }
    

    因此实际上,taskQueue 是存放着待执行的任务的队列。

    3.2 schedule 任务

    除了通过 execute 添加普通的 Runnable 任务外,我们还可以通过调用 eventLoop.scheduleXXX 之类的方法来添加
    一个定时任务。schedule 功能的实现是在 SingleThreadEventExecutor 的父类,即 AbstractScheduledEventExecutor 中实现的。

    // SingleThreadEventExecutor
    PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
    

    scheduledTaskQueue 是一个队列(Queue),其中存放的元素是 ScheduledFutureTask。而ScheduledFutureTask 我们很容易猜到,它是对 Schedule 任务的一个抽象。我们来看一下 AbstractScheduledEventExecutor 所实现的 schedule 方法:

    <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
        if (inEventLoop()) {
            scheduledTaskQueue().add(task);
        } else {
            execute(new Runnable() {
                @Override
                public void run() {
                    scheduledTaskQueue().add(task);
                }
            });
        }
    
        return task;
    }
    

    3.3 执行调度任务

    protected boolean runAllTasks() {
        assert inEventLoop();
        boolean fetchedAll;
        boolean ranAtLeastOne = false;
    
        do {
            fetchedAll = fetchFromScheduledTaskQueue();
            if (runAllTasksFrom(taskQueue)) {
                ranAtLeastOne = true;
            }
        } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
    
        if (ranAtLeastOne) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
        }
        afterRunningAllTasks();
        return ranAtLeastOne;
    }
    

    我们前面已经提到过,EventLoop 可以通过调用 EventLoop.execute 来将一个 Runnable 提交到 taskQueue 中,
    也可以通过调用 EventLoop.schedule 来提交一个 schedule 任务到 scheduledTaskQueue 中。在此方法的一开
    始调用的 fetchFromScheduledTaskQueue() 其实就是将 scheduledTaskQueue 中已经可以执行的(即定时时
    间已到的 schedule 任务) 拿出来并添加到 taskQueue 中,作为可执行的 task 等待被调度执行。代码如下:

    private boolean fetchFromScheduledTaskQueue() {
        long nanoTime = AbstractScheduledEventExecutor.nanoTime();
        Runnable scheduledTask  = pollScheduledTask(nanoTime);
        while (scheduledTask != null) {
            if (!taskQueue.offer(scheduledTask)) {
                // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
                scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
                return false;
            }
            scheduledTask  = pollScheduledTask(nanoTime);
        }
        return true;
    }
    

    接下来 runAllTasks() 方法就会不断调用 task = pollTask() 从 taskQueue 中获取一个可执行的 task,然后调用它
    的 run() 方法来运行此 task。

    注意: 因为 EventLoop 既需要执行 IO 操作,又需要执行 task,因此我们在调用 EventLoop.execute 方法提交
    任务时,不要提交耗时任务,更不能提交一些会造成阻塞的任务,不然会导致我们的 IO 线程得不到调度,影响整
    个程序的并发量。


    每天用心记录一点点。内容也许不重要,但习惯很重要!

  • 相关阅读:
    关于Visual Studio中的TraceDebugging文件夹
    没有App打得开发证书, 收不到推送
    转:ios应用崩溃日志揭秘
    转 iOS:NSAttributedString
    [UIDevice currentDevice].model
    转: Your build settings specify a provisioning profile with the UUID, no provisioning profile was found
    NSTimer 增加引用计数, 导致内存泄露,
    matplotlib基础(2)
    matplotlib基础
    《python自然语言处理》(1)
  • 原文地址:https://www.cnblogs.com/binarylei/p/10138638.html
Copyright © 2011-2022 走看看