zoukankan      html  css  js  c++  java
  • Java并发编程--ThreadPoolExecutor

    概述

      为什么要使用线程池?  

      合理利用线程池能够带来三个好处。第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。第三:提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。但是要做到合理的利用线程池,必须对其原理了如指掌。——摘自http://www.infoq.com/cn/articles/java-threadPool。

    类图

       

    使用

      线程池的监控

        可以通过线程池的以下属性监控线程池的当前状态:

          getTaskCount():线程池已经执行的和未执行的任务总数,因为统计的过程中可能会发生变化,该值是个近似值;

          getCompletedTaskCount():已完成的任务数量,是个近似值,该值小于等于TaskCount;

          getLargestPoolSize():线程池曾经的最大线程数量,可以通过该值判断线程池是否满过。如该数值等于线程池的最大大小,则表示线程池曾经满过;

          getPoolSize():线程池当前的线程数量;

          getActiveCount():线程池中活动的线程数(正在执行任务),是个近似值。

        还可以通过重写线程池提供的hook方法(beforeExecute、afterExecute和terminated)进行监控,例如监控任务的平均执行时间、最大执行时间和最小执行时间等。

        程序员可以通过重写钩子 hook 方法(如beforeExecute)实现ThreadPoolExecutor的扩展。

        扩展示例:添加了简单的暂停/恢复功能的子类

     1 class PausableThreadPoolExecutor extends ThreadPoolExecutor {
     2     private boolean isPaused;    //标志是否被暂停
     3     private ReentrantLock pauseLock = new ReentrantLock();    //访问isPaused时需要加锁,保证线程安全
     4     private Condition unpaused = pauseLock.newCondition();
     5 
     6     public PausableThreadPoolExecutor(...) { super(...); }
     7     
     8     //beforeExecute为ThreadPoolExecutor提供的hood方法
     9     protected void beforeExecute(Thread t, Runnable r) {
    10         super.beforeExecute(t, r);
    11         pauseLock.lock();
    12         try {
    13             while (isPaused) 
    14                 unpaused.await();
    15         } catch(InterruptedException ie) {
    16             t.interrupt();
    17         } finally {
    18             pauseLock.unlock();
    19         }
    20     }
    21     //暂停
    22     public void pause() {
    23         pauseLock.lock();
    24         try {
    25             isPaused = true;
    26         } finally {
    27             pauseLock.unlock();
    28         }
    29     }
    30     //取消暂停
    31     public void resume() {
    32         pauseLock.lock();
    33         try {
    34             isPaused = false;
    35             unpaused.signalAll();
    36         } finally {
    37             pauseLock.unlock();
    38         }
    39     }
    40 }

    实现原理

      ThreadPoolExecutor源码分析

        域

     1 //ctl是控制线程池状态的一个变量,包含有效的线程数(workerCount)和线程池的运行状态(runState)两部分信息。高3位表示runState,低29位表示workerCount。
     2 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
     3 private static final int COUNT_BITS = Integer.SIZE - 3;    //表示workerCount的位数,29位。
     4 private static final int CAPACITY   = (1 << COUNT_BITS) - 1;    //线程数的上限,(2^29)-1,大约5亿
     5 
     6 // runState is stored in the high-order bits
     7 private static final int RUNNING    = -1 << COUNT_BITS;    //能接收新任务和处理队列中的任务
     8 private static final int SHUTDOWN   =  0 << COUNT_BITS;    //不能接收新任务,但可以处理队列中的任务
     9 private static final int STOP       =  1 << COUNT_BITS;    //不能接收新任务,不能处理队列中的任务,中断正在执行的任务
    10 private static final int TIDYING    =  2 << COUNT_BITS;    //所有的线程都被终止,workerCount为0时会进入该状态.
    11 private static final int TERMINATED =  3 << COUNT_BITS;    //terminated()方法完成后将进入该状态。

          以上ThreadPoolExecutor的成员变量表示线程池的状态,状态信息存储在ctl变量中,ctl包含有效线程数(workerCount)和线程池运行状态(runState)两部分信息,ctl的高3位表示runState,低29位表示workerCount。ctl初始值为RUNNING状态且线程数为0。

          线程池运行状态的转换如下:

            1)线程池在RUNNING状态下调用shutdown()方法会进入到SHUTDOWN状态,(finalize()方法也会调用shutdownNow())。

            2)在RUNNING和SHUTDOWN状态下调用 shutdownNow() 方法会进入到STOP状态。

            3)在SHUTDOWN状态下,当阻塞队列为空且线程数为0时进入TIDYING状态;在STOP状态下,当线程数为0时进入TIDYING状态。

            4)在TIDYING状态,调用terminated()方法完成后进入TERMINATED状态。

     

     1 //阻塞队列
     2 private final BlockingQueue<Runnable> workQueue;
     3 //可重入锁。访问woker线程和相关记录信息时需要获取该锁
     4 private final ReentrantLock mainLock = new ReentrantLock();
     5 //包含全部worker线程集合,Accessed only under mainLock,HashSet是非线程安全的.
     6 private final HashSet<Worker> workers = new HashSet<Worker>();
     7 private final Condition termination = mainLock.newCondition();
     8 //记录最大的线程数量,Accessed only under mainLock.
     9 private int largestPoolSize;
    10 //完成任务的数量,Accessed only under mainLock.
    11 private long completedTaskCount;
    12 
    13 
    14 //以下所有程序员可以控制的参数都被声明为volatile变量,保证可见性。
    15 
    16 //创建线程的工厂
    17 private volatile ThreadFactory threadFactory;
    18 //线程池饱和或关闭时的处理策略(提供了四种饱和策略)
    19 private volatile RejectedExecutionHandler handler;
    20 //超出corePoolSize数量的空闲线程存活时间(allowCoreThreadTimeOut=true时有效)
    21 private volatile long keepAliveTime;
    22 //allowCoreThreadTimeOut=false,线程不会因为空闲时间超过keepAliveTime而被停止
    23 private volatile boolean allowCoreThreadTimeOut;
    24 //核心线程数
    25 private volatile int corePoolSize;
    26 //最大线程数,此变量的最大上限为CAPACITY
    27 private volatile int maximumPoolSize;

          一、线程池核心线程数和最大线程数

            ThreadPoolExecutor 将根据 corePoolSize (核心线程数)和 maximumPoolSize(最大线程数)设置的边界自动调整线程池大小。当新任务在方法 execute(java.lang.Runnable) 中提交时,如果运行的线程少于 corePoolSize,则创建新线程来处理请求,即使其他辅助线程是空闲的。如果运行的线程多于 corePoolSize 而少于 maximumPoolSize,则仅当队列满时才创建新线程。如果设置的 corePoolSize 和 maximumPoolSize 相同,则创建了固定大小的线程池。如果将 maximumPoolSize 设置为基本的无界值(如 Integer.MAX_VALUE),则允许池适应任意数量的并发任务。在大多数情况下,核心和最大池大小仅基于构造函数来设置,不过也可以使用 setCorePoolSize(int) 和 setMaximumPoolSize(int) 进行动态更改。

          二、任务队列

            workQueue是一个阻塞队列,用来存储执行的任务。所有的BlockingQueue都可用于workQueue。

              如果有效的线程数小于 corePoolSize,则线程池首选添加新线程,而不进行排队。

              如果有效的线程数大于等于 corePoolSize,则线程池首选将任务加入队列,而不添加新的线程。 

              如果队列已满,则创建新的线程,当线程数超出 maximumPoolSize 时,任务将被拒绝。

            常用的三种阻塞队列的实现:

              1)直接提交。SynchronousQueue是一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态。它将任务直接提交给线程而不存储任务。直接提交通常要求不限制 maximumPoolSizes 以避免拒绝新提交的任务。Executors.newCachedThreadPool使用了这个队列。

              2)无界队列。LinkedBlockingQueue是一个基于链表结构的阻塞队列,默认的大小是Integer.MAX_VALUE。创建的线程就不会超过 corePoolSize,会使maximumPoolSize 的值无效。

              3)有界队列。ArrayBlockingQueue是一个基于数组结构的有界阻塞队列。有助于防止资源耗尽,但是可能较难调整和控制。

          三、饱和策略

            当 Executor 已经关闭,或者 Executor 将有限边界用于最大线程和工作队列容量且已经饱和时,在方法 execute(Runnable) 中提交的新任务将被拒绝。线程池提供了4种饱和策略:

              1)AbortPolicy。默认的饱和策略,直接抛出RejectedExecutionException异常。

              2)CallerRunsPolicy。用调用者所在的线程来执行任务,此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。

              3)DiscardPolicy。直接丢弃任务。

              4)DiscardOldestPolicy。如果执行程序尚未关闭,则丢弃阻塞队列中最靠前的任务,然后重试执行新任务(如果再次失败,则重复此过程)。

            也可以使用自定义的 RejectedExecutionHandler 类,但需要非常小心,尤其是当策略仅用于特定容量或排队策略时。

          四、threadFactory

            使用 ThreadFactory 创建新线程,默认情况下在同一个 ThreadGroup 中一律使用 Executors.defaultThreadFactory() 创建线程,这些线程具有相同的 NORM_PRIORITY 优先级和非守护进程状态。通过自定义的 ThreadFactory创建新线程,可以改变线程的名称、线程组、优先级、守护进程状态等。

          五、workers用来存储工作线程,注意HashSet<Worker>是非线程安全的,访问时需要获取mainLock;

          六、mainLock是一个独占式可重入锁,用来保证访问workers和其他监控变量(如largestPoolSize、completedTaskCount等)的线程安全。

          七、keepAliveTime为线程池的工作线程空闲后,保持存活的时间。所以如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。allowCoreThreadTimeout变量表示是否允许核心线程超时,如果allowCoreThreadTimeOut=false,那么当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize;如果allowCoreThreadTimeOut=true,那么当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=0。

        执行任务(execute)

     1 public void execute(Runnable command) {
     2     if (command == null)
     3         throw new NullPointerException();
     4     /*
     5      * Proceed in 3 steps:
     6      *
     7      * 1. If fewer than corePoolSize threads are running, try to
     8      * start a new thread with the given command as its first
     9      * task.  The call to addWorker atomically checks runState and
    10      * workerCount, and so prevents false alarms that would add
    11      * threads when it shouldn't, by returning false.
    12      *
    13      * 2. If a task can be successfully queued, then we still need
    14      * to double-check whether we should have added a thread
    15      * (because existing ones died since last checking) or that
    16      * the pool shut down since entry into this method. So we
    17      * recheck state and if necessary roll back the enqueuing if
    18      * stopped, or start a new thread if there are none.
    19      *
    20      * 3. If we cannot queue task, then we try to add a new
    21      * thread.  If it fails, we know we are shut down or saturated
    22      * and so reject the task.
    23      */
    24     int c = ctl.get();    //获取线程池的状态(runState和workerCount)
    25     //如果线程数小于corePoolSize,新建一个线程执行该任务。
    26     if (workerCountOf(c) < corePoolSize) {
    27         if (addWorker(command, true))
    28             return;
    29         c = ctl.get();
    30     }
    31     //如果线程池是运行状态,并且添加任务到队列成功(队列未满)
    32     if (isRunning(c) && workQueue.offer(command)) {
    33         int recheck = ctl.get();
    34         //再次判断线程池的运行状态,如果不是运行状态,需要从队列删除该任务。使用拒绝策略处理该任务。
    35         if (! isRunning(recheck) && remove(command))
    36             reject(command);
    37         //如果线程数为0,执行addWorker方法。参数为null的原因是任务已经加入到队列,新建的线程从队列取任务执行即可。
    38         else if (workerCountOf(recheck) == 0)
    39             addWorker(null, false);
    40     }
    41     //线程池不是RUNNING状态或队列已满,尝试新建一个线程执行该任务。如果失败则拒绝该任务。
    42     else if (!addWorker(command, false))
    43         reject(command);
    44 }

         新增线程(addWorker)

          线程被封装在Worker类中。

     1 //参数firstTask表示新建线程执行的第一个任务。如果firstTask为null,表示
     2 //如果参数core=true,把corePoolSize作为线程数上限的判断条件;如果为false,把maximumPoolSize作为线程数上限的判断条件
     3 private boolean addWorker(Runnable firstTask, boolean core) {
     4     retry:
     5     for (;;) {
     6         int c = ctl.get();
     7         int rs = runStateOf(c);
     8         /*
     9          * rs >= SHUTDOWN表示不再接受新任务。 
    10          * 1)线程池的运行状态为SHUTDOWN;2)firstTask == null;3)阻塞队列不为空,只有这三个条件同时满足才不返回false
    11          */
    12         // Check if queue empty only if necessary.
    13         if (rs >= SHUTDOWN &&
    14             ! (rs == SHUTDOWN &&
    15                firstTask == null &&
    16                ! workQueue.isEmpty()))
    17             return false;
    18         
    19         //自旋CAS递增workerCount
    20         for (;;) {
    21             int wc = workerCountOf(c);
    22             //如果线程数超过上限,返回false。如果参数core=true,把corePoolSize作为线程数上限的判断条件;如果为false,把maximumPoolSize作为线程数上限的判断条件
    23             if (wc >= CAPACITY ||
    24                 wc >= (core ? corePoolSize : maximumPoolSize))
    25                 return false;
    26             //CAS递增线程数。如果成功,跳出最外层循环;如果失败,且运行状态没有改变,继续内层循环直到成功。
    27             if (compareAndIncrementWorkerCount(c))
    28                 break retry;
    29             //判断runState是否改变,如果改变则继续外层循环
    30             c = ctl.get();  // Re-read ctl
    31             if (runStateOf(c) != rs)
    32                 continue retry;
    33             // else CAS failed due to workerCount change; retry inner loop
    34         }
    35     }
    36     
    37     //走到这说明需要新建线程,且workerCount更新成功
    38     //下面是新建Worker的过程。
    39     boolean workerStarted = false;    //新建的Worker是否启动标识
    40     boolean workerAdded = false;    //新建的Worker是否被添加到workers标识
    41     Worker w = null;
    42     try {
    43         final ReentrantLock mainLock = this.mainLock;
    44         w = new Worker(firstTask);    //新建Worker
    45         final Thread t = w.thread;
    46         //什么情况下线程会为null呢?在ThreadFactory创建线程失败时可能会出现。
    47         if (t != null) {
    48             mainLock.lock();    //获取mainLock锁。对workers(HashSet非线程安全)和largestPoolSize更新必须加锁
    49             try {
    50                 // Recheck while holding lock.
    51                 // Back out on ThreadFactory failure or if
    52                 // shut down before lock acquired.
    53                 int c = ctl.get();
    54                 int rs = runStateOf(c);
    55                 /*
    56                  *    如果运行状态是RUNNING,或者运行状态是SHUTDOWN且firstTask为null,才将新建的Worker添加到workers
    57                  */
    58                 if (rs < SHUTDOWN ||
    59                     (rs == SHUTDOWN && firstTask == null)) {
    60                     if (t.isAlive()) // precheck that t is startable
    61                         throw new IllegalThreadStateException();
    62                     workers.add(w);
    63                     //更新largestPoolSize,标识线程池曾经出现过的最大线程数
    64                     int s = workers.size();
    65                     if (s > largestPoolSize)
    66                         largestPoolSize = s;
    67                     workerAdded = true;
    68                 }
    69             } finally {
    70                 mainLock.unlock();    //释放mainLock锁
    71             }
    72             if (workerAdded) {
    73                 //启动线程
    74                 t.start();
    75                 workerStarted = true;
    76             }
    77         }
    78     } finally {
    79         //新建的Worker未启动,进行失败处理
    80         if (! workerStarted)
    81             addWorkerFailed(w);
    82     }
    83     return workerStarted;
    84 }

        Worker类

          每个线程被封装为一个Worker类实例。Worker类继承了AbstractQueuedSynchronizer,并实现了一个互斥非重入锁。Worker类同时继承了Runnable,Worker类的实例也是一个线程。

     1 private final class Worker
     2     extends AbstractQueuedSynchronizer
     3     implements Runnable
     4 {
     5     /**
     6      * This class will never be serialized, but we provide a
     7      * serialVersionUID to suppress a javac warning.
     8      */
     9     private static final long serialVersionUID = 6138294804551838833L;
    10 
    11     /** Thread this worker is running in.  Null if factory fails. */
    12     final Thread thread;    //处理任务的线程
    13     /** Initial task to run.  Possibly null. */
    14     Runnable firstTask;        //传入的任务
    15     /** Per-thread task counter */
    16     volatile long completedTasks;    //完成的任务数
    17 
    18     /**
    19      * Creates with given first task and thread from ThreadFactory.
    20      * @param firstTask the first task (null if none)
    21      */
    22     Worker(Runnable firstTask) {
    23         //同步状态初始化为-1,在执行runWorker方法前禁止中断当前线程
    24         setState(-1); // inhibit interrupts until runWorker 
    25         this.firstTask = firstTask;
    26         this.thread = getThreadFactory().newThread(this);    //通过ThreadFactory创建线程
    27     }
    28 
    29     /** Delegates main run loop to outer runWorker  */
    30     public void run() {
    31         runWorker(this);
    32     }
    33 
    34     // Lock methods
    35     //
    36     // The value 0 represents the unlocked state.
    37     // The value 1 represents the locked state.
    38     //实现了一个非重入互斥锁,state=0表示解锁状态,state=1表示加锁状态
    39     protected boolean isHeldExclusively() {
    40         return getState() != 0;
    41     }
    42 
    43     protected boolean tryAcquire(int unused) {
    44         if (compareAndSetState(0, 1)) {
    45             setExclusiveOwnerThread(Thread.currentThread());
    46             return true;
    47         }
    48         return false;
    49     }
    50 
    51     protected boolean tryRelease(int unused) {
    52         setExclusiveOwnerThread(null);
    53         setState(0);
    54         return true;
    55     }
    56 
    57     public void lock()        { acquire(1); }
    58     public boolean tryLock()  { return tryAcquire(1); }
    59     public void unlock()      { release(1); }
    60     public boolean isLocked() { return isHeldExclusively(); }
    61 
    62     void interruptIfStarted() {
    63         Thread t;
    64         if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
    65             try {
    66                 t.interrupt();
    67             } catch (SecurityException ignore) {
    68             }
    69         }
    70     }
    71 }

        runWorker方法

     1 final void runWorker(Worker w) {
     2     Thread wt = Thread.currentThread();
     3     Runnable task = w.firstTask;
     4     w.firstTask = null;
     5     //Worker初始化时同步状态置为-1,此处进行解锁操作目的是将同步状态置为0,允许中断。
     6     w.unlock(); // allow interrupts
     7     boolean completedAbruptly = true;    //是否因为异常跳出循环
     8     try {
     9         //如果firstTask为null则通过getTask()方法从队列中获取。
    10         //正常情况下,会一直执行While循环,如果队列为空,getTask()方法中会阻塞当前线程,getTask()返回null时会跳出循环
    11         while (task != null || (task = getTask()) != null) {
    12             w.lock();    //加Worker锁
    13             // If pool is stopping, ensure thread is interrupted;
    14             // if not, ensure thread is not interrupted.  This
    15             // requires a recheck in second case to deal with
    16             // shutdownNow race while clearing interrupt
    17             /*
    18              * 如果线程池正在停止,要保证当前线程是中断状态
    19              * 如果不是,则要保证当前线程不是中断状态
    20              *  STOP状态要中断线程池中的所有线程,而这里使用Thread.interrupted()来判断是否中断是为了确保在RUNNING或者SHUTDOWN状态时线程是非中断状态的,因为Thread.interrupted()方法会复位中断的状态。
    21              */
    22             if ((runStateAtLeast(ctl.get(), STOP) ||
    23                  (Thread.interrupted() &&
    24                   runStateAtLeast(ctl.get(), STOP))) &&
    25                 !wt.isInterrupted())
    26                 wt.interrupt();
    27             try {
    28                 beforeExecute(wt, task);    //钩子方法
    29                 Throwable thrown = null;
    30                 try {
    31                     task.run();    //调用任务的run方法,而不是start()方法,因为Worker本身就是一个线程类
    32                 } catch (RuntimeException x) {
    33                     thrown = x; throw x;
    34                 } catch (Error x) {
    35                     thrown = x; throw x;
    36                 } catch (Throwable x) {
    37                     thrown = x; throw new Error(x);
    38                 } finally {
    39                     afterExecute(task, thrown);    //钩子方法
    40                 }
    41             } finally {
    42                 task = null;
    43                 w.completedTasks++;
    44                 w.unlock();        //释放Worker锁
    45             }
    46         }
    47         completedAbruptly = false;
    48     } finally {
    49         //跳出循环,执行processWorkerExit()方法
    50         processWorkerExit(w, completedAbruptly);
    51     }
    52 }

        getTask()方法

     1 //如果返回null,在runWorker方法中会执行processWorkerExit,即关闭该线程。
     2 private Runnable getTask() {
     3     //表示上次从队列获取任务是否超时
     4     boolean timedOut = false; // Did the last poll() time out?
     5 
     6     retry:
     7     for (;;) {
     8         int c = ctl.get();
     9         int rs = runStateOf(c);
    10 
    11         // Check if queue empty only if necessary.
    12         // 如果rs >= STOP,或者 rs=SHUTDOWN且队列为空,此时不再接收新任务,将WorkerCount递减并返回null。
    13         if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
    14             decrementWorkerCount();    //自旋CAS递减workerCount直到成功
    15             return null;
    16         }
    17         
    18         //timed用于判断是否需要重试控制
    19         boolean timed;      // Are workers subject to culling?
    20 
    21         for (;;) {
    22             //allowCoreThreadTimeOut默认是false,核心线程不进行超时控制,当线程数量大于corePoolSize时需要进行超时控制
    23             int wc = workerCountOf(c);
    24             timed = allowCoreThreadTimeOut || wc > corePoolSize;
    25             
    26             //如果wc <= maximumPoolSize ,且上次从队列获取任务超时或本次需要进行超时控制,则跳出内层循环。
    27             //timedOut=true表示上次从队列获取元素超时,说明队列在上次获取的keepAliveTime时间内是空的。
    28             //timed=true说明线程数量大于corePoolSize。
    29             //所以timedOut=true和timed=true同时满足则说明当前线程已经空闲了keepAliveTime时间,并且线程池的数量大于corePoolSize。这时就需要关闭多余的空闲线程(即compareAndDecrementWorkerCount并返回null)。
    30             if (wc <= maximumPoolSize && ! (timedOut && timed))
    31                 break;
    32             //如果线程数量大于maximumPoolSize,或者上次从队列获取任务超时且本次需要进行超时控制。需要递减WorkerCount,如果递减成功则返回null
    33             if (compareAndDecrementWorkerCount(c))
    34                 return null;
    35             //检查线程池运行状态是否改变。如果改变,那么继续外层循环,如果未改变,那么继续内层循环。
    36             c = ctl.get();  // Re-read ctl
    37             if (runStateOf(c) != rs)
    38                 continue retry;
    39             // else CAS failed due to workerCount change; retry inner loop
    40         }
    41 
    42         try {
    43             Runnable r = timed ?
    44                 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :    
    45                 //超时方式获取,注意keepAliveTime为超出corePoolSize大小的线程的空闲存活时间
    46                 workQueue.take();    //阻塞方式获取,如果队列为空阻塞当前线程
    47             if (r != null)
    48                 return r;
    49             timedOut = true;    //如果超时,继续循环。
    50         } catch (InterruptedException retry) {
    51             //如果发生中断,则将timedOut置为false,继续循环
    52             timedOut = false;
    53         }
    54     }
    55 }

        processWorkerExit方法

     1 private void processWorkerExit(Worker w, boolean completedAbruptly) {
     2     //如果completedAbruptly=false,说明是由getTask返回null导致的,WorkerCount递减的操作已经执行。
     3     //如果completedAbruptly=true,说明是由执行任务的过程中发生异常导致,需要进行WorkerCount递减的操作。
     4     if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
     5         decrementWorkerCount();
     6 
     7     final ReentrantLock mainLock = this.mainLock;
     8     mainLock.lock();
     9     try {
    10         completedTaskCount += w.completedTasks;
    11         workers.remove(w);    //从workers中删除当前worker,对workers更新需要加mainLock锁。
    12     } finally {
    13         mainLock.unlock();
    14     }
    15 
    16     tryTerminate();
    17     
    18     //如果是异常结束(completedAbruptly=true),需要重新调用addWorker()增加一个线程,保持线程数量。
    19     //如果是由getTask()返回null导致的线程结束,需要进行以下判断:
    20     //    1)如果allowCoreThreadTimeOut=true且队列不为空,那么需要至少保证有一个线程。
    21     //    2)如果allowCoreThreadTimeOut=false,那么需要保证线程数大于等于corePoolSize。
    22     //
    23     int c = ctl.get();
    24     if (runStateLessThan(c, STOP)) {
    25         if (!completedAbruptly) {
    26             int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
    27             if (min == 0 && ! workQueue.isEmpty())
    28                 min = 1;
    29             if (workerCountOf(c) >= min)
    30                 return; // replacement not needed
    31         }
    32         addWorker(null, false);
    33     }
    34 }

        tryTerminate()方法

     1 //根据线程池状态判断是否结束线程池
     2 final void tryTerminate() {
     3     for (;;) {
     4         int c = ctl.get();
     5         //如果线程池运行状态是RUNNING,或者大于等于TIDYING,或者运行状态为SHUTDOWN且队列为空,则直接return。
     6         if (isRunning(c) ||
     7             runStateAtLeast(c, TIDYING) ||
     8             (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
     9             return;
    10         //如果线程数不为0,则中断一个空闲线程并return。为什么有这一步操作。
    11         if (workerCountOf(c) != 0) { // Eligible to terminate
    12             interruptIdleWorkers(ONLY_ONE);
    13             return;
    14         }
    15 
    16         final ReentrantLock mainLock = this.mainLock;
    17         mainLock.lock();
    18         try {
    19             //尝试将状态设置为TIDYING状态,
    20             if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
    21                 try {
    22                     //如果CAS成功,执行terminated()方法
    23                     terminated();
    24                 } finally {
    25                     ctl.set(ctlOf(TERMINATED, 0));
    26                     termination.signalAll();
    27                 }
    28                 return;
    29             }
    30         } finally {
    31             mainLock.unlock();
    32         }
    33         // else retry on failed CAS
    34     }
    35 }

        shutdown()方法

          线程池运行状态由RUNNING到SHUTDOWN的转换。

     1 public void shutdown() {
     2     final ReentrantLock mainLock = this.mainLock;
     3     mainLock.lock();
     4     try {
     5         //安全管理,检查方法调用者是否有权限中断Worker线程
     6         checkShutdownAccess();
     7         //运行状态改为SHUTDOWN
     8         advanceRunState(SHUTDOWN);    //自旋CAS
     9         //中断空闲线程
    10         interruptIdleWorkers();
    11         onShutdown(); // hook for ScheduledThreadPoolExecutor
    12     } finally {
    13         mainLock.unlock();
    14     }
    15     //尝试结束线程池
    16     tryTerminate();
    17 }
    18 
    19 private void interruptIdleWorkers() {
    20     interruptIdleWorkers(false);
    21 }
    22 
    23 private void interruptIdleWorkers(boolean onlyOne) {
    24     final ReentrantLock mainLock = this.mainLock;
    25     mainLock.lock();    //对workers的操作需要获取mainLock
    26     try {
    27         //遍历所有的线程,如果没有被中断且获取锁成功则中断线程。获取锁失败时很可能该线程正在执行任务(woker执行任务时需要对woker加锁)。
    28         for (Worker w : workers) {
    29             Thread t = w.thread;
    30             if (!t.isInterrupted() && w.tryLock()) {
    31                 try {
    32                     t.interrupt();
    33                 } catch (SecurityException ignore) {
    34                 } finally {
    35                     w.unlock();
    36                 }
    37             }
    38             if (onlyOne)
    39                 break;
    40         }
    41     } finally {
    42         mainLock.unlock();
    43     }
    44 }

        shutdownNow()方法

     1 public List<Runnable> shutdownNow() {
     2     List<Runnable> tasks;
     3     final ReentrantLock mainLock = this.mainLock;
     4     mainLock.lock();
     5     try {
     6         checkShutdownAccess();
     7         advanceRunState(STOP);
     8         //中断所有线程,即使线程正在执行任务
     9         interruptWorkers();    
    10         //取出队列中的任务
    11         tasks = drainQueue();
    12     } finally {
    13         mainLock.unlock();
    14     }
    15     //尝试结束线程池
    16     tryTerminate();
    17     return tasks;
    18 }
    19 
    20 private void interruptWorkers() {
    21     final ReentrantLock mainLock = this.mainLock;
    22     mainLock.lock();
    23     try {
    24         for (Worker w : workers)
    25             w.interruptIfStarted();
    26     } finally {
    27         mainLock.unlock();
    28     }
    29 }
    30 
    31 private List<Runnable> drainQueue() {
    32     BlockingQueue<Runnable> q = workQueue;
    33     List<Runnable> taskList = new ArrayList<Runnable>();
    34     q.drainTo(taskList);
    35     if (!q.isEmpty()) {
    36         for (Runnable r : q.toArray(new Runnable[0])) {
    37             if (q.remove(r))
    38                 taskList.add(r);
    39         }
    40     }
    41     return taskList;
    42 }

       

      FutureTask源码分析

        利用FutureTask可以实现获取异步任务的返回值、取消异步任务等功能。看一下ThreadPoolExecutor的submit方法。submit方法根据任务构造一个FutureTask对象并返回,在主线程中可以根据FutureTask提供的方法进行任务取消和获取异步任务的返回值。

    1 public <T> Future<T> submit(Callable<T> task) {
    2     if (task == null) throw new NullPointerException();
    3     RunnableFuture<T> ftask = newTaskFor(task);
    4     execute(ftask);    //实际执行的任务是ftask
    5     return ftask;
    6 }

        域

    private volatile int state;        //状态,新创建时状态为NEW
    private static final int NEW          = 0;    //新创建    
    private static final int COMPLETING   = 1;    //正在执行
    private static final int NORMAL       = 2;    //正常完成
    private static final int EXCEPTIONAL  = 3;    //执行过程中出现异常
    private static final int CANCELLED    = 4;    //被取消
    private static final int INTERRUPTING = 5;    //
    private static final int INTERRUPTED  = 6;
    
    /** The underlying callable; nulled out after running */
    private Callable<V> callable;    //要执行的任务
    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** The thread running the callable; CASed during run() */
    private volatile Thread runner;    //执行callable的线程
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters;    //Treiber算法实现的栈,用于存储等待的线程
    
    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }

          状态的转换有以下几种情况:

            1)NEW -> COMPLETING -> NORMAL 正常执行并返回;

            2)NEW -> COMPLETING -> EXCEPTIONAL 执行过程中出现异常;

            3)NEW -> CANCELLED 执行前被取消

            4)NEW -> INTERRUPTING -> INTERRUPTED 取消时被中断。

        初始化

     1 public FutureTask(Callable<V> callable) {
     2     if (callable == null)
     3         throw new NullPointerException();
     4     this.callable = callable;
     5     this.state = NEW;       // ensure visibility of callable
     6 }
     7 
     8 public FutureTask(Runnable runnable, V result) {
     9     //由于Runnable没有返回值,通过Executors将Runnable转换为Callable。
    10     this.callable = Executors.callable(runnable, result);
    11     this.state = NEW;       // ensure visibility of callable
    12 }

        执行任务-run()方法

     1 public void run() {
     2     //只执行state=NEW的任务。如果state!=NEW说明任务已经执行。
     3     //如果state=NEW,则通过CAS将runner置为当前线程。如果失败说明其他线程已经执行。
     4     if (state != NEW ||
     5         !UNSAFE.compareAndSwapObject(this, runnerOffset,
     6                                      null, Thread.currentThread()))
     7         return;
     8     try {
     9         Callable<V> c = callable;
    10         if (c != null && state == NEW) {
    11             V result;    //任务执行结果
    12             boolean ran;    //任务执行期间是否发生异常
    13             try {
    14                 result = c.call();    //执行任务
    15                 ran = true;
    16             } catch (Throwable ex) {
    17                 result = null;
    18                 ran = false;
    19                 //如果发生异常,执行setException(ex)
    20                 setException(ex);
    21             }
    22             //如果正常结束,执行set(result).
    23             if (ran)
    24                 set(result);
    25         }
    26     } finally {
    27         // runner must be non-null until state is settled to
    28         // prevent concurrent calls to run()
    29         //不管任务执行是否正常,都需要将runner置为null
    30         runner = null;
    31         // state must be re-read after nulling runner to prevent
    32         // leaked interrupts
    33         //防止中断泄露,需要结合cancel方法研究
    34         //如果s>=INTERRUPTING,说明状态变换为NEW -> INTERRUPTING -> INTERRUPTED,即在取消时被中断。
    35         int s = state;
    36         if (s >= INTERRUPTING)
    37             handlePossibleCancellationInterrupt(s);
    38     }
    39 }

          任务执行正常结束:

    1 //任务正常结束,通过CAS更新state为COMPLETING,如果成功,将state更新为NORMAL,唤醒等待线程。
    2 protected void set(V v) {
    3     if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
    4         outcome = v;    //将运行结果result赋给outcome
    5         UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
    6         //删除和唤醒所有的等待线程
    7         finishCompletion();
    8     }
    9 }

          任务执行时发生异常:

    1 //任务执行时发生异常,通过CAS更新state为COMPLETING,如果成功,将state更新为EXCEPTIONAL,唤醒等待线程
    2 protected void setException(Throwable t) {
    3     if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
    4         outcome = t;    //将异常信息赋给outcome
    5         UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
    6         finishCompletion();
    7     }
    8 }

          唤醒等待获取任务运行结果的线程:

     1 private void finishCompletion() {
     2     // assert state > COMPLETING;
     3     //自旋CAS更新waiters为null直到成功
     4     for (WaitNode q; (q = waiters) != null;) {
     5         if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
     6             for (;;) {
     7                 Thread t = q.thread;
     8                 if (t != null) {
     9                     q.thread = null;
    10                     LockSupport.unpark(t);    //唤醒等待线程,WaitNode是在get方法中添加的
    11                 }
    12                 WaitNode next = q.next;
    13                 if (next == null)
    14                     break;
    15                 q.next = null; // unlink to help gc
    16                 q = next;
    17             }
    18             break;
    19         }
    20     }
    21 
    22     done();    //hook方法,默认不执行任何操作,子类可以重写该方法完成指定的功能(例如:回调)
    23 
    24     callable = null;        // to reduce footprint
    25 }

          handlePossibleCancellationInterrupt方法要确保cancel(true)产生的中断发生在run或runAndReset方法执行的过程中。这里会循环的调用Thread.yield()来确保状态在cancel方法中被设置为INTERRUPTED。

     1 private void handlePossibleCancellationInterrupt(int s) {
     2     // It is possible for our interrupter to stall before getting a
     3     // chance to interrupt us.  Let's spin-wait patiently.
     4     if (s == INTERRUPTING)
     5         while (state == INTERRUPTING)
     6             Thread.yield(); // wait out pending interrupt
     7 
     8     // assert state == INTERRUPTED;
     9 
    10     // We want to clear any interrupt we may have received from
    11     // cancel(true).  However, it is permissible to use interrupts
    12     // as an independent mechanism for a task to communicate with
    13     // its caller, and there is no way to clear only the
    14     // cancellation interrupt.
    15     //
    16     // Thread.interrupted();
    17 }

        获取运行结果-get()方法

     1 public V get() throws InterruptedException, ExecutionException {
     2     int s = state;
     3     //如果state为NEW或COMPLETING,调用awaitDone方法将当前线程添加到waiters中并阻塞
     4     if (s <= COMPLETING)
     5         s = awaitDone(false, 0L);
     6     //如果已经完成(包括正常结束或异常结束),返回
     7     return report(s);
     8 }
     9 
    10 //如果超时则抛出TimeoutException异常
    11 public V get(long timeout, TimeUnit unit)
    12     throws InterruptedException, ExecutionException, TimeoutException {
    13     if (unit == null)
    14         throw new NullPointerException();
    15     int s = state;
    16     if (s <= COMPLETING &&
    17         (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
    18         throw new TimeoutException();
    19     return report(s);
    20 }

          awaitDone方法,阻塞线程。

     1 //timed参数表示是否使用超时机制
     2 private int awaitDone(boolean timed, long nanos)
     3     throws InterruptedException {
     4     final long deadline = timed ? System.nanoTime() + nanos : 0L;
     5     WaitNode q = null;
     6     boolean queued = false;    //是否已经入栈
     7     for (;;) {
     8         //若当前线程被中断,则删除q并抛出InterruptedException()
     9         if (Thread.interrupted()) {
    10             removeWaiter(q);
    11             throw new InterruptedException();
    12         }
    13 
    14         int s = state;
    15         //如果state大于COMPLETING,表明任务已经完成,则将节点q的线程置为null并返回状态值。
    16         if (s > COMPLETING) {
    17             if (q != null)
    18                 q.thread = null;
    19             return s;
    20         }
    21         //s==COMPLETING,说明任务已经执行完成但还没有设置最终状态。
    22         //Thread.yield();让当前正在运行的线程回到可运行状态,以允许其他线程(包括当前线程)获得运行的机会。注意目的是尝试让状态改变,继续下个循环。
    23         else if (s == COMPLETING) // cannot time out yet
    24             Thread.yield();
    25         else if (q == null)
    26             q = new WaitNode();    //新建WaitNode节点
    27         //CAS添加到waiters栈,在阻塞之前先将节点q添加栈,入栈成功后queued更新为true。
    28         else if (!queued)
    29             queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
    30                                                  q.next = waiters, q);
    31         else if (timed) {
    32             nanos = deadline - System.nanoTime();
    33             //如果已经过期,则删除节点q并返回
    34             if (nanos <= 0L) {
    35                 removeWaiter(q);
    36                 return state;
    37             }
    38             LockSupport.parkNanos(this, nanos);    //超时机制阻塞当前线程
    39         }
    40         else
    41             LockSupport.park(this);    //阻塞当前线程
    42     }
    43 }
    44             
    45 //删除指定节点(Treiber算法实现的栈)
    46 private void removeWaiter(WaitNode node) {
    47     if (node != null) {
    48         node.thread = null;    //将线程置为null,因为下面要根据thread是否为null判断是否要把node移出
    49         retry:
    50         for (;;) {          // restart on removeWaiter race
    51             for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
    52                 s = q.next;
    53                 if (q.thread != null)
    54                     pred = q;
    55                 else if (pred != null) {
    56                     pred.next = s;
    57                     if (pred.thread == null) // check for race
    58                         continue retry;
    59                 }
    60                 else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s))
    61                     continue retry;
    62             }
    63             break;
    64         }
    65     }
    66 }

          report方法,返回运行结果或抛出异常。

     1 //任务完成返回执行结果或抛出异常
     2 private V report(int s) throws ExecutionException {
     3     Object x = outcome;
     4     //如果任务正常完成,返回执行结果
     5     if (s == NORMAL)
     6         return (V)x;
     7     //如果s >= CANCELLED,说明任务被取消,那么就抛出CancellationException
     8     if (s >= CANCELLED)
     9         throw new CancellationException();
    10     //最后s==EXCEPTIONAL,任务执行时发生异常,抛出该异常
    11     throw new ExecutionException((Throwable)x);
    12 }

        取消任务-cancel方法

          试图取消对此任务的执行。如果任务已完成、或已取消,或者由于某些其他原因而无法取消,则此尝试将失败。当调用 cancel 时,如果调用成功,而此任务尚未启动,则此任务将永不运行。如果任务已经启动,则 mayInterruptIfRunning 参数确定是否应该以试图停止任务的方式来中断执行此任务的线程。

     1 public boolean cancel(boolean mayInterruptIfRunning) {
     2     //若state != NEW,说明任务已经启动,则直接返回失败。
     3     if (state != NEW)
     4         return false;
     5     //如果mayInterruptIfRunning为true,要中断当前执行任务的线程。
     6     if (mayInterruptIfRunning) {
     7         //CAS更新state为INTERRUPTING不成功,说明state已被改变(即state != NEW),则直接返回失败。如果成功则中断正在执行任务的线程,并唤醒等待获取结果的线程。
     8         if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))
     9             return false;
    10         Thread t = runner;
    11         if (t != null)
    12             t.interrupt();    //中断当前线程
    13         //更新state为INTERRUPTED
    14         UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state
    15     }
    16     //mayInterruptIfRunning=flase,CAS更新state为CANCELLED,若成功则唤醒等待的线程(不中断正在执行任务的线程),若失败返回false。
    17     else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))
    18         return false;
    19     finishCompletion();
    20     return true;
    21 }

      Executors源码解析

        Executors是一个工具类,提供了公共的静态方法,例如创建默认线程工厂、创建线程池、把Runnable包装成Callable的方法等。

        创建默认线程工厂

          DefaultThreadFactory类

     1 static class DefaultThreadFactory implements ThreadFactory {
     2     private static final AtomicInteger poolNumber = new AtomicInteger(1);    //线程池序号
     3     private final ThreadGroup group;    //线程组
     4     private final AtomicInteger threadNumber = new AtomicInteger(1);    //线程号
     5     private final String namePrefix;
     6 
     7     DefaultThreadFactory() {
     8         SecurityManager s = System.getSecurityManager();
     9         group = (s != null) ? s.getThreadGroup() :
    10                               Thread.currentThread().getThreadGroup();
    11         namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
    12     }
    13 
    14     public Thread newThread(Runnable r) {
    15         Thread t = new Thread(group, r,
    16                               namePrefix + threadNumber.getAndIncrement(),    //线程名
    17                               0);
    18         //非守护线程
    19         if (t.isDaemon())
    20             t.setDaemon(false);
    21         //相同的优先级
    22         if (t.getPriority() != Thread.NORM_PRIORITY)
    23             t.setPriority(Thread.NORM_PRIORITY);
    24         return t;
    25     }
    26 }

          创建默认工厂方法:

    1 public static ThreadFactory defaultThreadFactory() {
    2     return new DefaultThreadFactory();
    3 }

        创建线程池

          1) newFixedThreadPool方法

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

            固定线程数的线程池,corePoolSize和 maximumPoolSize 都被设置为nThreads,keepAliveTime=0,由于corePoolSize等于maximumPoolSize,所以keepAliveTime和maximumPoolSize参数是无效的。阻塞队列是LinkedBlockingQueue,是一个无界队列。正常情况下(未执行方法shutdown()或shutdownNow()),不会调用饱和策略。

          2)newSingleThreadExecutor方法

    1 public static ExecutorService newSingleThreadExecutor() {
    2     return new FinalizableDelegatedExecutorService
    3         (new ThreadPoolExecutor(1, 1,
    4                                 0L, TimeUnit.MILLISECONDS,
    5                                 new LinkedBlockingQueue<Runnable>()));
    6 }

            单个线程的线程池,corePoolSize和maximumPoolSize都为1,其他同FixedThreadPool。能保证任务按顺序执行。

          3)newCachedThreadPool方法

    1 public static ExecutorService newCachedThreadPool() {
    2     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    3                                   60L, TimeUnit.SECONDS,
    4                                   new SynchronousQueue<Runnable>());
    5 }

            线程数可改变的线程池,corePoolSize=0,maximumPoolSize=Integer.MAX_VALUE,核心线程数为0,最大线程数为CAPACITY(因为CAPACITY<Integer.MAX_VALUE).keepAliveTime=60L,意味着CachedThreadPool中的空闲线程等待新任务的最长时间为60秒,空闲线程超过60秒后将会被终止。CachedThreadPool使用没有容量的SynchronousQueue作为线程池的工作队列.这意味着,如果主线程提交任务的速度高于maximumPool中线程处理任务的速度时,CachedThreadPool会不断创建新线程。极端情况下,CachedThreadPool会因为创建过多线程而耗尽CPU和内存资源。

        把Runnable包装成Callable的方法

     1 public static <T> Callable<T> callable(Runnable task, T result) {
     2     if (task == null)
     3         throw new NullPointerException();
     4     return new RunnableAdapter<T>(task, result);
     5 }
     6 
     7 public static Callable<Object> callable(Runnable task) {
     8     if (task == null)
     9         throw new NullPointerException();
    10     return new RunnableAdapter<Object>(task, null);
    11 }

    参考资料

      深入理解Java线程池:ThreadPoolExecutor

      聊聊并发(三)——JAVA线程池的分析和使用

      FutureTask源码解析

      FutureTask中的waiters为什么这么设计?

      Treiber Stack

  • 相关阅读:
    .NET中操作SQLite
    Visual Studio 快捷键
    ADO.NET入门教程(三) 连接字符串,你小觑了吗?
    ADO.NET入门教程(二)了解.NET数据提供程序
    Xaml语法概述及属性介绍
    Csharp日常笔记
    C#基础
    PAT-L3-球队“食物链”-dfs-状压-set
    TOJ1302: 简单计算器 && TOJ 4873: 表达式求值&&TOJ3231: 表达式求值
    TOJ 3973 Maze Again && TOJ 3128 简单版贪吃蛇
  • 原文地址:https://www.cnblogs.com/zaizhoumo/p/7794818.html
Copyright © 2011-2022 走看看