zoukankan      html  css  js  c++  java
  • Java 线程池(二)

    简介

    在上篇 Java 线程池(一) 我们介绍了线程池中一些的重要参数和具体含义,这篇我们看一看在 Java 中是如何去实现线程池的,要想用好线程池,只知其然是远远不够的,我们需要深入实现源码去了解线程池的具体实现细节,这样才能更好的使用到我们的工作中,当出现问题时能快速找到问题根源所在。

    线程池如何处理提交的任务

    我们向线程池提交任务有两种方式,分别是通过 submit 方法提交和通过 execute 方法提交,这两种方式的区别为 execute 只能提交 Runnable 类型的任务并且没有返回值,而 submit 既能提交 Runnable 类型的任务也能提交 Callable(JDK 1.5+)类型的任务并且会有一个类型 Future 的返回值,我们知道 Runnable 是没有返回值的,所以只有当提交 Callable 类型的任务时才会有返回值,而提交 Runnable 的返回值是 nullexecute 执行任务时,如果此时遇到异常会直接抛出,而 submit 不会直接抛出,只有在使用 Futureget 方法获取任务的返回结果时,才会抛出异常。
    通过查看 ThreadPoolExecutor 的源码我们发现,其 submit 方法是继承自其抽象父类 AbstractExecutorService 而来的,有三个重载的方法,分别可以提交 Runnable 类型和 Callable 类型的任务。无论是哪个 submit 方法最终还是调用了 execute 方法来实现的。方法源码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
    }

    public <T> Future<T> (Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
    }

    public <T> Future<T> (Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
    }

    首先对提交的任务进行判非空指针后,三个方法都是调用 newTaskFor 方法把任务统一封装成 RunnableFuture 对象,然后把封装好的对象作为 execute 方法的入参去执行,而此时 execute 方法还未实现,这个方法是在 AbstractExecutorService 的继承类 ThreadPoolExecutor 中实现。下面看看 newTaskFor 方法是如何封装我们提交的任务的,两个重载方法的源码如下:

    1
    2
    3
    4
    5
    6
    7
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
    }

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
    }

    那么这个 FutureTask 是个什么东东呢,进入其源码发现它实现了 RunnableFuture 接口,而 RunnableFuture 接口的作用正如其名,它是 RunnableFuture 的结合体,表示一个能异步返回结果的线程。我们知道 Runnable 是不能返回结果的,所以上面第一个 newTaskFor(Runnable runnable, T value) 方法的第二个参数 value 的作用就是指定返回结果。其实最后也是通过 RunnableAdapterRunnablevalue 封装成 Callable 的。下面我们看看 execute 方法是怎么处理的,方法源码如下:

    thread-pool-3.png

    第 ① 步 获取当前的 ctl 值,在上篇 Java 线程池(一) 中说过,变量 ctl 存储了线程池的工作状态 runState 和线程池中正在运行的线程数 workerCount
    第 ② 步 通过 workerCountOf 方法取出线程池中当前正在运行的线程数( ctl 低 29 位的值),如果线程池当前工作线程数小于核心线程数 corePoolSize,则进行第 ③ 步。
    第 ③ 步 通过 addWorker 方法新建一个线程加到线程池中,addWorker 方法的第二个参数如果为 true 则限制添加线程的数量是根据 corePoolSize 来判断,反之则根据 maximumPoolSize 来判断,并把任务添加到该线程中。
    第 ④ 步 如果添加失败,则重新获取 ctl 的值。
    第 ⑤ 步 如果当前线程池的状态是运行状态(state < SHUTDOWN)并且把任务成功添加到队列中。
    第 ⑥ 步 重新获取 ctl 的值,再次判断线程池的运行状态,如果不是运行状态,要从队列中移除任务,因为到这一步了,意味着之前已经把任务成功添加到队列中了,所以需要从队列移除。移除成功后调用拒绝策略对任务进行处理,整个 execute 方法结束(PS:为什么不在入队列之前就先判断线程池的状态呢?因为判断一个线程池工作处于运行状态到执行入队列操作这段时间,线程池可能已经被其它线程关闭了,所以提前判断其实毫无意义)。
    第 ⑦ 步 通过 workerCountOf 方法取出线程池中当前正在运行的线程数( ctl 低 29 位的值),如果是 0 则执行 addWorker(null, false) 方法,第一个参数传 null 表示只是在线程池中创建一个线程出来,但是没有立即启动,因为我们创建线程池时可能要求核心线程数量为 0。第二个参数为 false 表示限制添加线程时根据 maximumPoolSize 来判断,如果当前线程池中正在运行线程数量大于 0 ,则直接返回,因为在上面第 ⑤ 步已经把任务成功添加到队列 workQueue 中,它会在将来的某个时刻执行到。
    第 ⑧ 步 如果执行到这个地方,只有两种情况,一种是线程池的状态已经不是运行状态了,另一种是线程池是运行状态,但是此时线程池的工作线程数大于等于核心线程数(workerCount >= corePoolSize)并且队列 workQueue 已满。这时会再次调用 addWorker 方法,第二个参数传的 false,意味着限制添加线程的数量是根据 maximumPoolSize 来判断的,如果失败则调用拒绝策略对任务进行处理,整个 execute 方法结束。
    上面的 execute 方法中多次调用 addWorker,该方法的主要作用就是创建一个线程来执行任务。addWorker 的方法签名如下:

    1
    addWorker(Runnable firstTask, boolean core)

    第一个参数 firstTask 如果不为 null,则创建的线程首先执行 firstTask 任务,然后才会从队列中获取任务,否则会直接从队列中获取任务。第二个参数如果为 true,则表示限制添加线程时根据 corePoolSize 来判断,否则根据maximumPoolSize 来判断。我们看看 addWorker 方法的源码,方法源码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c);


    if (rs >= SHUTDOWN &&
    ! (rs == SHUTDOWN &&
    firstTask == null &&
    ! workQueue.isEmpty()))
    return false;

    for (;;) {
    int wc = workerCountOf(c);
    if (wc >= CAPACITY ||
    wc >= (core ? corePoolSize : maximumPoolSize))
    return false;
    if (compareAndIncrementWorkerCount(c))
    break retry;
    c = ctl.get(); // Re-read ctl
    if (runStateOf(c) != rs)
    continue retry;
    // else CAS failed due to workerCount change; retry inner loop
    }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
    w = new Worker(firstTask);
    final Thread t = w.thread;
    if (t != null) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
    // Recheck while holding lock.
    // Back out on ThreadFactory failure or if
    // shut down before lock acquired.
    i 大专栏  Java 线程池(二)nt rs = runStateOf(ctl.get());

    if (rs < SHUTDOWN ||
    (rs == SHUTDOWN && firstTask == null)) {
    if (t.isAlive()) // precheck that t is startable
    throw new IllegalThreadStateException();
    workers.add(w);
    int s = workers.size();
    if (s > largestPoolSize)
    largestPoolSize = s;
    workerAdded = true;
    }
    } finally {
    mainLock.unlock();
    }
    if (workerAdded) {
    t.start();
    workerStarted = true;
    }
    }
    } finally {
    if (! workerStarted)
    addWorkerFailed(w);
    }
    return workerStarted;
    }

    方法首先获取线程池 ctl 属性的值,该属性包含了线程池的运行状态工作线程数,通过 runStateOf 获取线程池的运行状态,然后执行下面这个比较复杂的条件判断

    thread-pool-4.png

    第 ① 个条件表示此时线程池已经不再接受新任务了,接下来的 ②、③、④ 三个判断条件只要有一个不满足,那么方法就会返回 false,方法结束。第 ② 个条件表示线程池为关闭状态,处于关闭状态的线程池不会处理新提交的任务,但会处理完已处理的任务,第 ③ 个条件为 firstTask 为 null,第 ④ 个条件为队列不为空。我们看看如果线程池此时为关闭状态的情况,这种情况线程池不会接受新提交的任务,所以此时如果传入的 firstTask 不为 null,则会直接返回 false;然后如果 firstTasknull,并且队列 workQueue 为空,此时也会返回 false,因为此时队列里已经没有任务了,那么也不需要再添加线程了,然后接下来会进入一个循环。

    thread-pool-5.png

    第 ① 步 调用 workerCountOf 方法获取当前线程池的工作线程数
    第 ② 步 如果当前线程池的工作数大于 CAPACITY 也就是 ctl 的低 29 位的最大值,则返回 false,如果不大于 CAPACITY,然后根据 core (该方法的第二个参数)来判断是和 corePoolSize 比较还是和 maximumPoolSize 比较,如果比这个值大则返回 false
    第 ③ 步 使用 ctlcompareAndSet 原子方法尝试把工作线程数 workerCount + 1,如果增加成功,退出第一层循环。
    第 ④ 步 如果增加线程池工作线程数失败,则重新获取 ctl 的值。
    第 ⑤ 步 调用 runStateOf 获取线程池的状态,如果不等于方法前面获取的 rs,说明线程池的状态已经改变了,回到第一层循环继续执行。
    接下来会启动线程执行任务,源码如下:

    thread-pool-6.png

    第 ① 步 根据 firstTask 创建 Worker 对象,每一个 Worker 对象都会创建一个线程,然后会使用重入锁 ReentrantLock 进行加锁操作。
    第 ② 步 调用 runStateOf 获取线程池的状态,然后进行一个条件判断,第一个 rs < SHUTDOWN 表示线程池是运行状态。如果线程池是运行状态或者线程池是关闭状态并且 firstTasknull,那么就往线程池中加入线程(因为当线程池是 SHUTDOWN 状态时不会再向线程池添加新的任务,但会执行队列 workQueue 中的任务)。这里的 workers 是一个 HashSet,所以其 add 方法不是线程安全的,所以需要加锁操作。然后修改线程池中出现过的最大线程数量 largestPoolSize 记录和把是否添加成功标记 workerAddedtrue。如果 workerAddedtrue 那么会启动线程并把线程是否启动标记 workerStarted 改为 true
    第 ③ 步 根据线程是否启动 workerStarted 标记来判断是否需要进行失败的操作。包含从 workers 移除当前的 worker、线程池的工作线程数减 1、尝试终端线程池。

    线程池中线程是如何执行的

    线程池的线程执行是调用 Workerthread 属性的 start 方法,而 threadrun 方法实际上调用了 Worker 类的 runWorker 方法,所以我们直接来看看 runWorker 方法的源码:

    thread-pool-7.png

    第 ① 步 获取第一个任务,while 循环不断地通过 getTask 方法从队列中获取任务。
    第 ② 步 这个判断条件目的是要保证如果线程池正在停止,要保证当前线程是中断状态,如果是的话,要保证当前线程不是终端状态。
    第 ③ 步 方法 beforeExecute 方法在类 ThreadPoolExecutor 中没有做任何操作,是留给子类去自定义在线程执行之前添加操作的方法。
    第 ④ 步 执行 task.run() 执行任务(PS:这里为什么是调用 run 方法而不是调用 start 方法呢?我们知道当调用了 start 方法后操作系统才会给我们创建一个独立的线程来运行,而调用 run 方法只是一个普通的方法调用,而线程池正好就是需要它是一个普通的方法才能进行任务的调度。我们可以想象一下,假如这里是调用的 Runnable 的 start 方法,那么会是什么结果呢。如果我们往一个核心线程数、最大线程数为 3 的线程池里丢了 500 个任务,那么它会额外的创建 500 个线程,同时每个任务都是异步执行的,结果一下子就执行完毕了,根本无法对任务进行调度。从而没法做到由这 3 个 Worker 线程来调度这 1000 个任务,而只有当做一个普通的 run 方法调用时才能满足线程池的这个要求)。
    第 ⑤ 步 方法 afterExecute 方法在类 ThreadPoolExecutor 中没有做任何操作,是留给子类去自定义在线程执行之后添加操作的方法。completedAbruptly 变量是用来表示在执行任务过程中是否出现了异常,processWorkerExit 方法中会对该变量的值进行判断。
    接下来我们看看 getTask 方法是如何从队列中获取任务的,方法源码如下:

    thread-pool-8.png

    第 ① 步 如果线程池不是运行状态,则判断线程池是否正在停止或者当前队列为空,如果条件满足将线程池的工作线程数减一并返回 null。因为如果当前线程池状态的值是 SHUTDOWN 或以上时,就不允许再向队列中添加任务了。
    第 ② 步 这里的 timed 变量用来标记是否需要线程进行超时控制,allowCoreThreadTimeOut 默认是 false,也就是核心线程不允许进行超时。wc > corePoolSize 表示当前线程池中的工作线程数量大于核心线程数量,对于超过核心线程数量的这些线程,需要进行超时控制。
    第 ③ 步 第一个判断 wc > maximumPoolSize 如果成立是因为可能在此方法执行阶段同时执行了线程池的 setMaximumPoolSize 方法;第二个判断 timed && timedOut 如果成立表示当前操作需要进行超时控制,并且上次从队列中获取任务发生了超时(timeOut 变量的值表示上次从阻塞队列中取任务时是否超时);第三个判断 wc > 1 || workQueue.isEmpty() 如果线程池中工作线程数量大于 1,或者队列是空的,那么尝试将 workerCount 减一,如果减一失败,则返回重试。如果 wc == 1 时,也就说明当前线程是线程池中唯一的一个线程了。
    第 ④ 步 根据 timed 来判断,如果为 true,则通过阻塞队列的 poll 方法进行超时控制,如果在 keepAliveTime 时间内没有获取到任务,则返回 null,否则通过 take 方法,如果这时队列为空,则 take 方法会阻塞直到队列不为空。如果 r == null,说明已经超时,timedOut 设置为 true
    第 ⑤ 步 如果获取任务时当前线程发生了中断,则设置 timedOutfalse 并重新循环重试。

    关闭线程池

    线程池的关闭一般都是使用 shutdown 方法和 shutdownNow 方法,两者的区别是前面的 shutdown 方法不会执行新的任务,但是会执行完当前正在执行的任务,而后面的 shutdownNow 方法会立即停止当前线程池,不管当前是否有线程在执行。一般都是使用 shutdown 方法来停止线程池,其方法源码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
    checkShutdownAccess();
    advanceRunState(SHUTDOWN);
    interruptIdleWorkers();
    onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
    mainLock.unlock();
    }
    tryTerminate();
    }

    advanceRunState(SHUTDOWN) 方法的作用是通过 CAS 原子操作将线程池的状态更改为关闭状态。interruptIdleWorkers 方法是对空闲的线程进行中断,其实是调用重载带参数的函数 interruptIdleWorkers(false)。然后 onShutdown 方法和上文提到的 beforeExecuteafterExecute 方法一样,在类 ThreadPoolExecutor 是空实现,也是个钩子函数。我们看看 interruptIdleWorkers 的实现源码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
    for (Worker w : workers) {
    Thread t = w.thread;
    if (!t.isInterrupted() && w.tryLock()) {
    try {
    t.interrupt();
    } catch (SecurityException ignore) {
    } finally {
    w.unlock();
    }
    }
    if (onlyOne)
    break;
    }
    } finally {
    mainLock.unlock();
    }
    }

    先进行加锁操作,然后遍历 workers 容器,也就是遍历线程池中的线程,对每个线程进行 tryLock 操作,如果成功说明线程空闲,则设置其中断标志位。而线程是否响应中断则交给我们定义任务的人来决定。

    总结

    本文比较详细的分析了线程池任务的提交、线程的执行、线程池的关闭的工作流程。通过学习线程池相关的源码后,看到了在其内部用运用了很多多线程的解决方法,有如下几个方式:

    1. 通过定义重入锁 ReentrantLock 变量 mainLock 来解决并发多线程的安全问题
    2. 利用等待机制来实现线程之间的通讯问题
      除了内置的功能外,ThreadPoolExecutor 也向外提供了两个接口供我们自己扩展满足我们需求的线程池,这两个接口分别是:beforeExecute 任务执行前执行的方法,afterExecute 任务执行结束后执行的方法。
  • 相关阅读:
    sql 执行动态语句
    Cookie/Session机制详解
    .NET简谈事务、分布式事务处理
    .NET(C#)中不同级别的安全透明代码对类型的影响
    C#开发微信门户及应用(1)开始使用微信接口
    WIN7管理工具配置ODBC数据源系统DSN中无Oracle,Sybase驱动的解决方法
    题解 smoj 2806 【建筑物】
    题解 luogu P2568 GCD
    题解 luogu P1251 【餐巾计划问题】
    0377组合总和IV Marathon
  • 原文地址:https://www.cnblogs.com/lijianming180/p/12371024.html
Copyright © 2011-2022 走看看