zoukankan      html  css  js  c++  java
  • Netty源码分析第2章(NioEventLoop)---->第8节: 执行任务队列

     

    Netty源码分析第二章: NioEventLoop

     

    第八节: 执行任务队列

    继续回到NioEventLoop的run()方法:

    protected void run() {
        for (;;) {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        //轮询io事件(1)
                        select(wakenUp.getAndSet(false));
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    default:
                }
                cancelledKeys = 0;
                needsToSelectAgain = false;
                //默认是50
                final int ioRatio = this.ioRatio; 
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys();
                    } finally {
                        runAllTasks();
                    }
                } else {
                    //记录下开始时间
                    final long ioStartTime = System.nanoTime();
                    try {
                        //处理轮询到的key(2)
                        processSelectedKeys();
                    } finally {
                        //计算耗时
                        final long ioTime = System.nanoTime() - ioStartTime;
                        //执行task(3)
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            //代码省略
        }
    }

    我们看到处理完轮询到的key之后, 首先记录下耗时, 然后通过runAllTasks(ioTime * (100 - ioRatio) / ioRatio)执行taskQueue中的任务

    我们知道ioRatio默认是50, 所以执行完ioTime * (100 - ioRatio) / ioRatio, 方法传入的值为ioTime, 也就是processSelectedKeys()的执行时间:

     

    跟进runAllTasks方法:

    protected boolean runAllTasks(long timeoutNanos) {
        //定时任务队列中聚合任务
        fetchFromScheduledTaskQueue();
        //从普通taskQ里面拿一个任务
        Runnable task = pollTask();
        //task为空, 则直接返回
        if (task == null) {
            //跑完所有的任务执行收尾的操作
            afterRunningAllTasks();
            return false;
        }
        //如果队列不为空
        //首先算一个截止时间(+50毫秒, 因为执行任务, 不要超过这个时间)
        final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
        long runTasks = 0;
        long lastExecutionTime;
        //执行每一个任务
        for (;;) {
            safeExecute(task);
            //标记当前跑完的任务
            runTasks ++;
            //当跑完64个任务的时候, 会计算一下当前时间
            if ((runTasks & 0x3F) == 0) {
                //定时任务初始化到当前的时间
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                //如果超过截止时间则不执行(nanoTime()是耗时的)
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }
            //如果没有超过这个时间, 则继续从普通任务队列拿任务
            task = pollTask();
            //直到没有任务执行
            if (task == null) {
                //记录下最后执行时间
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }
        //收尾工作
        afterRunningAllTasks();
        this.lastExecutionTime = lastExecutionTime;
        return true;
    }

    首先会执行fetchFromScheduledTaskQueue()这个方法, 这个方法的意思是从定时任务队列中聚合任务, 也就是将定时任务中找到可以执行的任务添加到taskQueue

    我们跟进fetchFromScheduledTaskQueue()方法:

    private boolean fetchFromScheduledTaskQueue() {
        long nanoTime = AbstractScheduledEventExecutor.nanoTime();
        //从定时任务队列中抓取第一个定时任务
        //寻找截止时间为nanoTime的任务
        Runnable scheduledTask  = pollScheduledTask(nanoTime);
        //如果该定时任务队列不为空, 则塞到普通任务队列里面
        while (scheduledTask != null) {
            //如果添加到普通任务队列过程中失败
            if (!taskQueue.offer(scheduledTask)) {
                //则重新添加到定时任务队列中
                scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
                return false;
            }
            //继续从定时任务队列中拉取任务
            //方法执行完成之后, 所有符合运行条件的定时任务队列, 都添加到了普通任务队列中
            scheduledTask = pollScheduledTask(nanoTime);
        }
        return true;
    }

     long nanoTime = AbstractScheduledEventExecutor.nanoTime() 代表从定时任务初始化到现在过去了多长时间

     Runnable scheduledTask= pollScheduledTask(nanoTime) 代表从定时任务队列中拿到小于nanoTime时间的任务, 因为小于初始化到现在的时间, 说明该任务需要执行了

     

    跟到其父类AbstractScheduledEventExecutorpollScheduledTask(nanoTime)方法中:

    protected final Runnable pollScheduledTask(long nanoTime) {
        assert inEventLoop();
        //拿到定时任务队列
        Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
        //peek()方法拿到第一个任务
        ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
        if (scheduledTask == null) {
            return null;
        }
    
        if (scheduledTask.deadlineNanos() <= nanoTime) {
            //从队列中删除
            scheduledTaskQueue.remove();
            //返回该任务
            return scheduledTask;
        }
        return null;
    }

    我们看到首先获得当前类绑定的定时任务队列的成员变量

    如果不为空, 则通过scheduledTaskQueue.peek()弹出第一个任务

    如果当前任务小于传来的时间, 说明该任务需要执行, 则从定时任务队列中删除

    我们继续回到fetchFromScheduledTaskQueue()方法中:

    private boolean fetchFromScheduledTaskQueue() {
        long nanoTime = AbstractScheduledEventExecutor.nanoTime();
        //从定时任务队列中抓取第一个定时任务
        //寻找截止时间为nanoTime的任务
        Runnable scheduledTask  = pollScheduledTask(nanoTime);
        //如果该定时任务队列不为空, 则塞到普通任务队列里面
        while (scheduledTask != null) {
            //如果添加到普通任务队列过程中失败
            if (!taskQueue.offer(scheduledTask)) {
                //则重新添加到定时任务队列中
                scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
                return false;
            }
            //继续从定时任务队列中拉取任务
            //方法执行完成之后, 所有符合运行条件的定时任务队列, 都添加到了普通任务队列中
            scheduledTask = pollScheduledTask(nanoTime);
        }
        return true;
    }

    弹出需要执行的定时任务之后, 我们通过taskQueue.offer(scheduledTask)添加到taskQueue, 如果添加失败, 则通过scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask)重新添加到定时任务队列中

     

    如果添加成功, 则通过pollScheduledTask(nanoTime)方法继续添加, 直到没有需要执行的任务

    这样就将定时任务队列需要执行的任务添加到了taskQueue

     

    回到runAllTasks(long timeoutNanos)方法中:

    protected boolean runAllTasks(long timeoutNanos) {
        //定时任务队列中聚合任务
        fetchFromScheduledTaskQueue();
        //从普通taskQ里面拿一个任务
        Runnable task = pollTask();
        //task为空, 则直接返回
        if (task == null) {
            //跑完所有的任务执行收尾的操作
            afterRunningAllTasks();
            return false;
        }
        //如果队列不为空
        //首先算一个截止时间(+50毫秒, 因为执行任务, 不要超过这个时间)
        final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
        long runTasks = 0;
        long lastExecutionTime;
        //执行每一个任务
        for (;;) {
            safeExecute(task);
            //标记当前跑完的任务
            runTasks ++;
            //当跑完64个任务的时候, 会计算一下当前时间
            if ((runTasks & 0x3F) == 0) {
                //定时任务初始化到当前的时间
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                //如果超过截止时间则不执行(nanoTime()是耗时的)
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }
            //如果没有超过这个时间, 则继续从普通任务队列拿任务
            task = pollTask();
            //直到没有任务执行
            if (task == null) {
                //记录下最后执行时间
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }
        //收尾工作
        afterRunningAllTasks();
        this.lastExecutionTime = lastExecutionTime;
        return true;
    }

    首先通过 Runnable task = pollTask() 从taskQueue中拿一个任务

    任务不为空, 则通过 final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos 计算一个截止时间, 任务的执行时间不能超过这个时间

    然后在for循环中通过safeExecute(task)执行task

    我们跟到safeExecute(task):

    protected static void safeExecute(Runnable task) {
        try {
            //直接调用run()方法执行
            task.run();
        } catch (Throwable t) {
            //发生异常不终止
            logger.warn("A task raised an exception. Task: {}", task, t);
        }
    }

    这里直接调用taskrun()方法进行执行, 其中发生异常, 只打印一条日志, 代表发生异常不终止, 继续往下执行

     

    回到runAllTasks(long timeoutNanos)方法:

    protected boolean runAllTasks(long timeoutNanos) {
        //定时任务队列中聚合任务
        fetchFromScheduledTaskQueue();
        //从普通taskQ里面拿一个任务
        Runnable task = pollTask();
        //task为空, 则直接返回
        if (task == null) {
            //跑完所有的任务执行收尾的操作
            afterRunningAllTasks();
            return false;
        }
        //如果队列不为空
        //首先算一个截止时间(+50毫秒, 因为执行任务, 不要超过这个时间)
        final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
        long runTasks = 0;
        long lastExecutionTime;
        //执行每一个任务
        for (;;) {
            safeExecute(task);
            //标记当前跑完的任务
            runTasks ++;
            //当跑完64个任务的时候, 会计算一下当前时间
            if ((runTasks & 0x3F) == 0) {
                //定时任务初始化到当前的时间
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                //如果超过截止时间则不执行(nanoTime()是耗时的)
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }
            //如果没有超过这个时间, 则继续从普通任务队列拿任务
            task = pollTask();
            //直到没有任务执行
            if (task == null) {
                //记录下最后执行时间
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }
        //收尾工作
        afterRunningAllTasks();
        this.lastExecutionTime = lastExecutionTime;
        return true;
    }

    每次执行完task, runTasks自增

     

    这里 if ((runTasks & 0x3F) == 0) 代表是否执行了64个任务, 如果执行了64个任务, 则会通过 lastExecutionTime = ScheduledFutureTask.nanoTime() 记录定时任务初始化到现在的时间, 如果这个时间超过了截止时间, 则退出循环

     

    如果没有超过截止时间, 则通过 task = pollTask() 继续弹出任务执行

    这里执行64个任务统计一次时间, 而不是每次执行任务都统计, 主要原因是因为获取系统时间是个比较耗时的操作, 这里是netty的一种优化方式

    如果没有task需要执行, 则通过afterRunningAllTasks()做收尾工作, 最后记录下最后的执行时间

    以上就是有关执行任务队列的相关逻辑

     

    第二章总结

            本章学习了有关NioEventLoopGroup的创建, NioEventLoop的创建和启动, 以及多路复用器的轮询处理和task执行的相关逻辑, 通过本章学习, 我们应该掌握如下内容:

            1.  NioEventLoopGroup如何选择分配NioEventLoop

            2.  NioEventLoop如何开启

            3.  NioEventLoop如何进行select操作

            4.  NioEventLoop如何执行task

     

     

    上一节: 处理IO事件

    下一节: 初始化NioSocketChannelConfig

     

  • 相关阅读:
    maya软件切换编辑模式FX,Rigging,Animation,Rendering,customize
    关于ueditor1.4.2 与Jquery 验证同时使用失效
    架构师的第二阶段:做(Conceptual-Architecture)
    架构师的第一阶段:准备做(Pre-Architecture)
    新生架构师的首个任务--分阶段!
    新生架构师的困惑
    在Myeclipse上安装hadoop插件,开发MapReduce程序(本人新手,欢迎大家多多指导和关照)
    Hadoop伪分布式搭建(本人新手,欢迎大家多多指导和关照)
    Linux虚拟机的静态网络配置(本人新手,欢迎大家多多指导和关照)
    VMware上安装CentOS系统(本人新手,欢迎大家多多指导和关照)
  • 原文地址:https://www.cnblogs.com/xiangnan6122/p/10203169.html
Copyright © 2011-2022 走看看