zoukankan      html  css  js  c++  java
  • Java

    ThreadPoolExecutor简介

    ThreadPoolExecutor是线程池类。对于线程池,可以通俗的将它理解为"存放一定数量线程的一个线程集合。线程池允许若个线程同时允许,允许同时运行的线程数量就是线程池的容量;当添加的到线程池中的线程超过它的容量时,会有一部分线程阻塞等待。线程池会通过相应的调度策略和拒绝策略,对添加到线程池中的线程进行管理。"

    ThreadPoolExecutor数据结构

    ThreadPoolExecutor的数据结构如下图所示:

    各个数据在ThreadPoolExecutor.java中的定义如下:

    复制代码
    // 阻塞队列。
    private final BlockingQueue<Runnable> workQueue;
    // 互斥锁
    private final ReentrantLock mainLock = new ReentrantLock();
    // 线程集合。一个Worker对应一个线程。
    private final HashSet<Worker> workers = new HashSet<Worker>();
    // “终止条件”,与“mainLock”绑定。
    private final Condition termination = mainLock.newCondition();
    // 线程池中线程数量曾经达到过的最大值。
    private int largestPoolSize;
    // 已完成任务数量
    private long completedTaskCount;
    // ThreadFactory对象,用于创建线程。
    private volatile ThreadFactory threadFactory;
    // 拒绝策略的处理句柄。
    private volatile RejectedExecutionHandler handler;
    // 保持线程存活时间。
    private volatile long keepAliveTime;
    
    private volatile boolean allowCoreThreadTimeOut;
    // 核心池大小
    private volatile int corePoolSize;
    // 最大池大小
    private volatile int maximumPoolSize;
    复制代码

    1. workers
        workers是HashSet<Work>类型,即它是一个Worker集合。而一个Worker对应一个线程,也就是说线程池通过workers包含了"一个线程集合"。当Worker对应的线程池启动时,它会执行线程池中的任务;当执行完一个任务后,它会从线程池的阻塞队列中取出一个阻塞的任务来继续运行。
        wokers的作用是,线程池通过它实现了"允许多个线程同时运行"。

    2. workQueue
        workQueue是BlockingQueue类型,即它是一个阻塞队列。当线程池中的线程数超过它的容量的时候,线程会进入阻塞队列进行阻塞等待。
        通过workQueue,线程池实现了阻塞功能。

    3. mainLock
        mainLock是互斥锁,通过mainLock实现了对线程池的互斥访问。

    4. corePoolSize和maximumPoolSize
        corePoolSize是"核心池大小",maximumPoolSize是"最大池大小"。它们的作用是调整"线程池中实际运行的线程的数量"。
        例如,当新任务提交给线程池时(通过execute方法)。
              -- 如果此时,线程池中运行的线程数量< corePoolSize,则创建新线程来处理请求。
              -- 如果此时,线程池中运行的线程数量> corePoolSize,但是却< maximumPoolSize;则仅当阻塞队列满时才创建新线程。
              如果设置的 corePoolSize 和 maximumPoolSize 相同,则创建了固定大小的线程池。如果将 maximumPoolSize 设置为基本的无界值(如 Integer.MAX_VALUE),则允许池适应任意数量的并发任务。在大多数情况下,核心池大小和最大池大小的值是在创建线程池设置的;但是,也可以使用 setCorePoolSize(int) 和 setMaximumPoolSize(int) 进行动态更改。

    5. poolSize
        poolSize是当前线程池的实际大小,即线程池中任务的数量。

    6. allowCoreThreadTimeOut和keepAliveTime
        allowCoreThreadTimeOut表示是否允许"线程在空闲状态时,仍然能够存活";而keepAliveTime是当线程池处于空闲状态的时候,超过keepAliveTime时间之后,空闲的线程会被终止。

    7. threadFactory
        threadFactory是ThreadFactory对象。它是一个线程工厂类,"线程池通过ThreadFactory创建线程"。

    8. handler
        handler是RejectedExecutionHandler类型。它是"线程池拒绝策略"的句柄,也就是说"当某任务添加到线程池中,而线程池拒绝该任务时,线程池会通过handler进行相应的处理"。

    综上所说,线程池通过workers来管理"线程集合",每个线程在启动后,会执行线程池中的任务;当一个任务执行完后,它会从线程池的阻塞队列中取出任务来继续运行。阻塞队列是管理线程池任务的队列,当添加到线程池中的任务超过线程池的容量时,该任务就会进入阻塞队列进行等待。

    线程池调度

    我们通过下面的图看看下面线程池中任务的调度策略,加深对线程池的理解。

    图-01:

    图-02:

    说明
        在"图-01"中,线程池中有N个任务。"任务1", "任务2", "任务3"这3个任务在执行,而"任务3"到"任务N"在阻塞队列中等待。正在执行的任务,在workers集合中,workers集合包含3个Worker,每一个Worker对应一个Thread线程,Thread线程每次处理一个任务。
        当workers集合中处理完某一个任务之后,会从阻塞队列中取出一个任务来继续执行,如图-02所示。图-02表示"任务1"处理完毕之后,线程池将"任务4"从阻塞队列中取出,放到workers中进行处理。

    Java多线程系列--“JUC线程池”03之 线程池原理(二)

    线程池示例

    在分析线程池之前,先看一个简单的线程池示例。

    复制代码
     1 import java.util.concurrent.Executors;
     2 import java.util.concurrent.ExecutorService;
     3 
     4 public class ThreadPoolDemo1 {
     5 
     6     public static void main(String[] args) {
     7         // 创建一个可重用固定线程数的线程池
     8         ExecutorService pool = Executors.newFixedThreadPool(2);
     9         // 创建实现了Runnable接口对象,Thread对象当然也实现了Runnable接口
    10         Thread ta = new MyThread();
    11         Thread tb = new MyThread();
    12         Thread tc = new MyThread();
    13         Thread td = new MyThread();
    14         Thread te = new MyThread();
    15         // 将线程放入池中进行执行
    16         pool.execute(ta);
    17         pool.execute(tb);
    18         pool.execute(tc);
    19         pool.execute(td);
    20         pool.execute(te);
    21         // 关闭线程池
    22         pool.shutdown();
    23     }
    24 }
    25 
    26 class MyThread extends Thread {
    27 
    28     @Override
    29     public void run() {
    30         System.out.println(Thread.currentThread().getName()+ " is running.");
    31     }
    32 }
    复制代码

    运行结果

    pool-1-thread-1 is running.
    pool-1-thread-2 is running.
    pool-1-thread-1 is running.
    pool-1-thread-2 is running.
    pool-1-thread-1 is running.

    示例中,包括了线程池的创建,将任务添加到线程池中,关闭线程池这3个主要的步骤。稍后,我们会从这3个方面来分析ThreadPoolExecutor。

    线程池源码分析

    (一) 创建“线程池”

    下面以newFixedThreadPool()介绍线程池的创建过程。

    1. newFixedThreadPool()

    newFixedThreadPool()在Executors.java中定义,源码如下:

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    说明:newFixedThreadPool(int nThreads)的作用是创建一个线程池,线程池的容量是nThreads。
             newFixedThreadPool()在调用ThreadPoolExecutor()时,会传递一个LinkedBlockingQueue()对象,而LinkedBlockingQueue是单向链表实现的阻塞队列。在线程池中,就是通过该阻塞队列来实现"当线程池中任务数量超过允许的任务数量时,部分任务会阻塞等待"。
    关于LinkedBlockingQueue的实现细节,读者可以参考"Java多线程系列--“JUC集合”08之 LinkedBlockingQueue"。

    2. ThreadPoolExecutor()

    ThreadPoolExecutor()在ThreadPoolExecutor.java中定义,源码如下:

    复制代码
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
    复制代码

    说明:该函数实际上是调用ThreadPoolExecutor的另外一个构造函数。该函数的源码如下:

    复制代码
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        // 核心池大小
        this.corePoolSize = corePoolSize;
        // 最大池大小
        this.maximumPoolSize = maximumPoolSize;
        // 线程池的等待队列
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        // 线程工厂对象
        this.threadFactory = threadFactory;
        // 拒绝策略的句柄
        this.handler = handler;
    }
    复制代码

    说明:在ThreadPoolExecutor()的构造函数中,进行的是初始化工作。
    corePoolSize, maximumPoolSize, unit, keepAliveTime和workQueue这些变量的值是已知的,它们都是通过newFixedThreadPool()传递而来。下面看看threadFactory和handler对象。

    2.1 ThreadFactory

    线程池中的ThreadFactory是一个线程工厂,线程池创建线程都是通过线程工厂对象(threadFactory)来完成的。
    上面所说的threadFactory对象,是通过 Executors.defaultThreadFactory()返回的。Executors.java中的defaultThreadFactory()源码如下:

    public static ThreadFactory defaultThreadFactory() {
        return new DefaultThreadFactory();
    }

    defaultThreadFactory()返回DefaultThreadFactory对象。Executors.java中的DefaultThreadFactory()源码如下:

    复制代码
    static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;
    
        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }
    
        // 提供创建线程的API。
        public Thread newThread(Runnable r) {
            // 线程对应的任务是Runnable对象r
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            // 设为“非守护线程”
            if (t.isDaemon())
                t.setDaemon(false);
            // 将优先级设为“Thread.NORM_PRIORITY”
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }
    复制代码

    说明:ThreadFactory的作用就是提供创建线程的功能的线程工厂。
             它是通过newThread()提供创建线程功能的,下面简单说说newThread()。newThread()创建的线程对应的任务是Runnable对象,它创建的线程都是“非守护线程”而且“线程优先级都是Thread.NORM_PRIORITY”。

    2.2 RejectedExecutionHandler

    handler是ThreadPoolExecutor中拒绝策略的处理句柄。所谓拒绝策略,是指将任务添加到线程池中时,线程池拒绝该任务所采取的相应策略。
    线程池默认会采用的是defaultHandler策略,即AbortPolicy策略。在AbortPolicy策略中,线程池拒绝任务时会抛出异常!
    defaultHandler的定义如下:

    private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

    AbortPolicy的源码如下:

    复制代码
    public static class AbortPolicy implements RejectedExecutionHandler {
        public AbortPolicy() { }
    
        // 抛出异常
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }
    复制代码

    (二) 添加任务到“线程池”

    1. execute()

    execute()定义在ThreadPoolExecutor.java中,源码如下:

    复制代码
    public void execute(Runnable command) {
        // 如果任务为null,则抛出异常。
        if (command == null)
            throw new NullPointerException();
        // 获取ctl对应的int值。该int值保存了"线程池中任务的数量"和"线程池状态"信息
        int c = ctl.get();
        // 当线程池中的任务数量 < "核心池大小"时,即线程池中少于corePoolSize个任务。
        // 则通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 当线程池中的任务数量 >= "核心池大小"时,
        // 而且,"线程池处于允许状态"时,则尝试将任务添加到阻塞队列中。
        if (isRunning(c) && workQueue.offer(command)) {
            // 再次确认“线程池状态”,若线程池异常终止了,则删除任务;然后通过reject()执行相应的拒绝策略的内容。
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 否则,如果"线程池中任务数量"为0,则通过addWorker(null, false)尝试新建一个线程,新建线程对应的任务为null。
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
        // 如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。
        else if (!addWorker(command, false))
            reject(command);
    }
    复制代码

    说明:execute()的作用是将任务添加到线程池中执行。它会分为3种情况进行处理:
            情况1 -- 如果"线程池中任务数量" < "核心池大小"时,即线程池中少于corePoolSize个任务;此时就新建一个线程,并将该任务添加到线程中进行执行。
            情况2 -- 如果"线程池中任务数量" >= "核心池大小",并且"线程池是允许状态";此时,则将任务添加到阻塞队列中阻塞等待。在该情况下,会再次确认"线程池的状态",如果"第2次读到的线程池状态"和"第1次读到的线程池状态"不同,则从阻塞队列中删除该任务。
            情况3 -- 非以上两种情况。在这种情况下,尝试新建一个线程,并将该任务添加到线程中进行执行。如果执行失败,则通过reject()拒绝该任务。

    2. addWorker()

    addWorker()的源码如下:

    复制代码
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        // 更新"线程池状态和计数"标记,即更新ctl。
        for (;;) {
            // 获取ctl对应的int值。该int值保存了"线程池中任务的数量"和"线程池状态"信息
            int c = ctl.get();
            // 获取线程池状态。
            int rs = runStateOf(c);
    
            // 有效性检查
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
    
            for (;;) {
                // 获取线程池中任务的数量。
                int wc = workerCountOf(c);
                // 如果"线程池中任务的数量"超过限制,则返回false。
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 通过CAS函数将c的值+1。操作失败的话,则退出循环。
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                // 检查"线程池状态",如果与之前的状态不同,则从retry重新开始。
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
    
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        // 添加任务到线程池,并启动任务所在的线程。
        try {
            final ReentrantLock mainLock = this.mainLock;
            // 新建Worker,并且指定firstTask为Worker的第一个任务。
            w = new Worker(firstTask);
            // 获取Worker对应的线程。
            final Thread t = w.thread;
            if (t != null) {
                // 获取锁
                mainLock.lock();
                try {
                    int c = ctl.get();
                    int rs = runStateOf(c);
    
                    // 再次确认"线程池状态"
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // 将Worker对象(w)添加到"线程池的Worker集合(workers)"中
                        workers.add(w);
                        // 更新largestPoolSize
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    // 释放锁
                    mainLock.unlock();
                }
                // 如果"成功将任务添加到线程池"中,则启动任务所在的线程。 
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        // 返回任务是否启动。
        return workerStarted;
    }
    复制代码

    说明
        addWorker(Runnable firstTask, boolean core) 的作用是将任务(firstTask)添加到线程池中,并启动该任务。
        core为true的话,则以corePoolSize为界限,若"线程池中已有任务数量>=corePoolSize",则返回false;core为false的话,则以maximumPoolSize为界限,若"线程池中已有任务数量>=maximumPoolSize",则返回false。
        addWorker()会先通过for循环不断尝试更新ctl状态,ctl记录了"线程池中任务数量和线程池状态"。
        更新成功之后,再通过try模块来将任务添加到线程池中,并启动任务所在的线程。

        从addWorker()中,我们能清晰的发现:线程池在添加任务时,会创建任务对应的Worker对象;而一个Workder对象包含一个Thread对象。(01) 通过将Worker对象添加到"线程的workers集合"中,从而实现将任务添加到线程池中。 (02) 通过启动Worker对应的Thread线程,则执行该任务。

    3. submit()

    补充说明一点,submit()实际上也是通过调用execute()实现的,源码如下:

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    (三) 关闭“线程池”

    shutdown()的源码如下:

    复制代码
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        // 获取锁
        mainLock.lock();
        try {
            // 检查终止线程池的“线程”是否有权限。
            checkShutdownAccess();
            // 设置线程池的状态为关闭状态。
            advanceRunState(SHUTDOWN);
            // 中断线程池中空闲的线程。
            interruptIdleWorkers();
            // 钩子函数,在ThreadPoolExecutor中没有任何动作。
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            // 释放锁
            mainLock.unlock();
        }
        // 尝试终止线程池
        tryTerminate();
    }
    复制代码

    说明:shutdown()的作用是关闭线程池。

  • 相关阅读:
    webpack4.x相关笔记整理
    vue原理探索--响应式系统
    git常用命令备忘
    小程序开发知识点及坑点总结
    无异常日志,就不能排查问题了???
    【从单体架构到分布式架构】(三)请求增多,单点变集群(2):Nginx
    【从单体架构到分布式架构】(二)请求增多,单点变集群(1):负载均衡
    【从单体架构到分布式架构】(一)万丈高楼平地起:环境准备
    一个接口查询关联了十几张表,响应速度太慢?那就提前把它们整合到一起
    docker基础:容器操作命令
  • 原文地址:https://www.cnblogs.com/qlky/p/7390318.html
Copyright © 2011-2022 走看看