zoukankan      html  css  js  c++  java
  • 深入理解Java线程池

      零.引子

      线程是程序运行中一个非常重要的概念。通常情况下,程序从静态代码,到解析为机器码被加载入内存开始动态运行,就转变为一个进程。也可以说,程序是一个静态概念,程序运行起来后就变成了一个进程,进程是计算机分配CPU、内存等各种资源的基本单位。

      我们平时在电脑中开启一些程序时,比如开启eclipse,idea等开发工具时,会发现程序启动较慢,这是因为进程运行所依赖的资源较多,故开启一个进程耗费的资源和时间也较多。一个进程中往往有多个子任务,从而轮番获取CPU的执行权,这些需要获取CPU执行权的子任务就称为线程,线程是计算机分配CPU执行权的最小单位。

      相对于进程而言,线程的运行仅需要CPU执行权即可,耗费资源与时间大大减少。然而,现在的计算机操作系统基本上都承担着处理高并发任务的职责,如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,那么频繁创建线程就会大大降低系统的效率,这是因为,频繁创建线程和销毁线程也需要时间。

      这就不由让我们考虑一个问题:有没有一种办法使得线程可以复用,从而避免线程的频繁创建?也就是说,当线程执行完一个任务时,并不被销毁,而是可以继续执行其他的任务?

      答案是可以的。在Java中,我们可以通过线程池来达到线程复用的效果。我们接下来详细讲解Java的线程池,为此,我们先从最核心的ThreadPoolExecutor类中的方法讲起,然后再讲述其实现原理,接着给出一个使用示例,最后讨论如何合理配置线程池的大小。

      一.

      为了对线程池有个比较全面的理解,我们先来看类ThreadPoolExecutor的继承体系:

      通过上图可知,接口Executor是线程池体系的顶级接口。上述线程池继承体系的主要API如下:

     1 public interface Executor {
     2     void execute(Runnable command);
     3 }
     4 public interface ExecutorService extends Executor {
     5     void shutdown();
     6     boolean isShutdown();
     7     boolean isTerminated();
     8     boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
     9     <T> Future<T> submit(Callable<T> task);
    10     <T> Future<T> submit(Runnable task, T result);
    11     Future<?> submit(Runnable task);
    12     <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
    13     <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, throws InterruptedException;
    14     <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
    15     <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)
    16         throws InterruptedException, ExecutionException, TimeoutException;
    17 }
    18 public abstract class AbstractExecutorService implements ExecutorService {
    19     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };
    20     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };
    21     public Future<?> submit(Runnable task) {};
    22     public <T> Future<T> submit(Runnable task, T result) { };
    23     public <T> Future<T> submit(Callable<T> task) { };
    24     private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,boolean timed, long nanos)
    25         throws InterruptedException, ExecutionException, TimeoutException {   };
    26     public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
    27         throws InterruptedException, ExecutionException {    };
    28     public <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)
    29         throws InterruptedException, ExecutionException, TimeoutException { };
    30     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    31         throws InterruptedException { };
    32     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)
    33         throws InterruptedException {
    34     };
    35 }
    36 public class ThreadPoolExecutor extends AbstractExecutorService {
    37     .....
    38     public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
    39             BlockingQueue<Runnable> workQueue);
    40     public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
    41             BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
    42     public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
    43             BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
    44     public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
    45         BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
    46     ...
    47 }

      在类ThreadPoolExecutor类中,提供了4个构造器,通过对其源代码观察,我们发现前三个构造器都是调用了第四个构造器进行了初始化工作。在此,我们看下第四个构造器中各个参数的含义。

      corePoolSize:核心池的大小。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中。如果调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字也可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。

      maximumPoolSize:线程池最大线程数,它表示在线程池中最多能创建多少个线程。

      keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用。也就是说,当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0。

      unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:

    1 TimeUnit.DAYS;               //
    2 TimeUnit.HOURS;             //小时
    3 TimeUnit.MINUTES;           //分钟
    4 TimeUnit.SECONDS;           //
    5 TimeUnit.MILLISECONDS;      //毫秒
    6 TimeUnit.MICROSECONDS;      //微妙
    7 TimeUnit.NANOSECONDS;       //纳秒

      workQueue:一个阻塞队列,用来存储等待执行的任务,会对线程池的运行过程产生重大影响。一般来说,这里的阻塞队列有以下几种选择:

    1 ArrayBlockingQueue;
    2 PriorityBlockingQueue;
    3 //上面两种使用较少,我们通常使用下面两种
    4 LinkedBlockingQueue;
    5 SynchronousQueue;
    6 //线程池的排队策略与BlockingQueue有关

      threadFactory:线程工厂,主要用来创建线程。

      handler:表示当拒绝处理任务时的策略,有以下四种取值:

    1 ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 
    2 ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 
    3 ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
    4 ThreadPoolExecutor.CallerRunsPolicy:调用其他线程处理该任务

      在介绍了类ThreadPoolExecutor的构造器里的参数的含义后,我们接下来看该类中几个非常重要的方法:

    1 execute()
    2 submit()
    3 shutdown()
    4 shutdownNow()

      execute()方法实际上是接口Executor中声明的方法,在类ThreadPoolExecutor中进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。

      submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务的执行结果。

      shutdown()和shutdownNow()是用来关闭线程池的。

      另外,类ThreadPoolExecutor中还定义了一些其他的方法,如getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等获取与线程池相关的属性。

      二.

      我们在上面从宏观层面介绍了类ThreadPoolExecutor,我们接下来从几个方面深入了解线程池的内部实现原理。

      1.线程池状态

      2.任务的执行

      3.线程池中的线程初始化

      4.任务缓存队列及排队策略

      5.任务拒绝策略

      6.线程池的关闭

      7.线程池容量的动态调整

      在ThreadPoolExecutor中定义了一个volatile变量,另外定义了几个static final变量表示线程池的各个状态:  

    1 volatile int runState;
    2 static final int RUNNING    = 0;
    3 static final int SHUTDOWN   = 1;
    4 static final int STOP       = 2;
    5 static final int TERMINATED = 3;

      runState表示当前线程池的状态,它用一个volatile变量来保证线程之间的可见性。runState下面的几个static final变量则表示runState可能的几个取值:

      当创建线程池后,初始时,线程池处于RUNNING状态;

      如果调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕;

      如果调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务;

      当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态。

      2.任务的执行

      我们先来看一下类ThreadPoolExecutor中定义的一些比较重要的成员变量:

     1 private final BlockingQueue<Runnable> workQueue;    //任务缓存队列,用来存放等待执行的任务
     2 private final ReentrantLock mainLock = new ReentrantLock();   //线程池的主要状态锁,对线程池状态(比如线程池大小,runState等)的改变都要使用这个锁
     3 private final HashSet<Worker> workers = new HashSet<Worker>();  //用来存放工作集 
     4 private volatile long  keepAliveTime;    //线程存活时间   
     5 private volatile boolean allowCoreThreadTimeOut;   //是否允许为核心线程设置存活时间
     6 private volatile int   corePoolSize;     //核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)
     7 private volatile int   maximumPoolSize;   //线程池最大能容忍的线程数 
     8 private volatile int   poolSize;       //线程池中当前的线程数 
     9 private volatile RejectedExecutionHandler handler; //任务拒绝策略
    10 private volatile ThreadFactory threadFactory;   //线程工厂,用来创建线程 
    11 private int largestPoolSize;   //用来记录线程池中曾经出现过的最大线程数 
    12 private long completedTaskCount;   //用来记录已经执行完毕的任务个数

      总的来说,任务提交给线程池之后的处理策略,主要有4个方面:  

    • 如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务;
    • 如果当前线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务;
    • 如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理;
    • 如果线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。

      3.线程池中的线程初始化  

      默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程。

      在实际中,如果需要线程池创建之后立即创建线程,可以通过以下两个方法实现:

    • prestartCoreThread():初始化一个核心线程;
    • prestartAllCoreThreads():初始化所有核心线程

      4.任务缓存队列及排队策略

      我们在前面提到了任务缓存队列,即workQueue,它用来存放等待执行的任务。  

      workQueue的类型为BlockingQueue<Runnable>,通常可以取下面三种类型:

      1)ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小;

      2)LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;

      3)synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。

      5.任务拒绝策略

      当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:  

    1 ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
    2 ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
    3 ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
    4 ThreadPoolExecutor.CallerRunsPolicy:调用其他线程处理该任务

      6.线程池的关闭  

      ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:

    • shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
    • shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务  

      7.线程池容量的动态调整

      ThreadPoolExecutor提供了动态调整线程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),

    • setCorePoolSize:设置核心池大小
    • setMaximumPoolSize:设置线程池最大能创建的线程数目大小

      当上述参数从小变大时,ThreadPoolExecutor进行线程赋值,还可能立即创建新的线程来执行任务。

      三.

      线程池的使用示例。

      

     1 public class Test6 {
     2     public static void main(String[] args) {
     3         ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5));
     4         for(int i=0;i<15;i++){
     5             MyTask myTask=new MyTask(i);
     6             executor.execute(myTask);
     7             System.out.println("线程池中线程数目:"+executor.getPoolSize()+
     8                     ",队列中等待的任务数目:"+executor.getQueue().size()+
     9                     ",已执行完其他的任务数目:"+executor.getCompletedTaskCount());
    10         }
    11         executor.shutdown();
    12     }
    13     static class MyTask implements Runnable{
    14         private int taskNum;
    15         public MyTask(int taskNum){
    16             this.taskNum=taskNum;
    17         }
    18         @Override
    19         public void run() {
    20             System.out.println("正在执行task:"+taskNum);
    21             try {
    22                 Thread.currentThread().sleep(1000);
    23             } catch (InterruptedException e) {
    24                 e.printStackTrace();
    25             }
    26             System.out.println("task--"+taskNum+"执行完毕");
    27         }
    28     }
    29 }

      上述代码执行结果如下:

    正在执行task:0
    线程池中线程数目:1,队列中等待的任务数目:0,已执行完其他的任务数目:0
    线程池中线程数目:2,队列中等待的任务数目:0,已执行完其他的任务数目:0
    正在执行task:1
    线程池中线程数目:3,队列中等待的任务数目:0,已执行完其他的任务数目:0
    正在执行task:2
    线程池中线程数目:4,队列中等待的任务数目:0,已执行完其他的任务数目:0
    正在执行task:3
    线程池中线程数目:5,队列中等待的任务数目:0,已执行完其他的任务数目:0
    线程池中线程数目:5,队列中等待的任务数目:1,已执行完其他的任务数目:0
    正在执行task:4
    线程池中线程数目:5,队列中等待的任务数目:2,已执行完其他的任务数目:0
    线程池中线程数目:5,队列中等待的任务数目:3,已执行完其他的任务数目:0
    线程池中线程数目:5,队列中等待的任务数目:4,已执行完其他的任务数目:0
    线程池中线程数目:5,队列中等待的任务数目:5,已执行完其他的任务数目:0
    线程池中线程数目:6,队列中等待的任务数目:5,已执行完其他的任务数目:0
    正在执行task:10
    线程池中线程数目:7,队列中等待的任务数目:5,已执行完其他的任务数目:0
    正在执行task:11
    线程池中线程数目:8,队列中等待的任务数目:5,已执行完其他的任务数目:0
    正在执行task:12
    线程池中线程数目:9,队列中等待的任务数目:5,已执行完其他的任务数目:0
    正在执行task:13
    线程池中线程数目:10,队列中等待的任务数目:5,已执行完其他的任务数目:0
    正在执行task:14
    task--0执行完毕
    正在执行task:5
    task--12执行完毕
    task--11执行完毕
    正在执行task:6
    task--3执行完毕
    正在执行task:8
    task--4执行完毕
    正在执行task:9
    task--13执行完毕
    task--2执行完毕
    正在执行task:7
    task--14执行完毕
    task--10执行完毕
    task--1执行完毕
    task--5执行完毕
    task--9执行完毕
    task--7执行完毕
    task--8执行完毕
    task--6执行完毕

      从执行结果可以看出,当线程池中线程的数目大于5时,便将任务放入任务缓存队列里面,当任务缓存队列满了之后,便创建新的线程。如果上面程序中,将for循环中改成执行20个任务,就会抛出任务拒绝异常了。

      在Java doc中,并不提倡我们直接使用ThreadPoolExecutor,而是使用Executors类中提供的几个静态方法来创建线程池,具体如下:  

    1 Executors.newCachedThreadPool();        //创建一个缓冲池,缓冲池容量大小为Integer.MAX_VALUE
    2 Executors.newSingleThreadExecutor();   //创建容量为1的缓冲池
    3 Executors.newFixedThreadPool(int);    //创建固定容量大小的缓冲池

      上述三个静态方法的实现如下:  

     1 public static ExecutorService newFixedThreadPool(int nThreads) {
     2     return new ThreadPoolExecutor(nThreads, nThreads,
     3                                   0L, TimeUnit.MILLISECONDS,
     4                                   new LinkedBlockingQueue<Runnable>());
     5 }
     6 public static ExecutorService newSingleThreadExecutor() {
     7     return new FinalizableDelegatedExecutorService
     8         (new ThreadPoolExecutor(1, 1,
     9                                 0L, TimeUnit.MILLISECONDS,
    10                                 new LinkedBlockingQueue<Runnable>()));
    11 }
    12 public static ExecutorService newCachedThreadPool() {
    13     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    14                                   60L, TimeUnit.SECONDS,
    15                                   new SynchronousQueue<Runnable>());
    16 }

      从上述三个静态方法的具体实现来看,它们实际上也是调用了ThreadPoolExecutor,只不过都已配置好了参数。  

      newFixedThreadPool创建的线程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;

      newSingleThreadExecutor将corePoolSize和maximumPoolSize都设置为1,也使用的LinkedBlockingQueue;

      newCachedThreadPool将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,使用的SynchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程。  

      实际使用中,如果Executors提供的三个静态方法能满足要求,就尽量使用它提供的三个方法,因为自己去手动配置ThreadPoolExecutor的参数有点麻烦,要根据实际任务的类型和数量来进行配置。

      另外,如果ThreadPoolExecutor达不到要求,可以自己继承ThreadPoolExecutor类并重写里面的方法。

      最后,我们再讨论下如何合理配置线程池的大小?  

      一般情况下,我们需要根据任务的类型来配置线程池大小:

      如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 NCPU+1

      如果是IO密集型任务,参考值可以设置为2*NCPU

      当然,上述建议只是参考值,具体的设置还需要根据实际情况进行调整,比如可以先将线程池大小设置为参考值,再观察任务运行情况和系统负载、资源利用率来进行适当调整。

  • 相关阅读:
    简单的模板解析函数
    HTML通过事件传递参数到js 二 event
    HTML通过事件传递参数到js一
    通过this获取当前点击选项相关数据
    LeetCode 20. 有效的括号(Valid Parentheses)
    LeetCode 459. 重复的子字符串(Repeated Substring Pattern)
    LeetCode 14. 最长公共前缀(Longest Common Prefix)
    LeetCode 168. Excel表列名称(Excel Sheet Column Title)
    LeetCode 171. Excel表列序号(Excel Sheet Column Number) 22
    LeetCode 665. 非递减数列(Non-decreasing Array)
  • 原文地址:https://www.cnblogs.com/lizhangyong/p/8376890.html
Copyright © 2011-2022 走看看