zoukankan      html  css  js  c++  java
  • [编织消息框架][netty源码分析]3 EventLoop 实现类SingleThreadEventLoop职责与实现

    eventLoop是基于事件系统机制,主要技术由线程池同队列组成,是由生产/消费者模型设计,那么先搞清楚谁是生产者,消费者内容

    SingleThreadEventLoop 实现

    public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
        private final Queue<Runnable> tailTasks;
    
        @Override
        protected void afterRunningAllTasks() {
            runAllTasksFrom(tailTasks);
        }
    }

    SingleThreadEventLoop是个抽象类,从实现代码上看出很简单的逻辑边界判断

    SingleThreadEventExecutor也是个抽象类,代码量比较大,我们先看重要的成员属性

    public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
        //事件队列
        private final Queue<Runnable> taskQueue;
        //执行事件线程,可以看出只有一个线程只要用来记录executor的当前线程
        private volatile Thread thread;
        //主要负责监控该线程的生命周期,提取出当前线程然后用thread记录
        private final Executor executor;
        //用Atomic*技术记录当前线程状态
        private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
                    AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
    }
    
    //启动线程做了比较判断
    private void startThread() {
        if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                doStartThread();
            }
        }
    }
    
    private void doStartThread() {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                //记录当前执行线程
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }
    
                boolean success = false;
                updateLastExecutionTime();
                try {
                    //这里调用的是子类,注意子类是死循环不停的执行任务
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    //更改线程结束状态 省略部分代码
                    for (;;) {
                        int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
                        if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                            break;
                        }
                    }
                    try {
                        // 执行未完成任务同 shutdown hooks.
                        for (;;) {
                            if (confirmShutdown()) {
                                break;
                            }
                        }
                    } finally {
                        try {
                            //最后清理操作,如 NioEventLoop实现 selector.close();
                            cleanup();
                        } finally {
                            //省略部分代码
                        }
                    }
                }
            }
        });
    }

    protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) { Runnable task = pollTaskFrom(taskQueue); if (task == null) { return false; } for (;;) { //安全执行任务 safeExecute(task); //继续执行剩余任务 task = pollTaskFrom(taskQueue); if (task == null) { return true; } } } protected final Runnable pollTaskFrom(Queue<Runnable> taskQueue) { for (;;) { Runnable task = taskQueue.poll(); //忽略WAKEUP_TASK类型任务 if (task == WAKEUP_TASK) { continue; } return task; } } protected boolean runAllTasks(long timeoutNanos) { //先执行周期任务 fetchFromScheduledTaskQueue(); //从taskQueue提一个任务,如果为空执行所有tailTasks Runnable task = pollTask(); //如果taskQueue没有任务,立即执行子类的tailTasks if (task == null) { afterRunningAllTasks(); return false; } //计算出超时时间 = 当前 nanoTime + timeoutNanos final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; long runTasks = 0; long lastExecutionTime; for (;;) { safeExecute(task); runTasks ++; //当执行任务次数大于64判断是否超时,防止长时间独占CPU if ((runTasks & 0x3F) == 0) { lastExecutionTime = ScheduledFutureTask.nanoTime(); if (lastExecutionTime >= deadline) { break; } } task = pollTask(); if (task == null) { lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } afterRunningAllTasks(); this.lastExecutionTime = lastExecutionTime; return true; }
    //SingleThreadEventLoop run 实现
    public class DefaultEventLoop extends SingleThreadEventLoop {
    
        @Override
        protected void run() {
            for (;;) {
                Runnable task = takeTask();
                if (task != null) {
                    task.run();
                    updateLastExecutionTime();
                }
    
                if (confirmShutdown()) {
                    break;
                }
            }
        }
    }

    我们可以在SingleThreadEventExecutor  两个runAllTasks 方法打上断点,看执行任务时调用逻辑

     本人为了搞清楚 taskQueue 同tailTasks 类型任务,在任务入队时打断点,分别为 SingleThreadEventLoop executeAfterEventLoopIteration方法同 SingleThreadEventExecutor offerTask方法

    ServerBootstrap[bind address] ->

    NioEventLoopGroup [register Channel] ->  [ChannelPromise] ->

    NioEventLoop [build and push register task]

    从调用链可以清晰看出,启动 netty server 绑定生成抽象 Channel 然后l转换成ChannelPromise,再调用注册实现register0

    这里用了判断是否为当前线程,如果是不用加入队列马上执行,目前减少上下文切换开削

    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        eventLoop.execute(new Runnable() {
            @Override
            public void run() {
                register0(promise);
            }
        });
    }

    总结:

    1.SingleThreadEventLoop 任务执行加了超时限制,目的防止当前线程长时间执行任务独占cpu

    2.提交任务时做了减少上下文开削优化

    3.执行任务优先级 1.周期任务 2.taskQueue 3.tailTasks

    目前没有看到任何调用 SingleThreadEventLoop executeAfterEventLoopIteration 方法,估计是扩展处理。

    4.用到Atomic*技术解决并发问题,从Executor提取当前线程,把单一线程维护交给Executor 

  • 相关阅读:
    nginx负载均衡实现
    shiro 退出 清除缓存
    从零到实现Shiro中Authorization和Authentication的缓存
    Mysql 语句
    N! java
    大数java(pow)
    HDU_1548
    Mike and strings 798B
    Array Division 808D
    poj_1979(dfs)
  • 原文地址:https://www.cnblogs.com/solq111/p/6913788.html
Copyright © 2011-2022 走看看