zoukankan      html  css  js  c++  java
  • JAVA8线程池源码解析

    1.对线程池的理解

    1.1 艰辛摸索

    看过许多关于线程池的介绍和讲解,看过方腾飞的 《并发编程的艺术》 也看过很多博客关于线程池的讲解,但是总觉得自己理解的不太好,总觉得哪里缺点。后来自己也花时间查看源码,自己去琢磨,多问为什么,结合一些博客的讲解。现在我想把这段时间的成果记录下来,也是为了加深自己的理解吧。
    下面所有的源码都是java8 源码。

    1.2 线程池的好处

    • 降低性能消耗、提高响应速度:对于应用需要频繁创建线程,而且线程任务都比较简单,比如一些IO任务,线程的生命周期都很短;而线程的创建需要花费一定的CPU时间,所以当任务到来时如果线程已经准备就绪了,而不是重新创建,则会大大提高系统的响应速度。
    • 对线程的集中管理监控:将创建的线程规约在线程池里,则可以对线程的数量和运行状态进行管理并进行监控,可对系统的线程资源进行集中管理。

    2.线程池内的一些属性

    2.1 线程池参数

    看下面的这个线程池的构造器,他有许多参数,这些参数都代表什么意思?

    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler)
    
    • corePoolSize 核心的线程池数量
    • maximumPoolSize 线程池内最大的线程数量
    • keepAliveTime 线程存活时间
    • unit 时间单位 比如 秒 分钟
    • workQueue 存储任务的阻塞队列
    ArrayBlockingQueue 有界阻塞队列,此队列会满。
    SynchronousQueue 没有容量的阻塞队列,一次添加必须等待一次获取。反之亦然。
    LinkedBlockingQueue 无界阻塞队列 这个阻塞队列用于不会满
    
    • threadFactory 自定义线程工厂
    
    1.通过自定义线程池 可以给线程池内,线程都赋予更有意义的线程名。
    也可以根据喜好做些别的事情,比如记录一下何时创建线程之类的。
    2.如果没有则会采用默认的线程池。Executors.defaultThreadFactory()
    
    • handler 拒绝策略
    拒绝策略java 默认提供了四种实现。都是ThreadPoolExecutor的内部类。
    1 CallerRunsPolicy :如果当前线程池 处于运行状态 ,直接使用当前线程执行任务,如果是终止状态,则悄悄的直接抛弃。
    

    2. AbortPolicy :对拒绝任务抛弃处理,并且抛出异常。默认采用。
    rejectedExecution 内直接抛异常。
    3. DiscardPolicy :对拒绝任务直接无声抛弃,没有异常信息。查看源码发现他的rejectedExecution 函数就是一个空实现。

    4. DiscardOldestPolicy :对拒绝任务不抛弃,而是抛弃队列里面等待最久的一个任务,然后把拒绝任务加到队列。查看源码
    if (!e.isShutdown()) {
    e.getQueue().poll();//获取并移除队列head
    e.execute(r);//再次execute任务
    }

    3.Executors 提供的几种线程池

    3.1 newFixedThreadPool

    • 固定线程数的线程池
    public static ExecutorService newFixedThreadPool(int nThreads) {
       return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,
       new LinkedBlockingQueue<Runnable>());
    }
    1.核心 线程数 和 最大线程数一致。
    2.采用的阻塞队列是 无界阻塞队列,LinkedBlockingQueue。也就是在极端情况下,阻塞队列 会一直增长,直到堆内存溢出,需要谨慎使用该线程池。
    

    3.2 newCachedThreadPool

    • 缓冲线程池
     public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>());
    }
    0.在newCachedThreadPool中如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
    1.初看该构造函数时我有这样的疑惑:核心线程池为0,那按照前面所讲的线程池策略新任务来临时无法进入核心线程池,只能进入 SynchronousQueue中进行等待
    ,而SynchronousQueue的大小为1,那岂不是第一个任务到达时只能等待在队列中,直到第二个任务到达发现无法进入队列才能创建第一个线程? 
    2.这个问题的答案在上面讲SynchronousQueue时其实已经给出了,要将一个元素放入SynchronousQueue中,必须有另一个线程正在等待接收这个元素。因此即便SynchronousQueue一开始为空且大小为1,第一个任务也无法放入其中,因为没有线程在等待从SynchronousQueue中取走元素。因此第一个任务到达时便会创建一个新线程执行该任务。
    3.这个最大线程数是 Integer.MAX 使用中需要注意。
    

    3.3 newWorkStealingPool

    jdk 1.8 新加入的。创建一个带并行级别的线程池,并行级别决定了同一时刻最多有多少个线程在执行,如不穿如并行级别参数,将默认为当前系统的CPU个数

    3.4 newScheduledThreadPool

    创建一个定长线程池,支持定时及周期性任务执行。

    4. 源码

    下面介绍下线程池的源码,由于对写作没有天赋,我就尽我最大努力把源码分析的浅显易懂一些。我介绍下线程池的一些状态属性,然后再从提交一个任务开始跟着源码进行描述一下我对源码的理解。

    4.1 线程池状态 和 线程统计

    image.png
    • ctl
      先看看线程池中最重要的一个属性 ctl,我觉得是Control的简写,ctl控制着整个线程池的运转。ctl是AtomicInteger类型,线程池利用ctl 的高3位作为记录当前线程池的状态。利用低29位记录线程池中线程数,所以线程池中线程的最大容量为 2^29。
      默认值是 1110 0000 0000 0000 0000 0000 0000 0000 = -536 870 912 ;

    • COUNT_BITS
      COUNT_BITS = Integer.SIZE - 3;
      COUNT_BITS 的意思是 一个整型数 有29位用于统计线程池内线程数;

    • RUNNING 运行状态
      RUNNING 是线程池的初始状态,是一个int 类型的常量,值为 -1 左移 29 位即为 -536 870 912,线程池 的状态
      只有RUNNING 是负数的。

    • SHUTDOWN
      RUNNING --shutDowm()--> SHUTDOWN
      RUNNING状态调用shutDowm()函数进入SHUTDOWN状态。是一个int类型的常量 值为0 ;
      此时线程池不接收新任务,但能处理已添加的任务。

    • STOP
      (RUNNING or SHUTDOWN) ---shutdownNow()--> STOP
      RUNNING状态或者SHUTSOWN状态调用shutDowmNow()函数进入SROP状态。是一个int类型的常量 值为536 870 912 ;
      不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。

    • TIDYING
      是一个int类型的常量 值为1 073 741 824 ;
      SHUTDOWN -> TIDYING:当线程池内线程数为0并且队列内任务数量为0时
      STOP -> TIDYING:当线程池内线程数量为0时

    • TERMINATED
      是一个int类型的常量 值为1 610 612 736 ;
      线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED。

    • CAPACITY (线程的最大容量)
      1 转移 29位 ,就是 0010 0000 0000 0000 0000 0000 0000 0000,
      然后 减1 ,就变成了0001 1111 1111 1111 1111 1111 1111 1111,刚好就是 低29位 为1 ,就是线程内线程的最大容量 。

    4.2 execute 执行任务

    线程池添加任务流程图.png

    在未来的某个时刻执行给定的任务,这个任务会被一个新线程或者一个已经存在的线程执行。
    如果任务不能提交执行,那是因为线程池不处于运行状态或者已经到达容量上限。
    那么这个任务就会被RejectedExecutionHandler处理。
    execute()代码逻辑:

    1.当通过excute(Runable) 提交一个任务时,如果当前线程池内线程数量小于corePoolSize 数量,即使线程池内线程处于闲置状态,线程池也会创建一个新的线程。
    2.如果线程内线程数大于corePoolSize 数量小于 maximumPoolSize 数量,则会将任务 提交到 workQueue(阻塞队列)。
    3.如果workQueued队列已经满了,而当前线程池内线程数量小于maximumPoolSize 则创建一个新的线程执行。
    4.如果此时线程池内线程数量 不小于maximumPoolSize 数量,则执行拒绝策略。
    
        /**
         *
         * @param command the task to execute
         * @throws RejectedExecutionException at discretion of
         *         {@code RejectedExecutionHandler}, if the task
         *         cannot be accepted for execution
         * @throws NullPointerException if {@code command} is null
         */
        public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
    
        <span class="token keyword">int</span> c <span class="token operator">=</span> ctl<span class="token punctuation">.</span><span class="token function">get</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        <span class="token comment">//1.如果正在运行的线程数 小于 corePoolSize,创建一个线程执行command.</span>
        
        <span class="token keyword">if</span> <span class="token punctuation">(</span><span class="token function">workerCountOf</span><span class="token punctuation">(</span>c<span class="token punctuation">)</span> <span class="token operator">&lt;</span> corePoolSize<span class="token punctuation">)</span> <span class="token punctuation">{</span>
        <span class="token comment">//创建一个线程执行command</span>
            <span class="token keyword">if</span> <span class="token punctuation">(</span><span class="token function">addWorker</span><span class="token punctuation">(</span>command<span class="token punctuation">,</span> <span class="token boolean">true</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
                <span class="token keyword">return</span><span class="token punctuation">;</span>
                <span class="token comment">//如果失败 获取ctl recheck runstate</span>
            c <span class="token operator">=</span> ctl<span class="token punctuation">.</span><span class="token function">get</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        <span class="token punctuation">}</span>
        <span class="token comment">//2.如果是运行状态 并且追加command 到队列成功</span>
        <span class="token keyword">if</span> <span class="token punctuation">(</span><span class="token function">isRunning</span><span class="token punctuation">(</span>c<span class="token punctuation">)</span> <span class="token operator">&amp;&amp;</span> workQueue<span class="token punctuation">.</span><span class="token function">offer</span><span class="token punctuation">(</span>command<span class="token punctuation">)</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
            <span class="token keyword">int</span> recheck <span class="token operator">=</span> ctl<span class="token punctuation">.</span><span class="token function">get</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
            <span class="token comment">//重新检查运行状态 不是运行中状态 则 从队列移除 command,让RejectedExecutionHandler 处理command。</span>
            <span class="token keyword">if</span> <span class="token punctuation">(</span><span class="token operator">!</span> <span class="token function">isRunning</span><span class="token punctuation">(</span>recheck<span class="token punctuation">)</span> <span class="token operator">&amp;&amp;</span> <span class="token function">remove</span><span class="token punctuation">(</span>command<span class="token punctuation">)</span><span class="token punctuation">)</span>
            
                <span class="token function">reject</span><span class="token punctuation">(</span>command<span class="token punctuation">)</span><span class="token punctuation">;</span>
            <span class="token keyword">else</span> <span class="token keyword">if</span> <span class="token punctuation">(</span><span class="token function">workerCountOf</span><span class="token punctuation">(</span>recheck<span class="token punctuation">)</span> <span class="token operator">==</span> <span class="token number">0</span><span class="token punctuation">)</span>
            <span class="token comment">//如果当前运行中的线程数是0 ,则创建一个新线程。</span>
                <span class="token function">addWorker</span><span class="token punctuation">(</span><span class="token keyword">null</span><span class="token punctuation">,</span> <span class="token boolean">false</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        <span class="token punctuation">}</span>
        <span class="token comment">//3.如果添加任务到队列失败,则新建一个线程执行任务。</span>
        <span class="token keyword">else</span> <span class="token keyword">if</span> <span class="token punctuation">(</span><span class="token operator">!</span><span class="token function">addWorker</span><span class="token punctuation">(</span>command<span class="token punctuation">,</span> <span class="token boolean">false</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
        <span class="token comment">//如果新建线程失败,让RejectedExecutionHandler 处理command。</span>
            <span class="token function">reject</span><span class="token punctuation">(</span>command<span class="token punctuation">)</span><span class="token punctuation">;</span>
    <span class="token punctuation">}</span>
    

    4.3 addWorker

    检查是否可以根据当前线程池状态和绑定条件(核心线程或者最大线程)添加一个新的工作线程。
    如果可以的话那么调整运行的线程数量。
    并且 如果线程成功创建和运行的话,那么firstTask将是新线程的第一个任务被执行。
    如果线程池已经关闭或者正在关闭,函数将返回false.
    如果线程创建失败,可能的原因是因为线程工厂返回null,或者异常(最可能的异常就是在执行Thread,start()时产生OutOfMemoryErro )。
    接着回滚以上操作。
    参数:core:如果为true 增加核心线程,false 增加最大线程。

    private boolean addWorker(Runnable firstTask, boolean core) {
           retry://外层循环 循环检查运行状态
           for (;;) {//
               int c = ctl.get();//获取 ctl 值
               int rs = runStateOf(c);//获取当前线程池状态
    
           <span class="token comment">// Check if queue empty only if necessary.</span>
           <span class="token comment">//如果有必要的话 检查任务阻塞队列是否为空</span>
           
           <span class="token comment">//这里第一个条件检查 运行状态 是否为RUNNING 状态。因为只有RUNNING状态不满足 rs&gt;= SHUTDOWN 条件。</span>
           <span class="token comment">//第二个条件 咋一看 不好懂,有点晕。</span>
           <span class="token comment">//仔细分析下,当运行状态为SHUTDOWN 状态时,线程池会执行完阻塞队列内的任务。但是不再接收新任务。</span>
           <span class="token comment">//也就是说 如果 是shutdown 状态,firstTask is null ,阻塞队列里面还有任务时,是允许创建新的新工作线程的。</span>
           <span class="token keyword">if</span> <span class="token punctuation">(</span>rs <span class="token operator">&gt;=</span> SHUTDOWN <span class="token operator">&amp;&amp;</span>
               <span class="token operator">!</span> <span class="token punctuation">(</span>rs <span class="token operator">==</span> SHUTDOWN <span class="token operator">&amp;&amp;</span>
                  firstTask <span class="token operator">==</span> <span class="token keyword">null</span> <span class="token operator">&amp;&amp;</span>
                  <span class="token operator">!</span> workQueue<span class="token punctuation">.</span><span class="token function">isEmpty</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
               <span class="token keyword">return</span> <span class="token boolean">false</span><span class="token punctuation">;</span>
    
        <span class="token comment">//内循环 调整工作线程数</span>
           <span class="token keyword">for</span> <span class="token punctuation">(</span><span class="token punctuation">;</span><span class="token punctuation">;</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
           <span class="token comment">//获取当前工作线程数</span>
               <span class="token keyword">int</span> wc <span class="token operator">=</span> <span class="token function">workerCountOf</span><span class="token punctuation">(</span>c<span class="token punctuation">)</span><span class="token punctuation">;</span>
               <span class="token comment">//如果 大于 最大容量 capacity, 或者 根据绑定条件 大于 核心线程 或者 最大线程数。return false。</span>
               
               
               <span class="token keyword">if</span> <span class="token punctuation">(</span>wc <span class="token operator">&gt;=</span> CAPACITY <span class="token operator">||</span>
                   wc <span class="token operator">&gt;=</span> <span class="token punctuation">(</span>core <span class="token operator">?</span> corePoolSize <span class="token operator">:</span> maximumPoolSize<span class="token punctuation">)</span><span class="token punctuation">)</span>
                   <span class="token keyword">return</span> <span class="token boolean">false</span><span class="token punctuation">;</span>
               <span class="token comment">//cas 调整 工作线程数</span>
               <span class="token keyword">if</span> <span class="token punctuation">(</span><span class="token function">compareAndIncrementWorkerCount</span><span class="token punctuation">(</span>c<span class="token punctuation">)</span><span class="token punctuation">)</span>
                   <span class="token keyword">break</span> retry<span class="token punctuation">;</span><span class="token comment">//如果成功 跳出外层循环</span>
               c <span class="token operator">=</span> ctl<span class="token punctuation">.</span><span class="token function">get</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>  <span class="token comment">// Re-read ctl</span>
               <span class="token comment">//cas 调整 工作线程数失败 </span>
               <span class="token comment">//检查 线程状态 是否改变</span>
               <span class="token comment">//如果没有改变 继续执行 内循环</span>
               <span class="token comment">//如果状态 改变 执行外循环。</span>
               <span class="token keyword">if</span> <span class="token punctuation">(</span><span class="token function">runStateOf</span><span class="token punctuation">(</span>c<span class="token punctuation">)</span> <span class="token operator">!=</span> rs<span class="token punctuation">)</span>
                   <span class="token keyword">continue</span> retry<span class="token punctuation">;</span>
               <span class="token comment">// else CAS failed due to workerCount change; retry inner loop</span>
           <span class="token punctuation">}</span>
       <span class="token punctuation">}</span>
    <span class="token comment">//开始添加 worker</span>
       <span class="token keyword">boolean</span> workerStarted <span class="token operator">=</span> <span class="token boolean">false</span><span class="token punctuation">;</span>
       <span class="token keyword">boolean</span> workerAdded <span class="token operator">=</span> <span class="token boolean">false</span><span class="token punctuation">;</span>
       <span class="token class-name">Worker</span> w <span class="token operator">=</span> <span class="token keyword">null</span><span class="token punctuation">;</span>
       <span class="token keyword">try</span> <span class="token punctuation">{</span>
       <span class="token comment">//创建一个 工作线程</span>
       <span class="token comment">//worker 构造器内部 调用 线程工厂 创建一个线程。</span>
       
           w <span class="token operator">=</span> <span class="token keyword">new</span> <span class="token class-name">Worker</span><span class="token punctuation">(</span>firstTask<span class="token punctuation">)</span><span class="token punctuation">;</span>
           <span class="token keyword">final</span> <span class="token class-name">Thread</span> t <span class="token operator">=</span> w<span class="token punctuation">.</span>thread<span class="token punctuation">;</span>
           <span class="token keyword">if</span> <span class="token punctuation">(</span>t <span class="token operator">!=</span> <span class="token keyword">null</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
               <span class="token keyword">final</span> <span class="token class-name">ReentrantLock</span> mainLock <span class="token operator">=</span> <span class="token keyword">this</span><span class="token punctuation">.</span>mainLock<span class="token punctuation">;</span>
               <span class="token comment">//同步锁 阻塞的获取锁</span>
               mainLock<span class="token punctuation">.</span><span class="token function">lock</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
               <span class="token keyword">try</span> <span class="token punctuation">{</span>
                   <span class="token comment">// Recheck while holding lock.</span>
                   <span class="token comment">// Back out on ThreadFactory failure or if</span>
                   <span class="token comment">// shut down before lock acquired.</span>
                   <span class="token comment">//获取池运行状态</span>
                   <span class="token keyword">int</span> rs <span class="token operator">=</span> <span class="token function">runStateOf</span><span class="token punctuation">(</span>ctl<span class="token punctuation">.</span><span class="token function">get</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
             <span class="token comment">//rs 即为当前线程池运行状态,如果状态是RUNNING 或者 </span>
             <span class="token comment">//第一个条件 是RUNNING 状态 </span>
             <span class="token comment">//第二个条件 是SHUTDOWN状态时 firstTask 必须是 null ,因为 shutdown 状态 不能提交新的任务。但可以创建新工作线程。</span>
             
                   <span class="token keyword">if</span> <span class="token punctuation">(</span>rs <span class="token operator">&lt;</span> SHUTDOWN <span class="token operator">||</span>
                       <span class="token punctuation">(</span>rs <span class="token operator">==</span> SHUTDOWN <span class="token operator">&amp;&amp;</span> firstTask <span class="token operator">==</span> <span class="token keyword">null</span><span class="token punctuation">)</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
                       <span class="token comment">//t.isAlive() 如果线程已经start 则返回true ,所以这里是判断 线程t 是不是还可以执行start()。</span>
                       
                       <span class="token keyword">if</span> <span class="token punctuation">(</span>t<span class="token punctuation">.</span><span class="token function">isAlive</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span> <span class="token comment">// precheck that t is startable</span>
                       <span class="token comment">// </span>
                           <span class="token keyword">throw</span> <span class="token keyword">new</span> <span class="token class-name">IllegalThreadStateException</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
                       <span class="token comment">//工作线程 添加到 集合workers 中</span>
                       workers<span class="token punctuation">.</span><span class="token function">add</span><span class="token punctuation">(</span>w<span class="token punctuation">)</span><span class="token punctuation">;</span>
                       <span class="token comment">//修改 poolSize</span>
                       <span class="token keyword">int</span> s <span class="token operator">=</span> workers<span class="token punctuation">.</span><span class="token function">size</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
                       <span class="token keyword">if</span> <span class="token punctuation">(</span>s <span class="token operator">&gt;</span> largestPoolSize<span class="token punctuation">)</span>
                           largestPoolSize <span class="token operator">=</span> s<span class="token punctuation">;</span>
                       workerAdded <span class="token operator">=</span> <span class="token boolean">true</span><span class="token punctuation">;</span>
                   <span class="token punctuation">}</span>
               <span class="token punctuation">}</span> <span class="token keyword">finally</span> <span class="token punctuation">{</span>
               <span class="token comment">//释放 同步锁</span>
                   mainLock<span class="token punctuation">.</span><span class="token function">unlock</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
               <span class="token punctuation">}</span>
               
               <span class="token keyword">if</span> <span class="token punctuation">(</span>workerAdded<span class="token punctuation">)</span> <span class="token punctuation">{</span>
               <span class="token comment">//如果添加成功 启动线程</span>
                   t<span class="token punctuation">.</span><span class="token function">start</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
                   workerStarted <span class="token operator">=</span> <span class="token boolean">true</span><span class="token punctuation">;</span>
               <span class="token punctuation">}</span>
           <span class="token punctuation">}</span>
       <span class="token punctuation">}</span> <span class="token keyword">finally</span> <span class="token punctuation">{</span>
       <span class="token comment">//如果线程启动失败 回滚操作</span>
           <span class="token keyword">if</span> <span class="token punctuation">(</span><span class="token operator">!</span> workerStarted<span class="token punctuation">)</span>
               <span class="token function">addWorkerFailed</span><span class="token punctuation">(</span>w<span class="token punctuation">)</span><span class="token punctuation">;</span>w
               <span class="token comment">//线程安全的 从工作线程集合中移除 如果w存在的话,就CAS 调整 工作线程数。并且尝试停止线程池</span>
       <span class="token punctuation">}</span>
       <span class="token keyword">return</span> workerStarted<span class="token punctuation">;</span>
       <span class="token comment">//返回启动成功 与否</span>
    

    }

    4.3 addWorkerFailed

    执行addWorker添加任务失败之后 调用该方法执行回滚操作。
    回滚操作主要做了以下三件事情:

      1. 从workers 工作线程集合中移除 worker 
      2. worker Count  减 1
      3. 尝试终止线程池,如果满足终止条件的话 就会终止线程池。
    
    /**
         * Rolls back the worker thread creation.
         * - removes worker from workers, if present
         * - decrements worker count
         * - rechecks for termination, in case the existence of this
         *   worker was holding up termination
         */
        private void addWorkerFailed(Worker w) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();//获取主锁
            try {
                if (w != null)
                //从workers 移除指定worker
                    workers.remove(w);
                    //workerCount 减 1
                decrementWorkerCount();
                //尝试终止线程池 终止的条件比较苛刻
                tryTerminate();
            } finally {
                mainLock.unlock();//释放锁
            }
        }
    

    4.4 runWorker 执行任务

    添加worker 成功之后,worker就开始执行runWorker()了。
    这个方法是比较重要的方法,是线程池的核心。是worker 线程 运行循环,重复地从队列中获取任务并执行它们,同时处理以下一些问题:

      1.从worker 的firstTask 开始执行,随着线程的运行,worker不停的执行getTask() 获取任务,如果拿不到任务,就会根据线程的池配置信息 来决定是否让 worker退出。如果worker执行过程中抛出异常,会导致worker退出,并试图创建一个新的线程替代当前worker。
        2.执行任务之前,获取锁确保任务执行不被其他线程干扰。如果不是STOP状态,worker线程需要清除中断标记。
        3.处理每个任务之前都要执行钩子函数 beforeExecute(),如果钩子函数抛出异常那么也会出发worker 过早的退出,并尝试创建一个新的worker 替代它。
        4.假如执行beforeExecute()顺利,接下来就是 执行任务了。如果执行中出现异常,那么就会捕获这些异常,进行包装成 runtimeException 和 error,Throwable也被包装成 Error的因为Ruunabl.run 不能抛出Throwabl 异常。最后把这些异常 和 worker 发给 afterExecute()处理。
        5.任务执行完成后,执行afterExecute(),也可能会产生异常,任何的异常都会导致worker 销毁。
    
    final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;//获取firstTask
            w.firstTask = null;//清除 firstTask
            w.unlock(); 
            // 允许worker 中断,为什么unlock()就可以允许中断?因为在worker构建实列时。worker的state 属性被设置为-1,在调用池shutDownNow()函数分别对worker进行终止时,会判断worker 的 state >= 0 ,如果不符合 就不会终止。
            boolean completedAbruptly = true;//是否突然完成,就是如果执行任务时,意外退出,该标记就不会被修改为false.
            try {
            1.先执行woker的firstTask 如果firstTask 已经执行,从任务阻塞队列获取任务。
            2.直到获取不到任务跳出while循环。
                while (task != null || (task = getTask()) != null) {
                    w.lock();//获取锁
                    // If pool is stopping, ensure thread is interrupted;
                    // if not, ensure thread is not interrupted.  This
                    // requires a recheck in second case to deal with
                    // shutdownNow race while clearing interrupt
                    1.如果 小于 STOP 状态 ,就进行 中断标记的清除,
                    2.Thread.interrupted() 会清除当前线程的 中断标记。
                    3.清除中断标记是为了消除 外部代码 对worker 的中断操作。
                    4.如果大于等于 STOP状态 就会中断线程。
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                    //执行任务前 处理
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                        //执行任务
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                        //任务执行后处理操作
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;//执行任务数++;
                        w.unlock();//施放锁。
                    }
                }
                completedAbruptly = false;//不是意外退出。
            } finally {
            //处理worker 退出。
                processWorkerExit(w, completedAbruptly);
            }
        }
    

    4.5 getTask() 获取任务

    阻塞或者限时的通过getTask() 函数获取任务。不仅仅是获取任务,同时通过是不是返回null控制者worker 线程的退出与否。以下情况会导致函数返回null,导致worker 退出:

        1.当前池执行的线程数超过了maximumPoolSize设置。
        2.线程池stop状态
        3.线程池时shutdown 状态 并且 任务阻塞队列 是空的。
        4.当前worker 获取任务时等待超时。具体的需要根据是否允许核心线程退出,是否大于核心线程,设置判断
    
         private Runnable getTask() {
            boolean timedOut = false; // Did the last poll() time out?
              //timedOut  是否已经超时的标记,如果超时获取任务 ,timedOut 设置为 true,就会影响到后面的逻辑判断。
            for (;;) {//循环的判断线程池状态 
                int c = ctl.get();//获取ctl 值
                int rs = runStateOf(c);//获取线程池运行状态
    
            <span class="token comment">// Check if queue empty only if necessary.</span>
          <span class="token number">1.</span>如果是大于等于 shutdown 状态 <span class="token punctuation">,</span>并且 是大于等于stop状态 那么获取任务失败,线程退出
          <span class="token number">2.</span>如果是shutdown 状态,并且 任务阻塞队列 workQuenu is empty 那么获取任务失败,线程退出。
            <span class="token keyword">if</span> <span class="token punctuation">(</span>rs <span class="token operator">&gt;=</span> SHUTDOWN <span class="token operator">&amp;&amp;</span> <span class="token punctuation">(</span>rs <span class="token operator">&gt;=</span> STOP <span class="token operator">||</span> workQueue<span class="token punctuation">.</span><span class="token function">isEmpty</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
                <span class="token function">decrementWorkerCount</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span><span class="token comment">//调整工作线程数减1 ;</span>
                <span class="token keyword">return</span> <span class="token keyword">null</span><span class="token punctuation">;</span><span class="token comment">//返回 task is null</span>
            <span class="token punctuation">}</span>
    
          <span class="token comment">//获取池正在运行工作线程数</span>
            <span class="token keyword">int</span> wc <span class="token operator">=</span> <span class="token function">workerCountOf</span><span class="token punctuation">(</span>c<span class="token punctuation">)</span><span class="token punctuation">;</span>
    
            <span class="token comment">// Are workers subject to culling?</span>
           <span class="token comment">//是否 执行定时的获取任务标记</span>
          <span class="token comment">// 如果 运行核心线程退出 或者 正在运行线程数大于核心线程 ,执行定时的获取任务。</span>
            <span class="token keyword">boolean</span> timed <span class="token operator">=</span> allowCoreThreadTimeOut <span class="token operator">||</span> wc <span class="token operator">&gt;</span> corePoolSize<span class="token punctuation">;</span>
    
          <span class="token comment">//判断当前worker 是否满足 退出条件,如果满足就执行cas 调整线程数 减 1 。并返回null</span>
          
            <span class="token keyword">if</span> <span class="token punctuation">(</span><span class="token punctuation">(</span>wc <span class="token operator">&gt;</span> maximumPoolSize <span class="token operator">||</span> <span class="token punctuation">(</span>timed <span class="token operator">&amp;&amp;</span> timedOut<span class="token punctuation">)</span><span class="token punctuation">)</span>
                <span class="token operator">&amp;&amp;</span> <span class="token punctuation">(</span>wc <span class="token operator">&gt;</span> <span class="token number">1</span> <span class="token operator">||</span> workQueue<span class="token punctuation">.</span><span class="token function">isEmpty</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
                <span class="token keyword">if</span> <span class="token punctuation">(</span><span class="token function">compareAndDecrementWorkerCount</span><span class="token punctuation">(</span>c<span class="token punctuation">)</span><span class="token punctuation">)</span>
                    <span class="token keyword">return</span> <span class="token keyword">null</span><span class="token punctuation">;</span>
                <span class="token keyword">continue</span><span class="token punctuation">;</span>
            <span class="token punctuation">}</span>
    
            <span class="token keyword">try</span> <span class="token punctuation">{</span>
                <span class="token comment">//三元表达式</span>
                <span class="token comment">//wordQuenu.poll 是等待keepAliveTime 时间 获取任务,如果超时返回null</span>
                <span class="token comment">//workQuenu.take 阻塞的获取任务,直到获取到一个任务为止。</span>
                <span class="token class-name">Runnable</span> r <span class="token operator">=</span> timed <span class="token operator">?</span>
                    workQueue<span class="token punctuation">.</span><span class="token function">poll</span><span class="token punctuation">(</span>keepAliveTime<span class="token punctuation">,</span> <span class="token class-name">TimeUnit</span><span class="token punctuation">.</span>NANOSECONDS<span class="token punctuation">)</span> <span class="token operator">:</span>
                    workQueue<span class="token punctuation">.</span><span class="token function">take</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
                <span class="token keyword">if</span> <span class="token punctuation">(</span>r <span class="token operator">!=</span> <span class="token keyword">null</span><span class="token punctuation">)</span>
                    <span class="token keyword">return</span> r<span class="token punctuation">;</span>
                timedOut <span class="token operator">=</span> <span class="token boolean">true</span><span class="token punctuation">;</span><span class="token comment">//如果 获取到的 r is null,那么说明 执行了 poll 获取任务超时。</span>
            <span class="token punctuation">}</span> <span class="token keyword">catch</span> <span class="token punctuation">(</span><span class="token class-name">InterruptedException</span> retry<span class="token punctuation">)</span> <span class="token punctuation">{</span>
                timedOut <span class="token operator">=</span> <span class="token boolean">false</span><span class="token punctuation">;</span>
            <span class="token punctuation">}</span>
        <span class="token punctuation">}</span>
    <span class="token punctuation">}</span>
    

    4.6 processWorkerExit 处理worker退出

    1. 对即将死亡worker 进行清理 和 记账(统计worker执行的任务数)
    2. 如果worker突然死亡(执行中有异常) 就需要调整运行的线程数
    3. 将worker 从worker线程集合中移除。(除名)
    4. 可能会终止线程池
    5. 在小于STOP状态前提下,如果由于执行 用户任务异常导致线程退出,创建新线程替代
    6. 在小于STOP状态前提下,不允许核心线程退出,如果运行中的线程小于核心线程,创建新线程替代。
     private void processWorkerExit(Worker w, boolean completedAbruptly) {
            if (completedAbruptly) // 如果runWorker中出现异常,那么调整woker 线程数量
                decrementWorkerCount();//调整数量,循环cas 保证调整成功
    
        <span class="token keyword">final</span> <span class="token class-name">ReentrantLock</span> mainLock <span class="token operator">=</span> <span class="token keyword">this</span><span class="token punctuation">.</span>mainLock<span class="token punctuation">;</span><span class="token comment">//</span>
        mainLock<span class="token punctuation">.</span><span class="token function">lock</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span><span class="token comment">//获取锁</span>
        <span class="token keyword">try</span> <span class="token punctuation">{</span>
            completedTaskCount <span class="token operator">+=</span> w<span class="token punctuation">.</span>completedTasks<span class="token punctuation">;</span><span class="token comment">//给即将死亡的工人记录</span>
            workers<span class="token punctuation">.</span><span class="token function">remove</span><span class="token punctuation">(</span>w<span class="token punctuation">)</span><span class="token punctuation">;</span><span class="token comment">//花名册除名</span>
        <span class="token punctuation">}</span> <span class="token keyword">finally</span> <span class="token punctuation">{</span>
            mainLock<span class="token punctuation">.</span><span class="token function">unlock</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span><span class="token comment">//释放锁</span>
        <span class="token punctuation">}</span>
    
        <span class="token function">tryTerminate</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span><span class="token comment">//尝试终止池</span>
    
        <span class="token keyword">int</span> c <span class="token operator">=</span> ctl<span class="token punctuation">.</span><span class="token function">get</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
     <span class="token comment">//寻找新的worker 替代死亡的worker</span>
        <span class="token keyword">if</span> <span class="token punctuation">(</span><span class="token function">runStateLessThan</span><span class="token punctuation">(</span>c<span class="token punctuation">,</span> STOP<span class="token punctuation">)</span><span class="token punctuation">)</span> <span class="token punctuation">{</span><span class="token comment">//如果 池STOP 那么就没必要找新的worker啦</span>
            <span class="token keyword">if</span> <span class="token punctuation">(</span><span class="token operator">!</span>completedAbruptly<span class="token punctuation">)</span> <span class="token punctuation">{</span><span class="token comment">//如果 正常执行 终止</span>
                <span class="token keyword">int</span> min <span class="token operator">=</span> allowCoreThreadTimeOut <span class="token operator">?</span> <span class="token number">0</span> <span class="token operator">:</span> corePoolSize<span class="token punctuation">;</span><span class="token comment">//判断需要保留的最小worker数量</span>
                <span class="token keyword">if</span> <span class="token punctuation">(</span>min <span class="token operator">==</span> <span class="token number">0</span> <span class="token operator">&amp;&amp;</span> <span class="token operator">!</span> workQueue<span class="token punctuation">.</span><span class="token function">isEmpty</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token comment">//如果是 0 并且 任务 队列 不为 0 </span>
                    min <span class="token operator">=</span> <span class="token number">1</span><span class="token punctuation">;</span><span class="token comment">//至少保留一个 把活干完</span>
                <span class="token keyword">if</span> <span class="token punctuation">(</span><span class="token function">workerCountOf</span><span class="token punctuation">(</span>c<span class="token punctuation">)</span> <span class="token operator">&gt;=</span> min<span class="token punctuation">)</span> <span class="token comment">//如果 当前worker 数量 大于 至少 保留数量 </span>
                    <span class="token keyword">return</span><span class="token punctuation">;</span> <span class="token comment">// 不需要 寻找替代者</span>
            <span class="token punctuation">}</span>
            <span class="token function">addWorker</span><span class="token punctuation">(</span><span class="token keyword">null</span><span class="token punctuation">,</span> <span class="token boolean">false</span><span class="token punctuation">)</span><span class="token punctuation">;</span><span class="token comment">// 找新的worker 替代 firstTask is null</span>
        <span class="token punctuation">}</span>
    <span class="token punctuation">}</span>
    

    4.7 tryTerminate 尝试终止池

    1.如果池是shutdown 并且 阻塞队列 为空 并且 工作线程数 为空 转换为终止状态

    2.如果 池是 stop 并且 工作线程为空 转换为 终止状态

    3.满足终止条件,但是工作线程不为 0 ,终止一个空闲线程

     final void tryTerminate() {
            for (;;) {
                int c = ctl.get();
                    1.如果池处于 允许状态,不终止池。
                    2.如果池状态 >= TIDYING,说明池正在终止,不需要再次终止
                    3.如果 池状态 == shutdown 并且 workQuenu 不为空,需要执行完剩下的任务,不终止。
                if (isRunning(c) ||
                    runStateAtLeast(c, TIDYING) ||
                    (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                    return;
                //如果池可以被终止  但是 工作线程 不为 0 ,即使 都池状态 和 队列 都满足终止条件 
                //但是 工作线程数 还存在 那么就要终止一个空闲线程
                //为什么终止一个空闲线程?
                //因为此时任务队列为空, 有worker 在getTask 时 从阻塞队列中 take  任务 ,
                //take 方法会一直阻塞worker 直到有任务,但是现在池不再接受新的任务,所以worker 会被一直阻塞
                //由于take通过LockSupport.park()阻塞线程,而LockSupport.park()能响应中断信号
                //所以通过终止 被阻塞的 worker 使其抛出 InterruptedException
                //抛出异常之后 worker最终会被处理退出 ,就会调用该函数,又会终止下一个被take阻塞的worker
                //这样所有被take 阻塞的线程 会 一个一个的 被终止。
                //这一点线程池的设计者 真是很厉害啊 ,想了好久 才发现这一点。
                if (workerCountOf(c) != 0) { // Eligible to terminate
                    interruptIdleWorkers(ONLY_ONE);
                    return;
                }
    
            <span class="token keyword">final</span> <span class="token class-name">ReentrantLock</span> mainLock <span class="token operator">=</span> <span class="token keyword">this</span><span class="token punctuation">.</span>mainLock<span class="token punctuation">;</span>
            mainLock<span class="token punctuation">.</span><span class="token function">lock</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span><span class="token comment">//获取锁</span>
            <span class="token keyword">try</span> <span class="token punctuation">{</span>
                <span class="token keyword">if</span> <span class="token punctuation">(</span>ctl<span class="token punctuation">.</span><span class="token function">compareAndSet</span><span class="token punctuation">(</span>c<span class="token punctuation">,</span> <span class="token function">ctlOf</span><span class="token punctuation">(</span>TIDYING<span class="token punctuation">,</span> <span class="token number">0</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">)</span> <span class="token punctuation">{</span><span class="token comment">//设置pool 为 TIDYING ,线程数为0 </span>
                    <span class="token keyword">try</span> <span class="token punctuation">{</span>
                        <span class="token function">terminated</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span><span class="token comment">//转换为终止状态</span>
                    <span class="token punctuation">}</span> <span class="token keyword">finally</span> <span class="token punctuation">{</span>
                        ctl<span class="token punctuation">.</span><span class="token function">set</span><span class="token punctuation">(</span><span class="token function">ctlOf</span><span class="token punctuation">(</span>TERMINATED<span class="token punctuation">,</span> <span class="token number">0</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span><span class="token comment">//设置pool 为shutdown ,线程数为0</span>
                        termination<span class="token punctuation">.</span><span class="token function">signalAll</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span><span class="token comment">//唤醒所有等待在终止condition上的线程</span>
                    <span class="token punctuation">}</span>
                    <span class="token keyword">return</span><span class="token punctuation">;</span>
                <span class="token punctuation">}</span>
            <span class="token punctuation">}</span> <span class="token keyword">finally</span> <span class="token punctuation">{</span>
                mainLock<span class="token punctuation">.</span><span class="token function">unlock</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span><span class="token comment">//施放锁</span>
            <span class="token punctuation">}</span>
            <span class="token comment">// else retry on failed CAS cas 修改 ctl 失败 ,重试。</span>
        <span class="token punctuation">}</span>
    <span class="token punctuation">}</span>
    

    4.8 interruptIdleWorkers

    中断可能等待任务的线程。如果队列任务是空的,有的线程会被一直阻塞,调用该方法会中断那些被一直阻塞的线程:

     private void interruptIdleWorkers(boolean onlyOne) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                for (Worker w : workers) {
                    Thread t = w.thread;
                    if (!t.isInterrupted() && w.tryLock()) {
                        try {
                            t.interrupt();
                        } catch (SecurityException ignore) {
                        } finally {
                            w.unlock();
                        }
                    }
                    if (onlyOne)
                        break;
                }
            } finally {
                mainLock.unlock();
            }
        }
    

    转载:https://www.jianshu.com/p/04231bd5a244

  • 相关阅读:
    oracle如何实现自增?----用序列sequence的方法来实现
    win7旗舰版安装 oracle 10g 不能进入图形界面的问题
    MBA都需要学习哪些课程
    查看Oracle当前用户下的(表视图,同义词...)
    辽宁省全国计算机等级考试 网上报名须知
    大学毕业之后的几年 你能考哪些证书
    plsql启动报 Using filter for all users can lead to poor perform
    hive web界面管理
    hive常用命令
    hive-site.xml
  • 原文地址:https://www.cnblogs.com/DuJiu/p/12526642.html
Copyright © 2011-2022 走看看