zoukankan      html  css  js  c++  java
  • 线程池 ThreadPoolExecutor 源码整理

    1):线程池中几个成员变量的说明。

    ctl:记录了"线程池中的任务数量"和"线程池状态"2个信息,包括32位。其中,高3位表示"线程池状态",低29位表示"线程池中的任务数量"。

    在 ThreadPoolExecutor 类中的定义如下:

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

     线程池中的运行状态的存储如下:

       private static final int RUNNING    = -1 << COUNT_BITS;
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
        private static final int STOP       =  1 << COUNT_BITS;
        private static final int TIDYING    =  2 << COUNT_BITS;
        private static final int TERMINATED =  3 << COUNT_BITS;

    RUNNING    -- 对应的高3位值是111。

    SHUTDOWN   -- 对应的高3位值是000。

    STOP       -- 对应的高3位值是001。

    TIDYING    -- 对应的高3位值是010。

    TERMINATED -- 对应的高3位值是011。 

    2):接下来分析类中代码的具体逻辑。线程池的中方法的执行入口是 execute()

     下面是 execute(Runnable command) 执行过程:

       1:判断 command 任务是否为空,任务为空则抛出异常

        if (command == null)
             throw new NullPointerException();

      2 :command 任务不为空时:

            int c = ctl.get(); // 高3位为线程运行的状态  低29位:线程池中运行线程的数量
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }        

      3:若满足线程池中运行的线程数量小于核心池数,则进入 addWorker() 方法:这是一个自旋的方式来实现逻辑

    private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (;;) {  // 进入自旋
                int c = ctl.get();   
                int rs = runStateOf(c); // 获取线程池的运行状态  如 上述的 running shutdown stop 等
                //......
                for (;;) { // 进入自旋
                    int wc = workerCountOf(c);  // 获取线程池中线程数
                    if (wc >= CAPACITY ||        //判断 线程池中线程数大于等于最大整数 或者大于等于核心池数则return false
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    if (compareAndIncrementWorkerCount(c)) //cas的方式将ctl中的表示线程数的值加 1
                        break retry;       //cas操作成功后 退出最外层的自旋
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                   
                }
            }                

            4:接下来将任务封装成一个 Worker 对象,绕后开启线程执行

                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                            //......
                            workers.add(w);   //将worker 对象添加到set中
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        t.start();  // 开始执行线程的run方法
                        workerStarted = true;
                    }
                }  

      5:下面看看 Worker 类中的run() 方法。

         public void run() {
                runWorker(this);
            }
    final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            //.....
            try {
                while (task != null || (task = getTask()) != null) {
                    w.lock();
                    //.....
                    try {
                        //......
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            // 开始执行task任务
                            task.run();
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } 
        }            

    从以上的逻辑可以看出run() 方法首先执行的是构造器传进来的任务  w.firstTask 如果这个任务为空,则从队列里获取任务然后执行 task = getTask()

     下面看看 getTask() 的核心实现方式,主要是通过poll的方式从queue中获取任务,是一个生产消费的模式。这里是消费方

           try {
                    Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                    if (r != null)
                        return r;
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
                }

            6:下面看看 线程池中的线程数大于等于核心池数:这里的主要逻辑是:判断线程池的状态是否运行,如果运行则将任务添加到队列中,这里是生产和上面分析的消费相关

         if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }

       7:如果添加到队列中失败,这时会将任务添加到线程中,线程池新建一个线程来执行这个任务,如果添加任务失败则拒绝这个任务。

    else if (!addWorker(command, false))
                reject(command);

    到这里为止,任务的添加和执行过程已经分析完成

  • 相关阅读:
    实例讲解Springboot以Repository方式整合Redis
    Spark join 源码跟读记录
    数理统计与参数估计杂记
    常见的距离算法和相似度(相关系数)计算方法
    生成模型(Generative Model)与判别模型(Discriminative Model)
    Java 积累复习用
    RangePartitioner 实现简记
    Spark常见问题汇总
    2016年终总结
    学习资料库
  • 原文地址:https://www.cnblogs.com/beppezhang/p/11214702.html
Copyright © 2011-2022 走看看