zoukankan      html  css  js  c++  java
  • 4.Netty执行IO事件和非IO任务

    回顾NioEventLoop的run方法流程

    上文说到NioEventLoop的run方法可以分为3个步骤:

    1. 轮询channel中就绪的IO事件
    2. 处理轮询出的IO事件
    3. 处理所有任务,也包括定时任务

    其中步骤1已在上一节讲述,这里接着讲述下面2个步骤

    IO事件与非IO任务

    首先看一下在步骤2和步骤3的主干代码

    final int ioRatio = this.ioRatio;
    // 将所有任务执行完
    if (ioRatio == 100) {
        try {
            processSelectedKeys();
        } finally {
            // Ensure we always run tasks.
            runAllTasks();
        }
    } else {
        // 记录IO事件消耗的时间,然后按比例处理分配时间处理非IO任务
        final long ioStartTime = System.nanoTime();
        try {
            processSelectedKeys();
        } finally {
            // Ensure we always run tasks.
            final long ioTime = System.nanoTime() - ioStartTime;
            // ioRatio默认50,(100-ioRatio)/ioRatio刚好等于1,做到平均分配
            runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
        }
    }
    

    ioRadio是NioEventLoop的一个成员变量,用来控制分配花费在IO事件与非IO任务时间的比例。默认情况下,ioRadio是50,表示IO事件与非IO任务
    将分配相同时间。而当ioRatio为100时,该值失效,不再平衡两种动作的时间分配比值。
    了解了这一点,上述两种分支代码就不难理解了,我们直接进入processSelectedKeys,看看netty如何执行IO事件

    处理IO事件

    先进入processSelectedKeys方法内部。

    private void processSelectedKeys() {
        if (selectedKeys != null) {
            processSelectedKeysOptimized();
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }
    

    可以看到这里又根据selectedKeys是否为空这个条件来确定是处理优化过的keys还是普通keys。关于selectedKeys,在NioEventLoop介绍这一节中,
    我们介绍了NioEventLoop的创建,在创建过程中,默认会将SelectedKeys由Hashset替换为数组实现,此处的selectedKeys正是替换过后的实现。
    我们继续跟进到processSelectedKeysOptimized方法

    private void processSelectedKeysOptimized() {
        for (int i = 0; i < selectedKeys.size; ++i) {
            final SelectionKey k = selectedKeys.keys[i];
            selectedKeys.keys[i] = null;
            final Object a = k.attachment();
            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }
            if (needsToSelectAgain) {
                selectedKeys.reset(i + 1);
                selectAgain();
                i = -1;
            }
        }
    }
    

    方法内部用一个for循环处理selectedKeys。key的attchment默认是在注册时附加上去的NioServerSocketChannel和NioSocketChannel。
    继续跟进processSelectedKey(k, (AbstractNioChannel) a)方法。

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            final EventLoop eventLoop = ch.eventLoop();  
            if (eventLoop != this || eventLoop == null) {
                return;
            }
            unsafe.close(unsafe.voidPromise());
            return;
        }
    
        int readyOps = k.readyOps();
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);
            unsafe.finishConnect();
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            ch.unsafe().forceFlush();
        }
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    }
    

    netty首先对selectionKey的有效性做了一个判断。当key无效时,关闭key所在的channel。当key有效时,委托NioUnsafe对象对key进行IO操作。
    注意这里先进行OP_CONNECT,再执行OP_WRITE,最后执行OP_READ和OP_ACCEPT。关于Unsafe的这些IO操作留待以后分析。

    processSelectedKeysPlain方法流程类似,略过

    处理非IO任务

    由于IoRatio默认为50,我们先进入runAllTasks(ioTime * (100 - ioRatio) / ioRatio)方法。

    protected boolean runAllTasks(long timeoutNanos) {
        // 步骤1
        fetchFromScheduledTaskQueue();
        // 步骤2
        Runnable task = pollTask();
        if (task == null) {
            afterRunningAllTasks();
            return false;
        }
        // 步骤3
        final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
        long runTasks = 0;
        long lastExecutionTime;
        for (;;) {
            // 步骤4
            safeExecute(task);
            runTasks ++;
            // 步骤5
            if ((runTasks & 0x3F) == 0) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }
            task = pollTask();
            if (task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }
        // 步骤6
        afterRunningAllTasks();
        this.lastExecutionTime = lastExecutionTime;
        return true;
    }
    

    非IO任务的执行可以分为6个步骤

    1. 从定时任务队列聚合任务到普通任务队列
    2. 从普通队列中获取任务
    3. 计算任务执行的超时时间
    4. 安全执行任务
    5. 任务执行到一定次数,计算是否超时
    6. 执行完taskQueue普通队列里的任务后,再去执行tailTaskQueue里的任务。但目前暂时没有看到tailTaskQueue使用的地方,也许是一个扩展点吧,这里先略过。

    我们一个一个步骤讲解

    聚合定时任务到普通任务队列

    首先看一下整体流程

    private boolean fetchFromScheduledTaskQueue() {
        if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
            return true;
        }
        long nanoTime = AbstractScheduledEventExecutor.nanoTime();
        for (;;) {
            Runnable scheduledTask = pollScheduledTask(nanoTime);
            if (scheduledTask == null) {
                return true;
            }
            if (!taskQueue.offer(scheduledTask)) {
                scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
                return false;
            }
        }
    }
    

    首先先判断定时任务队列是否有任务,然后调用了一个AbstractScheduledEventExecutor.nanoTime(),该方法返回ScheduledFutureTask类从初始化
    到当前时刻的差值。也即将ScheduledFutureTask初始化的时刻当成零时刻。
    获取到零时刻到当前时刻的差值后,用一个for循环不断去定时任务队列里获取终止时刻在当前时刻之后的任务(scheduledTask.deadlineNanos() - nanoTime<=0)
    当获取到定时任务后,将它添加到普通任务队列taskQueue里。同时添加失败后,还会再重新添加回定时任务队列,防止任务直接丢失。

    说到定时任务队列,也少不了一探其实现。scheduledTaskQueue初始化代码如下:

    PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() {
        if (scheduledTaskQueue == null) {
            scheduledTaskQueue = new DefaultPriorityQueue<>(
                    SCHEDULED_FUTURE_TASK_COMPARATOR,
                    11);
        }
        return scheduledTaskQueue;
    }
    

    采用的是一个懒加载的方式,在调用scheduledTaskQueue()创建定时任务时才进行初始化。从名字可以看出,它是一个优先级队列,初始化容量为11,
    采用的Comparator是调用2个ScheduledFutureTask的compareTo方法,首先比较任务的终止时间,然后比较两个任务的id。代码较简单,就不列了。

    然后我们看下调度方法schedule

    private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
        if (inEventLoop()) {
            scheduledTaskQueue().add(task.setId(nextTaskId++));
        } else {
            executeScheduledRunnable(new Runnable() {
                @Override
                public void run() {
                    scheduledTaskQueue().add(task.setId(nextTaskId++));
                }
            }, true, task.deadlineNanos());
        }
        return task;
    }
    

    可以发现,netty将"添加定时任务"也当做一个任务,放入任务队列里。

    从普通队列中获取任务

    // NioEventLoop中定义的pollTask方法
    protected Runnable pollTask() {
        Runnable task = super.pollTask();
        if (needsToSelectAgain) {
            selectAgain();
        }
        return task;
    }
    // super.pollTask调用了此方法,定义在SingleThreadEventExecutor中
    protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
            for (;;) {
                Runnable task = taskQueue.poll();
                if (task != WAKEUP_TASK) {
                    return task;
                }
            }
        }
    

    这里依然是通过轮询从任务队列里取出任务,并且忽略WAKEUP_TASK这个标记性任务。

    计算任务执行的超时时间

    在当前时间上,加上IO事件执行的时间,作为非IO任务执行的超时时间

    安全执行

    protected static void safeExecute(Runnable task) {
        try {
            task.run();
        } catch (Throwable t) {
            logger.warn("A task raised an exception. Task: {}", task, t);
        }
    }
    

    捕获所有异常,使得定时任务报错时不退出

    计算是否超时

    由于nanoTime()是一个相对耗时的操作,netty默认执行了64次非IO任务后,才计算是否超时。若执行了超过64个任务没或者任务队列已经没有任务,
    就打断循环,并将当前时间更新为lastExecutionTime。

    总结

    到了这里,我们已经介绍完了大部分NioEventLoop的内容,限于笔者水平和文章篇幅,nioEventLoop所使用的任务队列MpscQueue和ScheduleFutureTask
    内部执行原理不再进一步深究。但这也已经足够对NioEventLoop塑造一个比较整体性的认识了。

  • 相关阅读:
    hdu 3666 差分约束系统
    hdu 1198农田灌溉
    常微分方程(阿諾爾德) Page 45 相空間,相流,運動,相曲線 註記
    高等微積分(高木貞治) 1.4節 例2
    常微分方程(阿諾爾德) Page 45 相空間,相流,運動,相曲線 註記
    解析函數論 Page 29 命題(2) 函數模的有界性
    高等微積分(高木貞治) 1.4節 例2
    解析函數論 Page 29 命題(1) 有界閉集上的一致連續性
    解析函數論 Page 29 命題(3) 模的下界的可達性
    解析函數論 Page 29 命題(2) 函數模的有界性
  • 原文地址:https://www.cnblogs.com/spiritsx/p/11992688.html
Copyright © 2011-2022 走看看