zoukankan      html  css  js  c++  java
  • 小学徒进阶系列—揭开ThreadPoolExecutor神秘的面纱

      前提摘要:本文是基于jdk1.7的,在分析ThreadPoolExecutor代码的过程中百度时发现1.6和1.7的实现还是有一定的区别的而且还挺大的,个人感觉1.6比较简单好理解。

      为了方便大家阅读理解,我把说明以注释的形式嵌入到了代码中。

      关于线程池,它不仅有效的复用了对象,更有效的复用了线程,减少了线程创建,销毁,恢复等状态切换的开销,提高了程序的性能。但是,究竟线程池是怎么复用对象的呢?它又是怎样去复用线程减少开销的呢?下面我们来一一揭开,ThreadPoolExecutor神秘的面纱。

     1.基本变量和方法

      为了能够更好的进行分析,我们先来做一些热身活动,了解下线程池的几个重要的变量吧。

      1.首先大家最好先了解下原子变量的概念,具体可以参考官网文档:http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/atomic/package-summary.html

      2.在这里,我们先讲讲两个会贯穿全文的单词:1> workerCount:当前活动的线程数;2> runState:线程池的当前状态

      下面我们开始分析吧。

     1.1基本变量和方法

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

      这个是用一个int来表示workerCount和runState的,其中runState占int的高3位,其它29位为workerCount的值。
      用AtomicInteger是因为其在并发下使用compareAndSet效率非常高;
      当改变当前活动的线程数时只对低29位操作,如每次加一减一,workerCount的值变了,但不会影响高3位的runState的值。

      当改变当前状态的时候,只对高3位操作,不会改变低29位的计数值。
      这里有一个假设,就是当前活动的线程数不会超过29位能表示的值,即不会超过536870911,
      就目前以及可预见的很长一段时间来讲,这个值是足够用了。同时按照源代码中注释提供的说法,一旦未来超过了AtomicInteger承受的范围,变量类型到时候可以替换为AtomicLong类型。

     ------------------------------------------------------------------------我是神奇的分割线-----------------------------------------------------------------------------------

    private static final int COUNT_BITS = Integer.SIZE - 3;

      首先,Integer.SIZE的值为32,他减去3以后,值就为29

      COUNT_BITS,就是用来表示workerCount占用一个int的位数,其值为前面说的29

      ------------------------------------------------------------------------ 我是神奇的分割线 -----------------------------------------------------------------------------------

    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

      1右移以为后二进制的表示是00100000000000000000000000000000,减去1之后的值就是00011111111111111111111111111111(占29位,29个1)

      CAPACITY为29位能表示的最大容量,即workerCount实际能用的最大值(536870911)

     1.2线程池的状态

      接下来的几个变来那个描述的是关于线程池的状态,分别是:

      1> RUNNING : 该状态下线程池能接受新任务,并且可以运行队列中的任务

             -1的二进制为32个1,移位后为:11100000000000000000000000000000

    private static final int RUNNING    = -1 << COUNT_BITS;

      2> SHUTDOWN : 该状态下的线程池不再接受新任务,但仍可以执行队列中的任务

                0的二进制为32个0,移位后还是全0(00000000000000000000000000000000)

    private static final int SHUTDOWN   =  0 << COUNT_BITS;

      3> STOP : 该状态下的线程池不再接受新任务不再执行队列中的任务,而且要中断正在处理的任务
            1的二进制为前面31个0,最后一个1,移位后为:00100000000000000000000000000000

    private static final int STOP       =  1 << COUNT_BITS;

      

      4>TIDYING : 该状态下的线程池所有任务均已终止,workerCount的值为0,转到TIDYING状态的线程即将要执行terminated()方法.
             2的二进制为00000000000000000000000000000010 移位后01000000000000000000000000000000

    private static final int TIDYING    =  2 << COUNT_BITS;

      

      5>TERMINATED : 该状态下的线程池说明 terminated()方法执行结束.
                 3的二进制为00000000000000000000000000000011,移位后01100000000000000000000000000000

    private static final int TERMINATED =  3 << COUNT_BITS;

       线程池各个状态间的转换:

      1>RUNNING -> SHUTDOWN : 调用了shutdown方法,线程池实现了finalize方法,在里面调用了shutdown方法,因此shutdown可能是在finalize中被隐式调用的

      2>(RUNNING or SHUTDOWN) -> STOP 调用了shutdownNow方法

      3>SHUTDOWN -> TIDYING : 当队列和线程池均为空的时候

      4>STOP -> TIDYING : 当线程池为空的时候

      5>TIDYING -> TERMINATED : terminated()方法调用完毕

     1.3基本方法  

      1> 这个方法用于取出当前活动线程的数量,也就是workerCount的值。

    /**
     * 这个方法用于取出workerCount的值
     * 因为CAPACITY值为:00011111111111111111111111111111,所以&操作将参数的高3位置0了
     * 保留参数的低29位,也就是workerCount的值
     * @param c ctl, 存储runState和workerCount的int值
     * @return workerCount的值
     */
    private static int workerCountOf(int c)  { return c & CAPACITY; }

       

      2> 这个方法用于取出当前线程池的运行状态,也就是runState的值

    /**
     * 这个方法用于取出runState的值
     * 因为CAPACITY值为:00011111111111111111111111111111
     * ~为按位取反操作,则~CAPACITY值为:11100000000000000000000000000000
     * 再同参数做&操作,就将低29位置0了,而高3位还是保持原先的值,也就是runState的值
     * @param c 该参数为存储runState和workerCount的int值
     * @return runState的值
     */
    private static int runStateOf(int c)     { return c & ~CAPACITY; }

       

      3> 这个方法将runState和workerCount的值通过或运算存到同一个int中

    /**
     * 将runState和workerCount存到同一个int中
     * “|”运算的意思是,假设rs的值是101000,wc的值是000111,则他们位或运算的值为101111
     * @param rs runState移位过后的值,负责填充返回值的高3位
     * @param wc workerCount移位过后的值,负责填充返回值的低29位
     * @return 两者或运算过后的值
     */
    private static int ctlOf(int rs, int wc) { return rs | wc; }

     2.关键方法的分析

      下面我们根据前面我们写的一个线程池的方法进行测试吧,此处只是摘取一点列取出来,详细完整代码请见《小学徒成长系列—线程同步、死锁、线程池》:

    executorService.execute(new TaskThread());    //创建任务并交给线程池进行管理

      1> 上面的入口是execute,那么我们就从execute()方法开始进行分析吧,为了方便大家阅读,我都以注释的形式直接写在代码上,或许这个过程比较枯燥,但是只要你坚持下去,一定会受益匪浅。

     1  public void execute(Runnable command) {
     2         //任务为null,则抛出异常
     3         if (command == null)
     4             throw new NullPointerException();
     5 
     6         int c = ctl.get();    //取出记录着runState和workerCount 的 ctl的当前值
     7         
     8         //通过workerCountOf方法从ctl所表示的int值中提取出低29位的值,也就是当前活动的线程数
     9         //如果(当前活动的线程 < corePoolSize)
    10         if (workerCountOf(c) < corePoolSize) {    
    11             //创建新的线程
    12             //对于该函数形参,command就是请求任务,
    13             //而true表示需要检测当前运行的线程是否小于corePoolSize
    14             //false表示需要检测当前运行的线程数量是否小于maxPoolSize
    15             if (addWorker(command, true))    
    16                 return;    //创建线程成功,则停止该终止该方法的执行
    17             c = ctl.get();    //如果添加失败,则取出记录着runState和workerCount 的 ctl的当前值
    18         }
    19         //当前线程池处于运行状态且队列未满 && 如果线程正在运行中并且任务添加到缓冲队列成功
    20         if (isRunning(c) && workQueue.offer(command)) {
    21             int recheck = ctl.get();    //再次获取用于下面再次检查
    22             if (! isRunning(recheck) && remove(command))    //如果线程池已经处于非运行状态,则从缓冲队列中移除任务并拒绝
    23                 reject(command);    //采用线程池指定的策略拒绝任务
    24             else if (workerCountOf(recheck) == 0)    //如果线程池处于运行状态 或者线程池已经处于非运行状态但是任务移除失败
    25                 addWorker(null, false);
    26         }
    27         //    1. 当前线程池并不处于Running状态
    28         //    2. 当前线程池处于Running状态,但是缓冲队列已经满了
    29         else if (!addWorker(command, false))
    30             reject(command); //采用线程池指定的策略拒绝任务
    31   }

      关于上面的execute(Runnable command),大部分的解释都在代码的注释中啦。

      或许大家会有疑问:上面已经有了判断当前活动的线程小于corePoolSize了,那么等于和大于corePoolSize怎么处理呢?

      解答:不知道大家有没有注意到,当当前活动的线程数量 >= corePoolSize 的时候,都是优先添加到队列中,直到队列满了才会去创建新的线程,在这里第20行的if语句已经体现出来了。这里利用了&&的特性,只有当第一个条件会真时才会去判断第二个条件,第一个条件是isRunning(),判断线程池是否处于RUNNING状态,因为只有在这个状态下才会接受新任务,否则就拒绝,如果正处于RUNNING状态,那么就加入队列,如果加入失败可能就是队列已经满了,这时候直接执行第29行。

      2> 在execute()方法中,当 当前活动的线程数量 < corePoolSize 时,会执行addWorker()方法,关于addWorker(),它是用来直接新建线程用的,之所以叫addWorker而不是addThread是因为在线程池中,所有的线程都用一个Worker对象包装着,好吧,我们先来看看这个方法。

     1    /**
     2     * 创建并执行新线程
     3     * @param firstTack 用于指定新增的线程执行的第一个任务
     4     *
     5     * @param core      true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,
     6     *                  false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize
     7     *
     8     * @return 是否成功新增一个线程
     9     */
    10    private boolean addWorker(Runnable firstTask, boolean core) {
    11         retry:
    12         for (;;) {
    13             int c = ctl.get();    //获取记录着runState和workCount的int变量的当前值
    14             int rs = runStateOf(c);    //获取当前线程池运行的状态
    15 
    16             //if语句中的条件转换成一个等价实现:rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty())
    17             /*
    18               这个条件代表着以下几个情景,就直接返回false说明线程创建失败:
    19               1.rs > SHUTDOWN; 此时不再接收新任务,且所有的任务已经执行完毕
    20               2.rs = SHUTDOWN; 此时不再接收新任务,但是会执行队列中的任务,在后买年的或语句中,第一个不成立,firstTask != null成立
    21               3.rs = SHUTDOWN;此时不再接收新任务,fistTask == null,任务队列workQueue已经空了
    22             */
    23             if (rs >= SHUTDOWN &&
    24                 ! (rs == SHUTDOWN &&
    25                    firstTask == null &&
    26                    ! workQueue.isEmpty()))
    27                 return false;
    28 
    29             for (;;) {
    30                 //获取当前活动的线程数
    31                 int wc = workerCountOf(c);    
    32                 //先判断当前活动的线程数是否大于最大值,如果超过了就直接返回false说明线程创建失败
    33                 //如果没有超过再根据core的值再进行以下判断
    34                 /*
    35                     1.core为true,则判断当前活动的线程数是否大于corePoolSize
    36                     2.core为false,则判断当前活动线程数是否大于maximumPoolSize
    37                 */
    38                 if (wc >= CAPACITY ||
    39                     wc >= (core ? corePoolSize : maximumPoolSize))
    40                     return false;
    41                 //比较当前值是否和c相同,如果相同,则改为c+1,并且跳出大循环,直接执行Worker进行线程创建
    42                 if (compareAndIncrementWorkerCount(c))
    43                     break retry;
    44                 c = ctl.get();  // 获取ctl的当前值
    45                 if (runStateOf(c) != rs)    //检查下当前线程池的状态是否已经发生改变
    46                     continue retry;    //如果已经改变了,则进行外层retry大循环,否则只进行内层的循环
    47                 // else CAS failed due to workerCount change; retry inner loop
    48             }
    49         }
    50         //下面这里就是开始创建新的线程了
    51         //Worker的也是Runnable的实现类
    52         Worker w = new Worker(firstTask);
    53         //因为不可以直接在Worker的构造方法中进行线程创建
    54         //所以要把它的引用赋给t方便后面进行线程创建
    55         Thread t = w.thread;    
    56 
    57         final ReentrantLock mainLock = this.mainLock;
    58         mainLock.lock();
    59         try {
    60           
    61             //再次取出ctl的当前值,用于进行状态的检查,防止线程池的已经状态改变了
    62             int c = ctl.get();
    63             int rs = runStateOf(c);
    64 
    65             //将if语句中的条件转换为一个等价实现 :t == null || (rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null))
    66             //有个t == null是因为如果使用的是默认的ThreadFactory的话,那么它的newThread()可能会返回null
    67             /*
    68               1. 如果t == null, 则减少一个线程数,如果线程池处于的状态 > SHUTDOWN,则尝试终止线程池
    69               2. 如果t != null,且rs == SHUTDOWN,则不再接收新任务,若firstTask != null,则此时也是返回false,创建线程失败
    70               3. 如果t != null, 且rs > SHUTDOWN,同样不再接受新任务,此时也是返回false,创建线程失败
    71             */
    72             if (t == null ||
    73                 (rs >= SHUTDOWN &&
    74                  ! (rs == SHUTDOWN &&
    75                     firstTask == null))) {
    76                 decrementWorkerCount();    //减少一个活动的当前线程数
    77                 tryTerminate();    //尝试终止线程池
    78                 return false;    //返回线程创建失败
    79             }
    80 
    81             workers.add(w);    //将创建的线程添加到workers容器中
    82 
    83             int s = workers.size();    //获取当前线程活动的数量
    84             if (s > largestPoolSize)    //判断当前线程活动的数量是否超过线程池最大的线程数量
    85                 largestPoolSize = s;    //当池中的工作线程创新高时,会将这个数记录到largestPoolSize字段中。然后就可以启动这个线程t了
    86         } finally {
    87             mainLock.unlock();
    88         }
    89 
    90         t.start();    //开启线程
    91         //若start后,状态又变成了SHUTDOWN状态(如调用了shutdownNow方法)且新建的线程没有被中断过,
    92         //就要中断该线程(shutdownNow方法要求中断正在执行的线程),
    93         //shutdownNow方法本身也会去中断存储在workers中的所有线程
    94         if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted())
    95             t.interrupt();
    96 
    97         return true;
    98     }

      那么在创建线程的时候,线程执行的是什么的呢?

      我们前面提到Worker继承的其实也是Runnable,它在创建线程的时候是以自身作为任务传进先创建的线程中的,这段比较简单,我就不一一注释了,只是给出源代码给大家看吧。

          Worker(Runnable firstTask) {
                this.firstTask = firstTask;
                //this指的是worker对象本身
                this.thread = getThreadFactory().newThread(this);
            }

       它以自身的对象作为线程任务传进去,那么它的run方法又是怎样的呢?

     public void run() {
                runWorker(this);
            }

       竟然只有一句话调用runWorker()方法,这个可是重头戏,我们来看看,究竟运行的是什么。

     1 /**
     2  * 执行Worker中的任务,它的执行流程是这样的:
     3  * 若存在第一个任务,则先执行第一个任务,否则,从队列中拿任务,不断的执行,
     4  * 直到getTask()返回null或执行任务出错(中断或任务本身抛出异常),就退出while循环。
     5  * @param w woker
     6  */
     7  final void runWorker(Worker w) {
     8         Runnable task = w.firstTask;    //将当前Worker中的任务取出来交给task,并释放掉w.firstTask占用的内存
     9         w.firstTask = null;
    10         //用于判断线程是否由于异常终止,如果不是异常终止,在后面将会将该变量的值改为false
    11         //该变量的值在processWorkerExit()会使用来判断线程是否由于异常终止
    12         boolean completedAbruptly = true;    
    13         try {
    14             //执行任务,直到getTask()返回的值为null,在此处就相当于复用了线程,让线程执行了多个任务
    15             while (task != null || (task = getTask()) != null) {    
    16                 w.lock();
    17                 clearInterruptsForTaskRun();//对线程池状态进行一次判断,后面我们会讲解一下该方法
    18                 try {
    19                     beforeExecute(w.thread, task);    //在任务执行前需要做的逻辑方法,该方面可以由用户进行重写自定义
    20                     Throwable thrown = null;
    21                     try {
    22                         task.run();    //开始执行任务
    23                     } catch (RuntimeException x) {
    24                         thrown = x; throw x;
    25                     } catch (Error x) {
    26                         thrown = x; throw x;
    27                     } catch (Throwable x) {
    28                         thrown = x; throw new Error(x);
    29                     } finally {
    30                         afterExecute(task, thrown);    //在任务执行后需要做的逻辑方法,该方面可以由用户进行重写自定义
    31                     }
    32                 } finally {
    33                     task = null;    
    34                     w.completedTasks++;    //增加该线程完成的任务
    35                     w.unlock();
    36                 }
    37             }
    38             completedAbruptly = false;    //线程不是异常终止
    39         } finally {
    40             processWorkerExit(w, completedAbruptly);    //结束该线程
    41         }
    42     }

     下面就是线程在执行任务之前对线程池状态的一次判断:

     1     /**
     2      * 对线程的结束做一些清理和数据同步
     3      * @param w 封装线程的Worker
     4      * @param completedAbruptly 表示该线程是否结束于异常
     5      */
     6     private void processWorkerExit(Worker w, boolean completedAbruptly) {
     7         // 如果completedAbruptly值为true,则说明线程是结束于异常
     8         //如果不是结束于异常,那么它降在runWorker方法的while循环中的getTask()方法中已经减一了
     9         if (completedAbruptly) 
    10             decrementWorkerCount();    //此时将线程数量减一
    11 
    12         final ReentrantLock mainLock = this.mainLock;
    13         mainLock.lock();
    14         try {
    15             completedTaskCount += w.completedTasks;    //统计总共完成的任务数
    16             workers.remove(w);    //将该线程数从workers容器中移除
    17         } finally {
    18             mainLock.unlock();
    19         }
    20 
    21         tryTerminate();    //尝试终止线程池
    22 
    23         int c = ctl.get();
    24         //接下来的这个if块要做的事儿了。当池的状态还是RUNNING,
    25         //又要分两种情况,一种是异常结束,一种是正常结束。异常结束比较好弄,直接加个线程替换死掉的线程就好了,
    26         //也就是最后的addWorker操作
    27         if (runStateLessThan(c, STOP)) {    //如果当前运行状态为RUNNING,SHUTDOWN
    28             if (!completedAbruptly) {    //如果线程不是结束于异常
    29                 int min = allowCoreThreadTimeOut ? 0 : corePoolSize;    //是否允许线程超时结束
    30                 if (min == 0 && ! workQueue.isEmpty())    //如果允许把那个且队列不为空
    31                     min = 1;    //至少要保留一个线程来完成任务
    32                 //如果当前活动的线程数大于等于最小的值
    33                 // 1.不允许核心线程超时结束,则必须要使得活动线程数超过corePoolSize数才可以
    34                 // 2. 允许核心线程超时结束,但是队列中有任务,必须留至少一个线程
    35                 if (workerCountOf(c) >= min)    
    36                     return; // replacement not needed
    37             }
    38             //直接加个线程
    39             addWorker(null, false);    
    40         }
    41     }

     前面我们的方法遇见过很多次tryTerminate()方法,到底他是怎样尝试结束线程池的呢?

     1     /**
     2      * 执行该方法,根据线程池状态进行 
     3      * 判断是否结束线程池
     4      */
     5      final void tryTerminate() {
     6         for (;;) {
     7             int c = ctl.get();
     8             if (isRunning(c) ||    //线程池正在运行中,自然不能结束线程池啦
     9                 runStateAtLeast(c, TIDYING) ||    //如果状态为TIDYING或TERMINATED,池中的活动线程数已经是0,自然也不需要做什么操作了
    10                 (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))    //线程池出于SHUTDOWN状态,但是任务队列不为空,自然不能结束线程池啦
    11                 return;
    12             if (workerCountOf(c) != 0) { // Eligible to terminate
    13                 /*
    14                   调用这个方法的目的是将shutdown信号传播给其它线程。
    15                   调用shutdown方法的时候会去中断所有空闲线程,如果这时候池中所有的线程都正在执行任务,
    16                   那么就不会有线程被中断,调用shutdown方法只是设置了线程池的状态为SHUTDOWN,
    17                   在取任务(getTask,后面会细说)的时候,假如很多线程都发现队列里还有任务(没有使用锁,存在竞态条件),
    18                   然后都去调用take,如果任务数小于池中的线程数,那么必然有方法调用take后会一直等待(shutdown的时候这些线程正在执行任务,
    19                   所以没能调用它的interrupt,其中断状态没有被设置),那么在没有任务且线程池的状态为SHUTDWON的时候,
    20                   这些等待中的空闲线程就需要被终止iinterruptIdleWorkers(ONLY_ONE)回去中断一个线程,让其从take中退出,
    21                   然后这个线程也进入同样的逻辑,去终止一个其它空闲线程,直到池中的活动线程数为0。
    22                 */
    23                 interruptIdleWorkers(ONLY_ONE);
    24                 return;
    25             }
    26 
    27             final ReentrantLock mainLock = this.mainLock;
    28             mainLock.lock();
    29             try {
    30                 /*
    31                   当状态为SHUTDOWN,且活动线程数为0的时候,就可以进入TIDYING状态了,
    32                   进入TIDYING状态就可以执行方法terminated(),
    33                   该方法执行结束就进入了TERMINATED状态(参考前文中各状态的含义以及可能的状态转变)
    34                 */
    35                 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
    36                     try {
    37                         terminated();    //执行该方法,结束线程池
    38                     } finally {
    39                         ctl.set(ctlOf(TERMINATED, 0));
    40                         /*
    41                           当线程池shutdown后,外部可能还有很多线程在等待线程池真正结束,
    42                           即调用了awaitTermination方法,该方法中,外部线程就是在termination上await的,
    43                           所以,线程池关闭之前要唤醒这些等待的线程,告诉它们线程池关闭结束了。
    44                         */
    45                         termination.signalAll();
    46                     }
    47                     return;
    48                 }
    49             } finally {
    50                 mainLock.unlock();
    51             }
    52             // else retry on failed CAS
    53         }
    54     }
  • 相关阅读:
    Visual Studio 2010使用Visual Assist X的方法
    SQL Server 2000 评估版 升级到 SQL Server 2000 零售版
    双网卡多网络单主机同时访问
    开发即过程!立此纪念一个IT新名词的诞生
    delphi dxBarManager1 目录遍历 转为RzCheckTree2树
    5320 软件集合
    delphi tree 从一个表复制到另一个表
    DELPHI 排课系统课表
    长沙金思维 出现在GOOGLE的 金思维 相关搜索里啦!!
    如何在DBGrid的每一行前加一个单选框?
  • 原文地址:https://www.cnblogs.com/xiaoxuetu/p/3070344.html
Copyright © 2011-2022 走看看