zoukankan      html  css  js  c++  java
  • 线程池的内部实现

    上一篇文章讲了线程池的基本用法和一些使用技巧,这篇文章就继续深入的了解线程池,看看常用的线程池的内部实现,话不多说,开始:

      对于几个核心的线程池,无论是newFixedThreadPool(int nThreads),newSingleThreadExecutor()和newCacheedThreadPool(),虽然看起来创建的线程有着完全不同的功能特点,但是其实内部实现都使用了 ThreadPoolExecutor 实现,下面看看这三个线程的实现:

     1 public static ExecutorService newFixedThreadPool(int nThreads) {
     2         return new ThreadPoolExecutor(nThreads, nThreads,
     3                                       0L, TimeUnit.MILLISECONDS,
     4                                       new LinkedBlockingQueue<Runnable>());
     5     }
     6 public static ExecutorService newSingleThreadExecutor() {
     7         return new FinalizableDelegatedExecutorService
     8             (new ThreadPoolExecutor(1, 1,
     9                                     0L, TimeUnit.MILLISECONDS,
    10                                     new LinkedBlockingQueue<Runnable>()));
    11     }
    12 public static ExecutorService newCachedThreadPool() {
    13         return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    14                                       60L, TimeUnit.SECONDS,
    15                                       new SynchronousQueue<Runnable>());
    16     }

     由以上线程池的实现代码可以看出,它们都只是 ThreadPoolExecutor 类的封装。所以我们了解了ThreadPoolExecutor ,就了解了线程池,我们就从它最重要的构造函数了解开始:

    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);

    函数的参数含义:

      ❤ corePoolSize:指定了线程池中的线程数量,默认情况下,线程池中的线程数量为0,当有任务来时才去创建线程;

      ❤ maximumPoolSize:指定了线程池中的最大线程数量;

      ❤ keepAliveTime:当线程池线程数量超过corePoolSize时,多于的空闲线程的存活时间,即,超过corePoolSize的空闲时间,在多长时间内,会被销毁;

      ❤ unit:keepAliveTime的单位;

      ❤ workQueue:任务队列,被提交但尚未被执行的任务;

      ❤ threadFactory:线程工厂,用于创建线程,一般默认的就行;

      ❤ handler:拒接策略。当任务太多来不及处理,如何拒绝任务。

    corePoolSize和maximumPoolSize的理解

    这里特别罗列几条规则解释这两个参数:

      ①线程池中的线程数量小于corePoolSize,新的任务都不排队而是直接新增线程执行;

      ②线程池的线程数量大于等于corePoolSize,而workQueue未满,新的任务加入workQueue而不是新增线程;

      ③线程池的线程数量大于等于corePoolSize,并且workQueue已满,但是线程数量小于maximumPoolSize,新增新的线程来处理任务;

      ④线程池的线程数量大于等于corePoolSize,并且workQueue已满,而且线程数量大于等于maximumPoolSize,新的任务被拒绝,使用handler处理被拒绝的任务;

    workQueue和handler的详细说明

    workQueue:

       workQueue指的是被提交但是未被执行的任务队列,它是一个BlockingQueue 接口的对象,仅用于存放Runnable对象。

      根据队列功能来分类,在 ThreadPoolExecutor 的构造函数中可使用下面几种BlockingQueue:

        ❤ 直接提交的队列:该功能由SynchronousQueue 对象提供,SynchronousQueue是一个特殊的BlockingQueue。SynchronousQueue 没有容量,每一个插入操作都要等待相应的删除操作,反之,每一个删除操作都要等待对应的插入操作。如果使用SynchronousQueue,提交的任务不会被真实的保存,而总是将新的任务提交给线程执行,如果没有空闲的线程,则会尝试创建线程,如果线程数量已经达到最大值,则执行拒绝策略。因此,使用SynchronousQueue队列,通常需要设置很大的maximumPoolSize值,否则很容易执行拒绝策略;

        ❤ 有界的任务队列:有界的任务队列可使用ArrayBlockingQueue实现,ArrayBlockingQueue的构造函数必须带一个容量参数,表示该队列的最大容量  public ArrayBlockingQueue(int capacity),当时用有界的任务队列时,若有新的任务需要执行,如果线程池中线程数量小于corePoolSize,则会优先创建新的线程,若大于等于corePoolSize,则会把新的任务加入到等待队列中,若队列已满,无法加入,则在总线程数量不大于maximumPoolSize的前提下,创建新的线程执行。若大于等于maximumPoolSize,则执行拒绝策略。可见,有界队列仅当在任务队列装满时,才可能将线程数提到corePoolSize之上,换言之,除非系统非常繁忙,否则核心线程数维持在corePoolSize。

        ❤ 无界的任务队列:无界的任务队列可以通过LinkedBlockingQueue 类实现。与有界队列相比,除非系统资源耗尽,否则无界的任务队列不存在任务入队失败的情况。当有新的任务到来,线程池的线程数量小于corePoolSize时,线程池会生成新的线程执行任务,当线程池的线程数量到达corePoolSize时,线程数量就不会增加。若后续有新的任务提交,而又没有空闲的线程,则任务就会直接进入无界的队列等待。若任务的创建和处理的速度差异很大,无界队列就会保持快速增长,直到系统资源耗尽。

        ❤ 优先任务队列:优先任务队列是带有执行优先级的队列。它通过PriorityBlockingQueue 实现,可以控制任务的执行先后顺序,它是一个特殊的无界队列。无论是有界队列ArrayBlockingQueue,还是未指定大小的无界队列LinkedBlockingQueue 都是按照先进先出算法处理的。而PriorityBlockingQueue 则可以根据任务自身的优先级顺序先后执行,在确保系统性能的同时,同时也有很好的质量保证(总是确保高优先级的任务先执行)。

    明白了几种任务队列,我们再来回顾上述三种核心的线程池的实现,加深对常用核心的线程池的理解:

      ⑴newFixedThreadPool(int nThreads)方法:这个方法返回了一个corePoolSize和maximumPoolSize大小一样的,并且使用了LInkedBlockingQueue 任务队列的线程池。因为对于固定大小的线程池来说,不存在线程数量的动态变化,因此corePoolSize和maximumPoolSize可以相等,同时,它使用无界队列来存放无法立即执行的任务,当任务提交非常频繁的时候,该队列可能迅速膨胀,从而耗尽系统资源。

      ⑵newSingleThreadExecutor()方法:该方法返回的是一个单线程线程池,是newFixedThreadPool的一种退化,只是简单的将线程数量设置为1而已。

      ⑶newCachedThreadPool()方法:该方法返回corePoolSize为0,maximumPoolSize为Integer的最大值。这就意味着在没有任务时,该线程池内无线程,当任务被提交时,该线程池会使用空闲的线程执行任务,若无空闲线程,则将加入SynchronousQueue队列,而SynchronousQueue 队列是一种直接提交的队列,它总是会迫使线程池增加新的线程执行任务,当任务执行完毕后,由于corePoolSize为0,因此空闲线程又会在指定时间内(60S)被回收。对于newCachedThreadPool(),如果有大量的任务被提交,而任务执行又不那么快时,那么系统便会开启等量的线程处理,这样的做法可能会很快耗尽系统资源。

      下面看看ThreadPoolExecutor线程池的核心调度代码:

     1 public void execute(Runnable command) {
     2         if (command == null)
     3             throw new NullPointerException();
     4         int c = ctl.get();
     5         if (workerCountOf(c) < corePoolSize) {
     6             if (addWorker(command, true))
     7                 return;
     8             c = ctl.get();
     9         }
    10         if (isRunning(c) && workQueue.offer(command)) {
    11             int recheck = ctl.get();
    12             if (! isRunning(recheck) && remove(command))
    13                 reject(command);
    14             else if (workerCountOf(recheck) == 0)
    15                 addWorker(null, false);
    16         }
    17         else if (!addWorker(command, false))
    18             reject(command);
    19     }

    上述代码的第5行workerCountOf()方法取得了当前线程池的线程总数。当线程总数小于corePoolSize核心线程数时,会将任务通过addWorker()方法直接调度执行,否则在第10行代码处(workQueue.offer())进入等待队列,如果进入队列失败(比如有界队列到达了上限,或者使用了SynchronousQueue),则会执行第17行,将任务直接提交给线程池。如果当前线程数量已经达到了maximumPoolSize,则提交失败,执行第18行的拒绝策略。

    handler():

      JDK内置提供了四种拒绝策略:

      ⒈ AbortPolicy策略:该策略直接抛出异常,阻止系统正常工作;

      ⒉ CallerRunsPolicy策略:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。显然这样做不会真的丢弃任务,但是,任务提交线程的性能极有可能会急剧下降;

      ⒊ DiscardOledestPolicy策略:该策略丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。

      ⒋ DiscardPolicy策略:该策略默默丢弃无法处理的任务,不予任何处理。如果允许任务丢失,这就是最好的一种方案。

     上面四种内置策略均实现了 RejectedExecutionHandler 接口,若上面的拒绝策略无法满足实际需求,你也可以自己扩展 RejectedExecutionHandler 接口,RejectExecutionHandler 的定义如下:

    1 public interface RejectedExecutionHandler{
    2     void rejectedExecution(Runnable r,ThreadPoolExecutor executor);
    3 }

    其中r为请求的任务,executor 为当前的线程池。

    下面演示下拒绝策略的使用:

     1 public class RejectThreadPoolDemo {
     2 
     3     public static class MyTask implements Runnable{
     4         @Override
     5         public void run() {
     6             System.out.println(System.currentTimeMillis() + " : Thread ID:" + Thread.currentThread().getId());
     7             try {
     8                 Thread.sleep(100);
     9             } catch (InterruptedException e) {
    10                 e.printStackTrace();
    11             }
    12         }
    13     }
    14     //测试
    15     public static void main(String[] args) throws InterruptedException {
    16         MyTask myTask = new MyTask();
    17         ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10),
    18                 new RejectedExecutionHandler() {
    19                     @Override
    20                     public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    21                         System.out.println(r.toString() + " is discard!");
    22                     }
    23                 });
    24         //循环提交任务
    25         for (int i = 0; i < Integer.MAX_VALUE;i++){
    26             es.submit(myTask);
    27             Thread.sleep(10);
    28         }
    29         es.shutdown();
    30     }
    31 }

    输出结果:

    ......
    1538097339848 : Thread ID:12
    1538097339858 : Thread ID:13
    1538097339868 : Thread ID:14
    1538097339878 : Thread ID:15
    java.util.concurrent.FutureTask@330bedb4 is discard!
    java.util.concurrent.FutureTask@2503dbd3 is discard!
    java.util.concurrent.FutureTask@4b67cf4d is discard!
    java.util.concurrent.FutureTask@7ea987ac is discard!
    java.util.concurrent.FutureTask@12a3a380 is discard!
    1538097339938 : Thread ID:11
    1538097339948 : Thread ID:12
    1538097339958 : Thread ID:13
    ......

    上述代码在16到23行,自定义了一个线程池,该线程池拥有5个常驻线程,并且最大线程池数量也是5个,这和固定大小的线程池是一样的。但是它却只拥有一个容量只有10的任务队列;然后我们自定义了拒绝策略,打印出了被拒绝的任务;从输出结果来看,在执行了一些任务后,拒绝策略就生效了。

    参考:《Java高并发程序设计》 葛一鸣 郭超 编著:

    作者:Joe
    努力了的才叫梦想,不努力的就是空想,努力并且坚持下去,毕竟这是我相信的力量
  • 相关阅读:
    MFC Windows 程序设计>WinMain 简单Windows程序 命令行编译
    AT3949 [AGC022D] Shopping 题解
    CF643D Bearish Fanpages 题解
    CF643C Levels and Regions 题解
    CF241E Flights 题解
    CF671C Ultimate Weirdness of an Array 题解
    CF1592F Alice and Recoloring 题解
    GYM 102452E 题解
    CF494C Helping People 题解
    P5556 圣剑护符
  • 原文地址:https://www.cnblogs.com/Joe-Go/p/9712191.html
Copyright © 2011-2022 走看看