zoukankan      html  css  js  c++  java
  • 线程池原理

    一、简介

      线程池简单来说就是一组线程的集合,通过线程池可以达到线程复用的目的,从而避免频繁创建线程和销毁过程中的开销。在应用上,线程可用在后端相关服务上。最常见的比如说数据库服务器,web服务器上。例如web服务器,可能会接收到很多很多断短时的http请求,如果我们为每一个http请求创建一个处理线程,那么势必会消耗大量服务器资源,严重时可能将服务器资源耗尽。当然我们也可以管理并复用自己已创建的线程,来限制资源的消耗,但是那势必会使业务逻辑变的复杂。JDK已经给我们提供了丰富的线程池工具类。使用这些工具类可以给我们带来极大的方便。简单了解其原理,对一个开发来说还是很有必要的。

    二、继承体系

    如上图:最顶层是Executor接口,这个接口中只声明了一个execute方法。ExecutorService在其父类接口的基础上,又声明了submit、shutdown、invokerAll、invokeAny等方法。至于ScheduledExecutorService接口则声明了一些和定时任务相关的方法。比如schedule和scheduleAtFixedRate。线程池的核心实现类是在ThreadPoolExecutor,我们通过Executors调用newFixedThreadPool、newSingleThreadExector和newCachedThreadPool等创建的线程均是ThreadPoolExecutor类型。

    三、原理分析

     1、核心参数

      1、1 参数简介

      线程池的核心实现类是ThreadPoolExecutor。这个类包含了几个重要属性,可以通过构造器设置参数值,构造方法如下:

    1 public ThreadPoolExecutor(int corePoolSize,
    2                           int maximumPoolSize,
    3                           long keepAliveTime,
    4                           TimeUnit unit,
    5                           BlockingQueue<Runnable> workQueue,
    6                           ThreadFactory threadFactory,
    7                           RejectedExecutionHandler handler)

    构造方法的参数即线程池的核心参数,下面是每个参数的具体含义:

    参数 说明
    corePoolSize
     核心线程数,当线程池中当线程数小于该值时会优先创建新线程处理新任务
    maximumPoolSize
    线程池所能创建当最大线程数
    keepAliveTime
     空闲线程的存活时间

        workQueue

     任务队列,用于存放未执行的任务
    threadFactory
     线程工厂,可以设置比如线程名称等内容

      handler

    拒绝策略。当线程池和任务队列满时,采用拒绝策略处理新来的任务,默认时AbortPolicy,即直接抛出异常

      

     

      1.2 线程创建规则

      线程创建的数量主要和corePoolSize、maximumPoolSize两个参数有关,而线程的创建时机主要和corePoolSize、workQueue两个参数有关。下面列举4个线程的创建规则(线程池中无空闲线程)

    序号 条件 动作
    1 线程数量<corePoolSize 创建线程
    2 线程数量≥corePoolSize,且workQueue未满 任务放入队列,等待被执行
    3 线程数量≥corePoolSize,且workQueue已满,线程数量<maximumPoolSize 创建新线程,执行任务
    4 线程数量≥maximumPoolSize,且workQueue已满 执行拒绝策略

      

     

      1.3线程资源回收

      因为系统的资源是有限的,所以需要适时的对空闲线程资源进行回收。这里的空闲线程包括两个部分,一部分是超出corePoolSize的空闲线程,一部分是corePoolSize内的核心线程,回收的时间由keepLiveTime确定。此外回收线程的前提是allowCoreThreadTimeOut属性被设置为true, public void allowCoreThreadTimeOut(boolean) 方法可以设置属性值。

      1.4排队策略

      JDK提供了同步队列、有界队列、无界队列、优先级队列来缓存任务,四种队列具体的实现类如下:

    实现类 类型 说明
    ArrayBlockingQueue 有界队列 是基于数组的阻塞队列,按照FIFO原则对元素进行排序
    SynchronousQueue 同步队列 该队列比较特殊,它不存储元素,每次插入操作必须等待另一线程调用移除操作,否则插入操作就会一直阻塞
    LinkedBlockingQueue 无界队列 基于链表的阻塞队列,按照FIFO原则对元素进行排序
    PriorityBlockingQueue 优先级队列 具有优先级的阻塞队列

      

     

      

      1.5拒绝策略

         当线程池线程数量大于等于maximumPoolSize,并且workQueue队列已满的情况下使用拒绝策略,线程池的拒绝策略有以下几种:

    实现类 说明
    AbortPolicy 丢弃新任务,并抛出RejectExecutionException
    DiscardPolicy 直接丢弃任务,不做任何操作
    DiscardOldestPolicy 丢弃队列队首的任务,执行新任务
    CallerRunsPolicy 由调用者执行新任务

    线程池默认执行AbortPolicy策略,我们可以通过方法public void setRejectedExecutionHandler(RejectedExecutionHandler)修改线程池决绝策略。

      

      2 、重要操作

      2.1线程的创建与复用

      在线程池的实现上,线程的创建是通过线程工厂接口ThreadFactory接口的实现类来完成的。默认使用的是Exectors.defaultThreadFactory()方法返回的线程工厂实现类。 同上我们也可以调用方法setThreadFactory(ThreadFactory)方法来设置。

      线程的复用是线程池的关键所在,这要求一个线程在执行完任务之后不能立即退出。对应到具体的实现上,就是一个线程执行完任务之后,会去任务队列获取新的任务,如果任务队列中没有任务,且线程池没有设置keepLiveTime,则这个线程就会一直阻塞下去,以此来达到线程复用的目的。

    下面是线程创建和复用的的部分代码:

     1 +----ThreadPoolExecutor.Worker.java
     2 Worker(Runnable firstTask) {
     3     setState(-1);
     4     this.firstTask = firstTask;
     5     // 调用线程工厂创建线程
     6     this.thread = getThreadFactory().newThread(this);
     7 }
     8 
     9 // Worker 实现了 Runnable 接口
    10 public void run() {
    11     runWorker(this);
    12 }
    13 
    14 +----ThreadPoolExecutor.java
    15 final void runWorker(Worker w) {
    16     Thread wt = Thread.currentThread();
    17     Runnable task = w.firstTask;
    18     w.firstTask = null;
    19     w.unlock();
    20     boolean completedAbruptly = true;
    21     try {
    22         // 循环从任务队列中获取新任务
    23         while (task != null || (task = getTask()) != null) {
    24             w.lock();
    25             // If pool is stopping, ensure thread is interrupted;
    26             // if not, ensure thread is not interrupted.  This
    27             // requires a recheck in second case to deal with
    28             // shutdownNow race while clearing interrupt
    29             if ((runStateAtLeast(ctl.get(), STOP) ||
    30                  (Thread.interrupted() &&
    31                   runStateAtLeast(ctl.get(), STOP))) &&
    32                 !wt.isInterrupted())
    33                 wt.interrupt();
    34             try {
    35                 beforeExecute(wt, task);
    36                 Throwable thrown = null;
    37                 try {
    38                     // 执行新任务
    39                     task.run();
    40                 } catch (RuntimeException x) {
    41                     thrown = x; throw x;
    42                 } catch (Error x) {
    43                     thrown = x; throw x;
    44                 } catch (Throwable x) {
    45                     thrown = x; throw new Error(x);
    46                 } finally {
    47                     afterExecute(task, thrown);
    48                 }
    49             } finally {
    50                 task = null;
    51                 w.completedTasks++;
    52                 w.unlock();
    53             }
    54         }
    55         completedAbruptly = false;
    56     } finally {
    57         // 线程退出后,进行后续处理
    58         processWorkerExit(w, completedAbruptly);
    59     }
    60 }

      

      2.2 提交任务

      通常情况下我们可以使用submit()方法来提交任务。提交的任务可能立即被执行,也可能被放入任务队列,也可能会被拒绝执行,处理的过程如下:

      从上图可以看出,当一个新的任务被提交时,线程池的处理步骤如下:

      1、判断当前核心线程是否已被全部使用,如果没有,则创建一个线程执行任务。否则,执行步骤2。

      2、判断任务队列是否已满,如果没有,将任务放入队列缓存任务。否则,执行步骤3。

      3、判断当前线程数是否超过最大线程数,如果没有,创建线程执行任务。否则,执行步骤4。

      4、执行拒绝策略。

      代码实现如下:

      1 +---- AbstractExecutorService.java
      2 public Future<?> submit(Runnable task) {
      3     if (task == null) throw new NullPointerException();
      4     // 创建任务
      5     RunnableFuture<Void> ftask = newTaskFor(task, null);
      6     // 提交任务
      7     execute(ftask);
      8     return ftask;
      9 }
     10 
     11 +---- ThreadPoolExecutor.java
     12 public void execute(Runnable command) {
     13     if (command == null)
     14         throw new NullPointerException();
     15 
     16     int c = ctl.get();
     17     // 如果工作线程数量 < 核心线程数,则创建新线程
     18     if (workerCountOf(c) < corePoolSize) {
     19         // 添加工作者对象
     20         if (addWorker(command, true))
     21             return;
     22         c = ctl.get();
     23     }
     24     
     25     // 缓存任务,如果队列已满,则 offer 方法返回 false。否则,offer 返回 true
     26     if (isRunning(c) && workQueue.offer(command)) {
     27         int recheck = ctl.get();
     28         if (! isRunning(recheck) && remove(command))
     29             reject(command);
     30         else if (workerCountOf(recheck) == 0)
     31             addWorker(null, false);
     32     }
    33

    //
    添加工作者对象,并在 addWorker 方法中检测线程数是否小于最大线程数 35 else if (!addWorker(command, false)) 36 // 线程数 >= 最大线程数,使用拒绝策略处理任务 37 reject(command); 38 } 39 40 private boolean addWorker(Runnable firstTask, boolean core) { 41 retry: 42 for (;;) { 43 int c = ctl.get(); 44 int rs = runStateOf(c); 45 46 // Check if queue empty only if necessary. 47 if (rs >= SHUTDOWN && 48 ! (rs == SHUTDOWN && 49 firstTask == null && 50 ! workQueue.isEmpty())) 51 return false; 52 53 for (;;) { 54 int wc = workerCountOf(c); 55 // 检测工作线程数与核心线程数或最大线程数的关系 56 if (wc >= CAPACITY || 57 wc >= (core ? corePoolSize : maximumPoolSize)) 58 return false; 59 if (compareAndIncrementWorkerCount(c)) 60 break retry; 61 c = ctl.get(); // Re-read ctl 62 if (runStateOf(c) != rs) 63 continue retry; 64 // else CAS failed due to workerCount change; retry inner loop 65 } 66 } 67 68 boolean workerStarted = false; 69 boolean workerAdded = false; 70 Worker w = null; 71 try { 72 // 创建工作者对象,细节参考上一节所贴代码 73 w = new Worker(firstTask); 74 final Thread t = w.thread; 75 if (t != null) { 76 final ReentrantLock mainLock = this.mainLock; 77 mainLock.lock(); 78 try { 79 int rs = runStateOf(ctl.get()); 80 if (rs < SHUTDOWN || 81 (rs == SHUTDOWN && firstTask == null)) { 82 if (t.isAlive()) // precheck that t is startable 83 throw new IllegalThreadStateException(); 84 // 将 worker 对象添加到 workers 集合中 85 workers.add(w); 86 int s = workers.size(); 87 // 更新 largestPoolSize 属性 88 if (s > largestPoolSize) 89 largestPoolSize = s; 90 workerAdded = true; 91 } 92 } finally { 93 mainLock.unlock(); 94 } 95 if (workerAdded) { 96 // 开始执行任务 97 t.start(); 98 workerStarted = true; 99 } 100 } 101 } finally { 102 if (! workerStarted) 103 addWorkerFailed(w); 104 } 105 return workerStarted; 106 }

      

      2.3关闭线程池

      关闭线程池的方法有两种,第一种是调用shutDown()方法,第二种是调用shutDownNow()方法。两种方法的区别在于前者,会将线程池的状态设置为SHUTDOWN,并且中断空闲的线程。后者会将线程池的状态设置为STOP,并且尝试中断所有的线程。中断线程使用的是Thread.interrupt()方法,未响应的线程是无法中断的,最后shutdown()会将未执行的任务全部返回。线程池被关闭后就不能再提交新的任务了,新提交的任务会使用拒绝策略处理。

     四、几种常用的线程池

      一般情况下我们会使用Exectors工具类来创建线程池。通过该类我们可以构建以下线程:

    静态构造方法 说明
    newFixedThreadPool(int threads) 构建固定线程数的线程池,默认情况下,空闲线程不会被回收
    newCachedThreadPool() 创建线程数不固定的线程池,线程数随着任务量而变动,空闲线程超过60秒将被回收
    newSingleThreadPool() 创建线程数为1的线程池
    newScheduledThreadPool(int corePoolSize) 线程数为corePoolSize的线程池,可执行定时任务的线程池

    参考链接:http://www.tianxiaobo.com/2018/04/17/Java-%E7%BA%BF%E7%A8%8B%E6%B1%A0%E5%8E%9F%E7%90%86%E5%88%86%E6%9E%90/

  • 相关阅读:
    记一次 contentInsetAdjustmentBehavior 引发的bug
    Android埋点技术概览
    Android开发快速入门iOS开发概览
    工作项目遇到的一些问题
    ruby操作项目.xcodeproj
    关于performSelector afterDelay:0 的使用
    谷歌Python代码风格指南 中文翻译
    最大子矩阵问题--悬线法dp
    tarjan
    SDU CSPT3模拟
  • 原文地址:https://www.cnblogs.com/shuaixiaobing/p/11756300.html
Copyright © 2011-2022 走看看