zoukankan      html  css  js  c++  java
  • ThreadPoolExecutor源码解析

    参考链接:https://blog.csdn.net/rebirth_love/article/details/51954836

    比较实用的RejectedExecutionHandler实现,丢弃最老的任务

    DiscardOldestPolicy

    第一部分:ThreadPoolExecutor的继承结构

    根据上图可以知道,ThreadPoolExecutor是继承的AbstractExecutorService(抽象类)。再来看一下AbstractExecutorService的结构可以发现,AbstractExecutorService实现了ExecutorService,并且ExecutorService继承Executor接口。

    如下是Executor和ExecutorService接口中一些方法:

    public interface Executor {
    void execute(Runnable command);
    }

    public interface ExecutorService extends Executor {


    void shutdown();


    List<Runnable> shutdownNow();


    boolean isShutdown();


    boolean isTerminated();


    boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException;


    <T> Future<T> submit(Callable<T> task);


    <T> Future<T> submit(Runnable task, T result);


    Future<?> submit(Runnable task);


    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    throws InterruptedException;


    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
    long timeout, TimeUnit unit)
    throws InterruptedException;


    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
    throws InterruptedException, ExecutionException;


    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
    long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException;
    }

    可以简单总结一下:Executor这个接口只有一个方法execute(Runnable command)(以Command Pattern(命令模式)的设计模式实现);在ExecutorService接口中一部分是和执行器生命周期相关的方法,而另一部分则是以各种方式提交要执行的任务的方法。像submit()就是提交任务的一个方法,在实现中做了适配的工作,无论参数是Runnable还是Callable,执行器都会正确执行。ExecutorService中,和生命周期相关的,声明了5个方法:

    • awaitTermination() 阻塞等待shutdown请求后所有线程终止,会有时间参数,超时和中断也会令方法调用结束
    • isShutdown()  通过ctl属性判断当前的状态是否不是RUNNING状态
    • isTerminated()  通过ctl属性判断当前的状态是否为TERMINATED状态
    • shutdown() 关闭Executor,不再接受提交任务
    • shutdownNow() 关闭Executor,不再接受提交任务,并且不再执行入队列中的任务

    那么再来看一下AbstractExecutorService的源码:

     

    可以看出来:AbstractExecutorService这个类是ExecutorService的一个抽象实现。其中,提交任务的各类方法已经给出了十分完整的实现。之所以抽象,是因为和执行器本身生命周期相关的方法在此类中并未给出任何实现,需要子类扩展完善(模板方法设计模式)拿一个submit方法出来分析一下:

    /**
    * @throws RejectedExecutionException {@inheritDoc}
    * @throws NullPointerException {@inheritDoc}
    */
    public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
    }

    从代码可以看出实际上用到的是RunnableFuture的实现类FutureTask。但最终还是调用了execute()方法,在子类中实现。

    在正式进入ThreadPoolExecutor源码分析之前还需要补充一点的是:Executors(工厂方法设计模式)java.util.concurrent.Executors是个工具类,提供了很多静态的工具方法。其中很多对于执行器来说就是初始化构建用的工厂方法。

     

    • 重载实现的newFixedThreadPool()
    • 重载实现的newSingleThreadExecutor()
    • 重载实现的newCachedThreadPool()
    • 重载实现的newSingleThreadScheduledExecutor()
    • 重载实现的newScheduledThreadPool()

    这些方法返回的ExecutorService对象最终都是由ThreadPoolExecutor实现的,根据不同的需求以不同的参数配置,或经过其它类包装。其中,Executors中的一些内部类就是用来做包装用的。Executors类中还有静态的defaultThreadFactory()方法,当然也可以自己实现自定义的ThreadFactory。

    第二部分:ThreadPoolExecutor源码分析

    下面正式进入ThreadPoolExecutor:(按照程序运行顺序分析)

    1、ThreadPoolExecutor的全参数构造方法:

    根据注释: 

    • corePoolSize 是线程池的核心线程数,通常线程池会维持这个线程数
    • maximumPoolSize 是线程池所能维持的最大线程数
    • keepAliveTime 和 unit 则分别是超额(空闲)线程的空闲存活时间数和时间单位
    • workQueue 是提交任务到线程池的入队列//无界队列
    • threadFactory 是线程池创建新线程的线程构造器
    • handler 是当线程池不能接受提交任务的时候的处理策略//默认抛出异常

    2、execute方法提交任务

    public void execute(Runnable command) {
    if (command == null)
    throw new NullPointerException();
    /*
    * Proceed in 3 steps:
    *
    * 1. If fewer than corePoolSize threads are running, try to
    * start a new thread with the given command as its first
    * task. The call to addWorker atomically checks runState and
    * workerCount, and so prevents false alarms that would add
    * threads when it shouldn't, by returning false.
    *
    * 2. If a task can be successfully queued, then we still need
    * to double-check whether we should have added a thread
    * (because existing ones died since last checking) or that
    * the pool shut down since entry into this method. So we
    * recheck state and if necessary roll back the enqueuing if
    * stopped, or start a new thread if there are none.
    *
    * 3. If we cannot queue task, then we try to add a new
    * thread. If it fails, we know we are shut down or saturated
    * and so reject the task.
    */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
    return;
    c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
    reject(command);
    else if (workerCountOf(recheck) == 0)
    addWorker(null, false);
    }
    else if (!addWorker(command, false))
    reject(command);
    }
    通过注释:提交新任务的时候,如果没达到核心线程数corePoolSize,则开辟新线程执行。如果达到核心线程数corePoolSize, 而队列未满,则放入队列,
    否则开新线程处理任务,直到maximumPoolSize,超出则丢弃处理。同时判断目前线程的状态是不是RUNNING其他线程有可能调用了shutdown()或shutdownNow()方法,关闭线程池,
    导致目前线程的状态不是RUNNING。在上面提交任务的时候,会出现开辟新的线程来执行,这会调用addWorker()方法。
    3、addWorker方法
    private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c);

    // Check if queue empty only if necessary.
    if (rs >= SHUTDOWN &&
    ! (rs == SHUTDOWN &&
    firstTask == null &&
    ! workQueue.isEmpty()))
    return false;

    for (;;) {
    int wc = workerCountOf(c);
    if (wc >= CAPACITY ||
    wc >= (core ? corePoolSize : maximumPoolSize))
    return false;
    if (compareAndIncrementWorkerCount(c))
    break retry;
    c = ctl.get(); // Re-read ctl
    if (runStateOf(c) != rs)
    continue retry;
    // else CAS failed due to workerCount change; retry inner loop
    }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
    w = new Worker(firstTask);
    final Thread t = w.thread;
    if (t != null) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
    // Recheck while holding lock.
    // Back out on ThreadFactory failure or if
    // shut down before lock acquired.
    int rs = runStateOf(ctl.get());

    if (rs < SHUTDOWN ||
    (rs == SHUTDOWN && firstTask == null)) {
    if (t.isAlive()) // precheck that t is startable
    throw new IllegalThreadStateException();
    workers.add(w);
    int s = workers.size();
    if (s > largestPoolSize)
    largestPoolSize = s;
    workerAdded = true;
    }
    } finally {
    mainLock.unlock();
    }
    if (workerAdded) {
    t.start();
    workerStarted = true;
    }
    }
    } finally {
    if (! workerStarted)
    addWorkerFailed(w);
    }
    return workerStarted;
    }
    第一部分:第一段从第3行到第26行,是双层无限循环,尝试增加线程数到ctl变量,并且做一些比较判断,如果超出线程数限定或者ThreadPoolExecutor的状态不符合要求,
    则直接返回false,增加worker失败。第二部分:从第28行开始到结尾,把firstTask这个Runnable对象传给Worker构造方法,赋值给Worker对象的task属性。
    Worker对象把自身(也是一个Runnable)封装成一个Thread对象赋予Worker对象的thread属性。锁住整个线程池并实际增加worker到workers的HashSet对象当中。
    成功增加后开始执行t.start(),就是worker的thread属性开始运行,实际上就是运行Worker对象的run方法。
    Worker的run()方法实际上调用了ThreadPoolExecutor的runWorker()方法。在看runWorker()之前先看一下Worker对象。

    4、Worker对象

    Worker是真正的任务,是由任务执行线程完成,它是ThreadPoolExecutor的核心。每个线程池中,有为数不等的Worker对象,每个Worker对象中,包含一个需要立即执行的新任务和已经执行完成的任务数量,Worker本身,是一个Runnable对象,不是Thread对象它内部封装一个Thread对象,用此对象执行本身的run方法,而这个Thread对象则由ThreadPoolExecutor提供的ThreadFactory对象创建新的线程。(将Worker和Thread分离的好处是,如果我们的业务代码,需要对于线程池中的线程,赋予优先级、线程名称、线程执行策略等其他控制时,可以实现自己的ThreadFactory进行扩展,无需继承或改写ThreadPoolExecutor。)

    private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable

    它既实现了Runnable,同时也是一个AQS ( AbstractQueuedSynchronizer )。封装了3样东西,Runnable类的首个任务对象,执行的线程thread和完成的任务数(volatile)completedTasks。这个类还提供了interruptIfStarted()这样一个方法,里面做了(getState()>=0)的判断。与此呼应,Worker的构造方法里对state设置了-1,避免在线程执行前被停掉。注意:第一个需要执行的任务 当有新的任务需要调度,并且需要创建新的线程时,在构造函数中为其赋值,此时新任务不放入任务缓存队列目的是减少任务缓存队列入队和出队的操作,提高调度性能(任务缓存队列的入队和出队操作,会涉及锁定和并发处理)。

    /** Delegates main run loop to outer runWorker  */
    public void run() {
    runWorker(this);
    }

    final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
    while (task != null || (task = getTask()) != null) {
    w.lock();
    // If pool is stopping, ensure thread is interrupted;
    // if not, ensure thread is not interrupted. This
    // requires a recheck in second case to deal with
    // shutdownNow race while clearing interrupt
    if ((runStateAtLeast(ctl.get(), STOP) ||
    (Thread.interrupted() &&
    runStateAtLeast(ctl.get(), STOP))) &&
    !wt.isInterrupted())
    wt.interrupt();
    try {
    beforeExecute(wt, task);
    Throwable thrown = null;
    try {
    task.run();
    } catch (RuntimeException x) {
    thrown = x; throw x;
    } catch (Error x) {
    thrown = x; throw x;
    } catch (Throwable x) {
    thrown = x; throw new Error(x);
    } finally {
    afterExecute(task, thrown);
    }
    } finally {
    task = null;
    w.completedTasks++;
    w.unlock();
    }
    }
    completedAbruptly = false;
    } finally {
    processWorkerExit(w, completedAbruptly);
    }
    }
    根据代码顺序看下来,其实很简单。
    • 线程开始执行前,需要对worker加锁,完成一个任务后执行unlock()
    • 在任务执行前后,执行beforeExecute()和afterExecute()方法
    • 记录任务执行中的异常后,继续抛出
    • 每个任务完成后,会记录当前线程完成的任务数
    • 当worker执行完一个任务的时候,包括初始任务firstTask,会调用getTask()继续获取任务,这个方法调用
    是可以阻塞的
    • 线程退出,执行processWorkerExit(w, completedAbruptly)处理
    接下来看一下getTask()是怎样实现空闲线程复用的
    /**
    * Performs blocking or timed wait for a task, depending on
    * current configuration settings, or returns null if this worker
    * must exit because of any of:
    * 1. There are more than maximumPoolSize workers (due to
    * a call to setMaximumPoolSize).
    * 2. The pool is stopped.
    * 3. The pool is shutdown and the queue is empty.
    * 4. This worker timed out waiting for a task, and timed-out
    * workers are subject to termination (that is,
    * {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
    * both before and after the timed wait, and if the queue is
    * non-empty, this worker is not the last thread in the pool.
    *
    * @return task, or null if the worker must exit, in which case
    * workerCount is decremented
    */
    private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c);

    // Check if queue empty only if necessary.
    if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
    decrementWorkerCount();
    return null;
    }

    int wc = workerCountOf(c);

    // Are workers subject to culling?
    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

    if ((wc > maximumPoolSize || (timed && timedOut))
    && (wc > 1 || workQueue.isEmpty())) {
    if (compareAndDecrementWorkerCount(c))
    return null;
    continue;
    }

    try {
    Runnable r = timed ?
    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
    workQueue.take();
    if (r != null)
    return r;
    timedOut = true;
    } catch (InterruptedException retry) {
    timedOut = false;
    }
    }
    }

    getTask()实际上是从工作队列(workQueue)中取提交进来的任务。这个workQueue是一个BlockingQueue,通常当队列中没有新任务的时候,则getTask()会阻塞。
    另外,还有定时阻塞这样一段逻辑:如果从队列中取任务是计时的,则用poll()方法,并设置等待时间为keepAlive,
    否则调用阻塞方法take()。当poll()超时,则获取到的任务为null,timeOut设置为 true。这段代码也是放在一个for(;;)循环中,
    前面有判断超时的语句,如果超时,则return null。这意味着runWorker()方法的while循环结束,线程将退出,执行processWorkerExit()方法。
  • 相关阅读:
    nginx配置反向代理
    hyperchain HVM使用java编写智能合约的编译、部署流程
    leetcode 140单词拆分Ⅱ
    bomblab phase5
    bomb lab 二三阶段
    2021暑假算法学习笔记(基础复习)#2
    2021暑假算法学习笔记(基础复习)#1
    O(logn)最长上升子序列并输出
    A Daily Topic # 7 阶乘的和(二进制/枚举)
    A Daily Topic # 6 星期几(模拟)
  • 原文地址:https://www.cnblogs.com/heapStark/p/9398433.html
Copyright © 2011-2022 走看看