zoukankan      html  css  js  c++  java
  • Java并发包——线程池

    Java并发包——线程池

    摘要:本文主要学习了Java并发包中的线程池。

    部分内容来自以下博客:

    https://www.cnblogs.com/dolphin0520/p/3932921.html

    https://blog.csdn.net/luzhen1012/article/details/79230091

    https://www.cnblogs.com/wxgblogs/p/5469208.html

    https://blog.csdn.net/seasonLai/article/details/82624236

    https://www.jianshu.com/p/7b1546a3b85b

    线程池相关类和接口

    Executor

    Executor是一个顶层接口,在它里面只声明了一个execute方法,返回值为void,参数为Runnable类型。

    该方法用来在接下来的某个时刻执行提交的command任务。由于Executor不同的实现,执行的时候可能在一个新线程中或由一个线程池里的线程执行,还可以是由调用者线程执行。

    1 public interface Executor {
    2     void execute(Runnable command);
    3 }

    ExecutorService

    ExecutorService接口继承了Executor接口,并声明了一些方法。

     1 public interface ExecutorService extends Executor {
     2     // 用来关闭线程池,此时线程池不能够接受新的任务,它会等待所有任务执行完毕。
     3     void shutdown();
     4     
     5     // 用来关闭线程池,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务。
     6     List<Runnable> shutdownNow();
     7 
     8     // 当调用shutdown或shutdownNow方法后返回为true
     9     boolean isShutdown();
    10 
    11     // 若关闭后所有任务都已完成,则返回true。注意除非首先调用shutdown或shutdownNow,否则isTerminated永不为true。
    12     // 当调用shutdown方法后,如果所有提交的任务都已完成,返回为true。
    13     // 当调用shutdownNow方法后,成功停止,返回为true。
    14     boolean isTerminated();
    15 
    16     // 用来向线程池提交任务,并返回任务执行的结果。
    17     <T> Future<T> submit(Callable<T> task);
    18 
    19     // 用来向线程池提交任务,并返回任务执行的结果。
    20     <T> Future<T> submit(Runnable task, T result);
    21 
    22     // 用来向线程池提交任务,并返回任务执行的结果。
    23     Future<?> submit(Runnable task);
    24     
    25     // ...
    26 }

    ThreadPoolExecutor

    java.uitl.concurrent下的ThreadPoolExecutor类是线程池中最核心的一个类。

    先看一下ThreadPoolExecutor类的实现:

    1 public class ThreadPoolExecutor extends AbstractExecutorService

    ThreadPoolExecutor类继承自AbstractExecutorService抽象类,看一下AbstractExecutorService抽象类的实现:

    1 public abstract class AbstractExecutorService implements ExecutorService

    可以看出AbstractExecutorService抽象类实现了ExecutorService接口,而ExecutorService接口继承了Executor接口,所以ThreadPoolExecutor类也就有了他们的方法。

    在ThreadPoolExecutor类中提供了四个构造方法:

    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue);

    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory);

    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler);

    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler);

    可以发现前面三个构造器都是调用的第四个构造器进行的初始化工作。

    下面解释下一下构造器中各个参数的含义:

    corePoolSize:核心池的大小。当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中。

    maximumPoolSize:线程池最大线程数,它表示在线程池中最多能创建多少个线程。

    keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize。

    unit:参数keepAliveTime的时间单位,有7种取值。在TimeUnit类中有7种静态属性:

    1 TimeUnit.DAYS;//
    2 TimeUnit.HOURS;// 小时
    3 TimeUnit.MINUTES;// 分钟
    4 TimeUnit.SECONDS;//
    5 TimeUnit.MILLISECONDS;// 毫秒
    6 TimeUnit.MICROSECONDS;// 微妙
    7 TimeUnit.NANOSECONDS;// 纳秒

    workQueue:一个阻塞队列,用来存储等待执行的任务。一般来说,这里的阻塞队列有以下几种选择:

    1 ArrayBlockingQueue
    2 LinkedBlockingQueue
    3 SynchronousQueue

    threadFactory:线程工厂,主要用来创建线程。

    handler:表示当拒绝处理任务时的策略,有以下四种取值:

    1 ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 
    2 ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 
    3 ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)。
    4 ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务。

    线程池的特性

    线程池状态

    在JDK1.8中ThreadPoolExecutor中用一个AtomicInteger来存储线程池工作状态和工作线程数量:

     1 // 状态:RUNNINT。工作线程数量:0。
     2 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
     3 // 使用32位的高3位表示状态,余下29位表示容量。
     4 private static final int COUNT_BITS = Integer.SIZE - 3;
     5 // 最大容量为:^29-1。
     6 private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
     7  
     8 // 运行状态用32位整形的高3位表示
     9 private static final int RUNNING    = -1 << COUNT_BITS;
    10 private static final int SHUTDOWN   =  0 << COUNT_BITS;
    11 private static final int STOP       =  1 << COUNT_BITS;
    12 private static final int TIDYING    =  2 << COUNT_BITS;
    13 private static final int TERMINATED =  3 << COUNT_BITS;
    14  
    15 // 封装和解析ctl值
    16 private static int runStateOf(int c)     { return c & ~CAPACITY; }
    17 private static int workerCountOf(int c)  { return c & CAPACITY; }
    18 private static int ctlOf(int rs, int wc) { return rs | wc; }

    状态的转换

    1 RUNNING -> SHUTDOWN:调用了shutdown()方法。
    2 RUNNING / SHUTDOWN -> STOP:调用了shutdownNow()方法。
    3 SHUTDOWN -> TIDYING:当队列中任务都被取出执行完成,并且所有工作线程都结束了任务,再没有未被执行的任务。
    4 STOP -> TIDYING:线程池中没有正在运行的线程,任务队列中任务都被取消。
    5 TIDYING -> TERMINATED:钩子方法terminated()执行完毕后。

    状态说明

    RUNNING:运行态,可处理新任务并执行队列中的任务。

    SHUTDOW:关闭态,不接受新任务,但处理队列中的任务。

    STOP:停止态,不接受新任务,不处理队列中任务,且打断运行中任务。

    TIDYING:整理态,所有任务已经结束,将执行terminated()方法。

    TERMINATED:结束态,terminated()方法已完成。

    核心线程池大小(corePoolSize)与最大线程池大小(maximumPoolSize)

    ThreadPoolExecutor会自动调节池子里线程的大小。

    通过execute()方法提交新任务时,如果当前的池子里线程的大小小于核心线程corePoolSize时,则会创建新线程来处理新的请求,即使当前的工作者线程是空闲的。如果运行的线程数是大于corePoolSize但小于maximumPoolSize,而且当任务队列已经满了时,则会创建新线程。

    通过设置corePoolSize等于maximumPoolSize,便可以创建固定大小的线程池数量。而设置maximumPoolSize为一个不受限制的数量如Integer.MAX,便可以创建一个适应任意数量大的并发任务的线程池。常规情况下,可以根据构造方法来设置corePoolSize与maximumPoolSize,但也可以通过setCorePoolSize()和setMaximumPoolSize()方法动态的修改其值。

    按需构造

    默认情况下,核心线程是在开始接收新任务时才初始创建,但是可以使用prestartCoreThread()或prestartAllCoreThreads()方法来动态的预开启所有的线程数。

    任务提交给线程池之后的处理策略

    如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务。

    如果当前线程池中的线程数目大于等于corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中。若添加成功,则该任务会等待空闲线程将其取出去执行。若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务。

    如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理。

    如果线程池中的线程数量大于corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize,如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。

    阻塞队列(缓存队列)

    超出一定数量的任务会转移队列中,队列与池里的线程大小的关联表现在:如运行的线程数小于corePoolSize线程数,Executor会优先添加线程来执行task,而不会添加到队列中。如运行的线程已大于corePoolSize,Executor会把新的任务放于队列中,如队列已到最大时,ThreadPoolExecutor会继续创建线程,直到超过maximumPoolSize。最后,线程超过maximumPoolSize时,Executor将拒绝接收新的task。

    而添加任务到队列时,有三种常规的策略:

    1)直接传递。SynchronousQueue队列的默认方式,一个存储元素的阻塞队列而是直接投递到线程中。每一个入队操作必须等到另一个线程调用移除操作,否则入队将一直阻塞。当处理一些可能有内部依赖的任务时,这种策略避免了加锁操作。直接传递一般不能限制maximumPoolSizes以避免拒绝接收新的任务。如果新增任务的速度大于任务处理的速度就会造成增加无限多的线程的可能性。

    2)无界队列。如LinkedBlockingQueue,当核心线程正在工作时,使用不用预先定义大小的无界队列将使新到来的任务处理等到中,所以如果线程数是小于corePoolSize时,将不会创建有入队操作。这种策略将很适合那些相互独立的任务,如Web服务器。如果新增任务的速度大于任务处理的速度就会造成无界队列一直增长的可能性。

    3)有界队列。如ArrayBlockingQueue,当定义了maximumPoolSizes时使用有界队列可以预防资源的耗尽,但是增加了调整和控制队列的难度,队列的大小和线程池的大小是相互影响的,使用很大的队列和较小的线程池会减少CPU消耗、操作系统资源以及线程上下文开销,但却人为的降低了吞吐量。如果任务是频繁阻塞型的(I/O),系统是可以把时间片分给多个线程的。而采用较小的队列和较大的线程池,虽会造成CPU繁忙,但却会遇到调度开销,这也会降低吞吐量。

    饱和策略(拒绝接收任务)

    当Executor调用shutdown()方法后或者达到工作队列的最大容量时,线程池则已经饱和了,此时则不会接收新的task。但无论是何种情况,execute()方法会调用RejectedExecutionHandler的rejectedExecution()方法来执行饱和策略,在线程池内部预定义了几种处理策略:

    1)终止执行(AbortPolicy)。默认策略,Executor会抛出一个RejectedExecutionException运行异常到调用者线程来完成终止。

    2)调用者线程来运行任务(CallerRunsPolicy)。这种策略会由调用execute()方法的线程自身来执行任务,它提供了一个简单的反馈机制并能降低新任务的提交频率。

    3)丢弃策略(DiscardPolicy)。不处理,直接丢弃提交的任务。

    4)丢弃队列里最近的一个任务(DiscardOldestPolicy)。如果Executor还未shutdown的话,则丢弃工作队列的最近的一个任务,然后执行当前任务。

    线程池中的线程初始化

    默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程。

    在实际中如果需要线程池创建之后立即创建线程,可以通过以下两个方法办到:

    prestartCoreThread():初始化一个核心线程。

    prestartAllCoreThreads():初始化所有核心线程。

    线程的关闭

    ThreadPoolExecutor提供了两个方法,用于线程池的关闭:

    shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务。

    shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务。

    线程池容量的动态调整

    ThreadPoolExecutor提供了动态调整线程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize()。

    setCorePoolSize();设置核心池大小。

    setMaximumPoolSize():设置线程池最大能创建的线程数目大小。

    如何合理配置线程池的大小

    一般需要根据任务的类型来配置线程池大小。

    如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为NCPU+1。

    如果是IO密集型任务,参考值可以设置为2*NCPU。

    当然,这只是一个参考值,具体的设置还需要根据实际情况进行调整,比如可以先将线程池大小设置为参考值,再观察任务运行情况和系统负载、资源利用率来进行适当调整。

    线程池实现原理

    任务的执行

    一些比较重要的成员变量:

     1 public class ThreadPoolExecutor extends AbstractExecutorService {
     2     // 任务缓存队列,用来存放等待执行的任务。
     3     private final BlockingQueue<Runnable> workQueue;
     4     
     5     // 线程池的主要状态锁,控制线程池状态和线程池大小的改变。
     6     private final ReentrantLock mainLock = new ReentrantLock();
     7     
     8     // 用来存放工作集。
     9     private final HashSet<Worker> workers = new HashSet<Worker>();
    10     
    11     // 用于延时的条件队列。
    12     private final Condition termination = mainLock.newCondition();
    13     
    14     // 线程池中曾经出现过的最大线程数。
    15     private int largestPoolSize;
    16     
    17     // 已经执行完毕的任务个数。
    18     private long completedTaskCount;
    19     
    20     // 线程工厂,用来创建线程。
    21     private volatile ThreadFactory threadFactory;
    22     
    23     // 任务拒绝策略。
    24     private volatile RejectedExecutionHandler handler;
    25     
    26     // 线程存活时间。
    27     private volatile long keepAliveTime;
    28     
    29     // 是否允许为核心线程设置存活时间。
    30     private volatile boolean allowCoreThreadTimeOut;
    31     
    32     // 核心池的大小,即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列。
    33     private volatile int corePoolSize;
    34     
    35     // 线程池最大能容忍的线程数。
    36     private volatile int maximumPoolSize;
    37     
    38     // ...
    39 }

    重要方法

    execute()方法实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。

    submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果。

    shutdown()和shutdownNow()是用来关闭线程池的。

    在ThreadPoolExecutor类中,最核心的任务提交方法是execute()方法,虽然通过submit也可以提交任务,但是实际上submit方法里面最终调用的还是execute()方法,所以我们只需要研究execute()方法的实现原理即可:

    提交任务

    execute()方法:

     1 // 在接下来的某个时刻执行任务。
     2 public void execute(Runnable command) {
     3     // 如果是空任务则抛出异常。
     4     if (command == null)
     5         throw new NullPointerException();
     6     // 获取ctl的值。
     7     int c = ctl.get();
     8     // 1 如果工作线程数小于核心线程数。
     9     if (workerCountOf(c) < corePoolSize) {
    10         // 尝试添加线程。
    11         if (addWorker(command, true))
    12             return;
    13         // 添加线程失败则重新获取ctl的值。
    14         c = ctl.get();
    15     }
    16     // 2 如果工作线程数大于或者等于核心线程数。
    17     // 2.1 如果线程池正在运行,则尝试将任务放入缓存队列。
    18     if (isRunning(c) && workQueue.offer(command)) {
    19         // 重新根据ctl检查状态,因为可能在这段时间进程死掉了。
    20         int recheck = ctl.get();
    21         // 2.1.1 如果线程池不处于运行状态,则尝试删除刚刚放入缓存队列的任务。
    22         if (! isRunning(recheck) && remove(command))
    23             // 拒绝任务。
    24             reject(command);
    25         // 2.1.2 如果线程池在运行状态
    26         // 如果工作线程数是0。
    27         else if (workerCountOf(recheck) == 0)
    28              // 添加无初始任务的线程。
    29             addWorker(null, false);
    30     }
    31     // 2.2 如果线程池不在运行状态,或者缓存队列已满。
    32     // 尝试添加线程。
    33     else if (!addWorker(command, false))
    34         // 添加失败则拒绝任务。
    35         reject(command);
    36 }

    添加线程

    addWorker()方法:

     1 // 添加线程,第一个参数是要添加的任务,第二个参数是是否核心线程,返回是否插入成功。
     2 private boolean addWorker(Runnable firstTask, boolean core) {
     3     retry:
     4     for (;;) {
     5         // 获取ctl的值。
     6         int c = ctl.get();
     7         // 获取运行状态。
     8         int rs = runStateOf(c);
     9         
    10         // 如果线程还未SHUTDOWN,此时是可以添加新线程的。
    11         // 如果线程是SHUTDOWN状态,而且传进来的任务为空,并且任务队列不为空的时候,此时是可以添加新线程的。
    12         // 将这两个条件取反,如果满足,则返回插入失败。
    13         if (rs >= SHUTDOWN &&
    14             ! (rs == SHUTDOWN &&
    15                firstTask == null &&
    16                ! workQueue.isEmpty()))
    17             return false;
    18 
    19         for (;;) {
    20             // 获取线程数。
    21             int wc = workerCountOf(c);
    22             // 如果线程数大于最大值,或者当core为true时大于核心线程数,当core为false时大于最大线程数,返回失败。
    23             if (wc >= CAPACITY ||
    24                 wc >= (core ? corePoolSize : maximumPoolSize))
    25                 return false;
    26             // 尝试通过CAS添加线程数目,成功则跳出retry循环。
    27             if (compareAndIncrementWorkerCount(c))
    28                 break retry;
    29             // 添加失败则重新获取ctl的值。
    30             c = ctl.get();
    31             // 如果状态改变则重新进行retry循环,如果没有改变则重新尝试添加线程数。
    32             if (runStateOf(c) != rs)
    33                 continue retry;
    34         }
    35     }
    36 
    37     // 线程运行标志位。
    38     boolean workerStarted = false;
    39     // 线程添加标志位。
    40     boolean workerAdded = false;
    41     // 封装了任务的Worker对象。
    42     Worker w = null;
    43     try {
    44         w = new Worker(firstTask);
    45         final Thread t = w.thread;
    46         if (t != null) {
    47             // 使用全局锁添加线程。
    48             final ReentrantLock mainLock = this.mainLock;
    49             mainLock.lock();
    50             try {
    51                 // 获取线程池状态。
    52                 int rs = runStateOf(ctl.get());
    53                 // 如果处于运行状态,或者是SHUTDOWN状态并且添加的任务为空。
    54                 if (rs < SHUTDOWN ||
    55                     (rs == SHUTDOWN && firstTask == null)) {
    56                     // 如果线程已经在运行,则抛出异常。
    57                     if (t.isAlive())
    58                         throw new IllegalThreadStateException();
    59                     // 添加线程。
    60                     workers.add(w);
    61                     // 更新线程数量的历史最大值。
    62                     int s = workers.size();
    63                     if (s > largestPoolSize)
    64                         largestPoolSize = s;
    65                     // 更新线程添加标志位。
    66                     workerAdded = true;
    67                 }
    68             } finally {
    69                 mainLock.unlock();
    70             }
    71             // 如果添加成功,则运行线程,并更新线程运行标志位。
    72             if (workerAdded) {
    73                 t.start();
    74                 workerStarted = true;
    75             }
    76         }
    77     } finally {
    78         // 如果线程运行标志位是false,则线程添加失败进行回滚。
    79         if (! workerStarted)
    80             addWorkerFailed(w);
    81     }
    82     return workerStarted;
    83 }

    addWorkerFailed()方法:

     1 // 线程添加失败进行回滚。
     2 private void addWorkerFailed(Worker w) {
     3     // 使用全局锁回滚线程个数的添加。
     4     final ReentrantLock mainLock = this.mainLock;
     5     mainLock.lock();
     6     try {
     7         // 移除线程。
     8         if (w != null)
     9             workers.remove(w);
    10         // 减少线程数。
    11         decrementWorkerCount();
    12         // 尝试终止线程。
    13         tryTerminate();
    14     } finally {
    15         mainLock.unlock();
    16     }
    17 }

    Worker内部类

     1 private final class Worker
     2     extends AbstractQueuedSynchronizer
     3     implements Runnable
     4 {
     5     // 维护了自己的一个Thread对象。
     6     final Thread thread;
     7     // 维护了自己的一个Runnable对象。
     8     Runnable firstTask;
     9     // 记录完成的任务数。
    10     volatile long completedTasks;
    11 
    12     // 使用传入的Runnable对象和ThreadFactory产生的Thread对象生成Worker对象。
    13     Worker(Runnable firstTask) {
    14         setState(-1);
    15         this.firstTask = firstTask;
    16         this.thread = getThreadFactory().newThread(this);
    17     }
    18 
    19     // 重写了Runnable里的run()方法。
    20     public void run() {
    21         runWorker(this);
    22     }
    23     
    24     // ...
    25 }

    启动线程

    runWorker()方法:

     1 // 启动线程最终调用的还是runWorker()方法。
     2 final void runWorker(Worker w) {
     3     Thread wt = Thread.currentThread();
     4     // 获取初始Runnable对象。
     5     Runnable task = w.firstTask;
     6     // 置空初始Runnable对象。
     7     w.firstTask = null;
     8     // 释放锁,允许被中断。
     9     w.unlock();
    10     // 因为运行异常导致线程突然终止的标志。
    11     boolean completedAbruptly = true;
    12     try {
    13         // 获取任务,如果没有任务可以获取,则此循环终止。
    14         while (task != null || (task = getTask()) != null) {
    15             // 获取工作线程锁。
    16             w.lock();
    17             // 如果线程池关闭,则确保线程被中断。
    18             // 如果线程池没有关闭,则确保线程不会被中断。这就要求进行重新获取ctl,以便在清除中断时处理shutdownNow竞争。
    19             if ((runStateAtLeast(ctl.get(), STOP) ||
    20                  (Thread.interrupted() &&
    21                   runStateAtLeast(ctl.get(), STOP))) &&
    22                 !wt.isInterrupted())
    23                 wt.interrupt();
    24             try {
    25                 // 回调方法,给子类具体实现。
    26                 beforeExecute(wt, task);
    27                 Throwable thrown = null;
    28                 try {
    29                     // 调用Runnable对象的run()方法。
    30                     task.run();
    31                 } catch (RuntimeException x) {
    32                     thrown = x; throw x;
    33                 } catch (Error x) {
    34                     thrown = x; throw x;
    35                 } catch (Throwable x) {
    36                     thrown = x; throw new Error(x);
    37                 } finally {
    38                     // 回调方法,给子类具体实现。
    39                     afterExecute(task, thrown);
    40                 }
    41             } finally {
    42                 // 置空,如果进入下一个循环可以继续取任务。
    43                 task = null;
    44                 // 完成数累加。
    45                 w.completedTasks++;
    46                 // 释放工作线程锁。
    47                 w.unlock();
    48             }
    49         }
    50         // 说明不是用户任务异常引起的。
    51         completedAbruptly = false;
    52     } finally {
    53         // 程序退出。
    54         processWorkerExit(w, completedAbruptly);
    55     }
    56 }

    processWorkerExit()方法:

     1 // 处理线程退出,第一个参数是要处理的Worker,第二个参数用来判断是否异常导致退出。
     2 private void processWorkerExit(Worker w, boolean completedAbruptly) {
     3     // 如果是异常退出,需要减少线程数。如果是正常退出,则不需要调整。
     4     if (completedAbruptly)
     5         decrementWorkerCount();
     6     // 获取全局锁。
     7     final ReentrantLock mainLock = this.mainLock;
     8     mainLock.lock();
     9     try {
    10         // 完成任务数自增。
    11         completedTaskCount += w.completedTasks;
    12         // 移除线程。
    13         workers.remove(w);
    14     } finally {
    15         mainLock.unlock();
    16     }
    17     // 尝试终结线程。
    18     tryTerminate();
    19     // 获取ctl的值。
    20     int c = ctl.get();
    21     // 如果线程池关闭了。
    22     if (runStateLessThan(c, STOP)) {
    23         // 如果线程正常退出。
    24         if (!completedAbruptly) {
    25             // 如果允许超时关闭核心线程,那就是0,否则就取核心线程数。
    26             int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
    27             // 如果没有核心线程,并且队列中的任务不为空,则设置最少线程为1。
    28             if (min == 0 && ! workQueue.isEmpty())
    29                 min = 1;
    30             // 如果线程数量大于等于正常工作的数量则不再添加新的线程。
    31             if (workerCountOf(c) >= min)
    32                 return;
    33         }
    34         // 添加线程。
    35         addWorker(null, false);
    36     }
    37 }

    获取任务

    getTask()方法:

     1 // 获取任务,控制线程池线程数量。
     2 private Runnable getTask() {
     3     // 获取任务超时标志位。
     4     boolean timedOut = false;
     5     
     6     for (;;) {
     7         // 获取ctl的值。
     8         int c = ctl.get();
     9         // 获取线程池的状态。
    10         int rs = runStateOf(c);
    11         // 如果是STOP状态之后,或者在SHUTDOWN状态之后并且任务队列是空的。
    12         if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
    13             // 使用CAS让线程数量减一。
    14             decrementWorkerCount();
    15             // 返回并使一个线程退出。
    16             return null;
    17         }
    18         // 获取线程数。
    19         int wc = workerCountOf(c);
    20         // 如果允许为核心线程设置存活时间,或者线程数大于核心线程数。
    21         boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    22         // 如果任务数大于最大线程数,或者超时并且允许为核心线程设置存活时间,或者超时并且任务数大于核心线程数。
    23         // 而且,线程数大于1,或者任务队列是空的。
    24         if ((wc > maximumPoolSize || (timed && timedOut))
    25             && (wc > 1 || workQueue.isEmpty())) {
    26             // 使用CAS让线程数量减一。
    27             if (compareAndDecrementWorkerCount(c))
    28                 // 线程数量减一成功,返回并使一个线程退出。
    29                 return null;
    30             // 线程数量减一失败,说明线程数量已被抢先改变,继续循环。
    31             continue;
    32         }
    33         try {
    34             // 超时获取任务。
    35             Runnable r = timed ?
    36                 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
    37                 workQueue.take();
    38             // 如果在生效时间内获取到任务则返回。
    39             if (r != null)
    40                 return r;
    41             // 否则将超时标志设置为true。
    42             timedOut = true;
    43         } catch (InterruptedException retry) {
    44             // 如果有异常,则将超时标志设置为false。
    45             timedOut = false;
    46         }
    47     }
    48 }

    线程池的使用

    使用ThreadPoolExecutor线程池

    在下面的例子中创建了一个线程池,核心线程数为3,最大线程数为6,任务队列大小为4。随后启动了9个线程,观察其运行情况。

     1 public class Demo {
     2     public static void main(String[] args) {
     3         ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(4));
     4 
     5         for (int i = 1; i <= 9; i++) {
     6             DemoThread dt = new DemoThread(i);
     7             executor.execute(dt);
     8             System.out.println("线程编号:" + i + ",线程池:" + executor.getPoolSize() + ",队列:" + executor.getQueue().size());
     9         }
    10         executor.shutdown();
    11     }
    12 }
    13 
    14 class DemoThread implements Runnable {
    15     int taskNo = 0;
    16 
    17     public DemoThread(int taskNo) {
    18         this.taskNo = taskNo;
    19     }
    20 
    21     @SuppressWarnings("static-access")
    22     public void run() {
    23         try {
    24             Thread.currentThread().sleep(4000);
    25         } catch (InterruptedException e) {
    26             e.printStackTrace();
    27         }
    28     }
    29 }

    运行结果如下:

    1 线程编号:1,线程池:1,队列:0
    2 线程编号:2,线程池:2,队列:0
    3 线程编号:3,线程池:3,队列:0
    4 线程编号:4,线程池:3,队列:1
    5 线程编号:5,线程池:3,队列:2
    6 线程编号:6,线程池:3,队列:3
    7 线程编号:7,线程池:3,队列:4
    8 线程编号:8,线程池:4,队列:4
    9 线程编号:9,线程池:5,队列:4

    结果说明:

    在前3个任务放到线程池里的时候,会在线程池里生成3个核心线程。

    当第4个任务放入到线程池的时候,因为超过了核心线程数,所以放到了任务缓存队列里。

    直到当第8个任务放到线程池里时,因为此时任务缓存队列已经放满了4个任务。此时线程池中有4个线程,但是线程池的最大容量是6,所以尝试创建新的线程。

    如果在线程数达到线程池的最大容量后,还有新的任务请求放入线程池,则会产生RejectedExecutionException拒绝任务异常。

    使用Executors类提供的方法创建线程池

    不提倡直接使用ThreadPoolExecutor,而是使用Executors类中提供的几个静态方法来创建线程池:

     1 // 创建容量为1的缓冲池。
     2 public static ExecutorService newSingleThreadExecutor() {
     3     return new FinalizableDelegatedExecutorService
     4         (new ThreadPoolExecutor(1, 1,
     5                                 0L, TimeUnit.MILLISECONDS,
     6                                 new LinkedBlockingQueue<Runnable>()));
     7 }
     8 // 创建指定容量的缓冲池。
     9 public static ExecutorService newFixedThreadPool(int nThreads) {
    10     return new ThreadPoolExecutor(nThreads, nThreads,
    11                                   0L, TimeUnit.MILLISECONDS,
    12                                   new LinkedBlockingQueue<Runnable>());
    13 }
    14 // 创建容量为Integer.MAX_VALUE的缓冲池。
    15 public static ExecutorService newCachedThreadPool() {
    16     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    17                                   60L, TimeUnit.SECONDS,
    18                                   new SynchronousQueue<Runnable>());
    19 }

    从它们的具体实现来看,它们实际上也是调用了ThreadPoolExecutor,只不过参数都已配置好了。

    newFixedThreadPool创建的线程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;

    newSingleThreadExecutor将corePoolSize和maximumPoolSize都设置为1,也使用的LinkedBlockingQueue;

    newCachedThreadPool将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,使用的SynchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程。

    实际中,如果Executors提供的三个静态方法能满足要求,就尽量使用它提供的三个方法,因为自己去手动配置ThreadPoolExecutor的参数有点麻烦,要根据实际任务的类型和数量来进行配置。

    另外,如果ThreadPoolExecutor达不到要求,可以自己继承ThreadPoolExecutor类进行重写。

  • 相关阅读:
    NumPy学习笔记 三 股票价格
    NumPy学习笔记 二
    NumPy学习笔记 一
    Raspberry Pi中可用的Go IDE:liteide
    数学公式字母发音
    Apache Avro# 1.8.2 Specification (Avro 1.8.2规范)二
    Apache Avro# 1.8.2 Specification (Avro 1.8.2规范)一
    用Go校验下载文件之SHA256
    垂直水平居中的几种方式,其他方式还有很多,不再列举
    vue-cli3 每次打包都改变css img js文件名,还有自带版本号
  • 原文地址:https://www.cnblogs.com/shamao/p/11015519.html
Copyright © 2011-2022 走看看