zoukankan      html  css  js  c++  java
  • 第十三章 ThreadPoolExecutor源码解析

    ThreadPoolExecutor使用方式、工作机理以及参数的详细介绍,请参照《第十二章 ThreadPoolExecutor使用与工作机理 》

    1、源代码主要掌握两个部分

    • 线程池的创建:构造器
    • 提交任务到线程池去执行:execute()

    2、构造器

    2.1、一些属性:

        /**
         * runState provides the main lifecyle control, taking on values:
         *
         * RUNNING -> SHUTDOWN
         *    On invocation of shutdown(), perhaps implicitly in finalize()
         * (RUNNING or SHUTDOWN) -> STOP
         *    On invocation of shutdownNow()
         * SHUTDOWN -> TERMINATED
         *    When both queue and pool are empty
         * STOP -> TERMINATED
         *    When pool is empty
         */
        volatile int runState;
        static final int RUNNING    = 0;//接收新的任务,会处理队列中的任务
        static final int SHUTDOWN   = 1;//不接收新的任务,但是会处理队列中的任务
        static final int STOP       = 2;//不接收新的任务,也不会处理队列中的任务,而且还会中断正在执行的任务
        static final int TERMINATED = 3;//STOP+中止所有线程
    
        private final BlockingQueue<Runnable> workQueue;//队列
    
        /**
         * 对poolSize, corePoolSize, maximumPoolSize, runState, and workers set上锁
         */
        private final ReentrantLock mainLock = new ReentrantLock();
    
        /**
         * 支持awaitTermination的等待条件
         */
        private final Condition termination = mainLock.newCondition();
    
        /**
         * pool中的所有工作线程集合;仅仅在持有mainLock的时候才允许被访问
         */
        private final HashSet<Worker> workers = new HashSet<Worker>();
    
        private volatile long  keepAliveTime;
    
        /**
         * false(默认):当核心线程处于闲置状态时,也会存活
         * true:核心线程使用keepAliveTime来决定自己的存活状态
         */
        private volatile boolean allowCoreThreadTimeOut;
    
        /**
         * Core pool size,仅仅在持有mainLock的时候才允许被更新, 
         * 因为是volatile允许并发读(即使是在更新的过程中)
         */
        private volatile int   corePoolSize;
    
        /**
         * Maximum pool size, 其他同上
         */
        private volatile int   maximumPoolSize;
    
        /**
         * Current pool size, 其他同上
         */
        private volatile int   poolSize;
    
        /**
         * 回绝处理器
         */
        private volatile RejectedExecutionHandler handler;
    
        /**
         * 所有的线程都通过这个线程工厂的addThread方法来创建。
         */
        private volatile ThreadFactory threadFactory;
    
        /**
         * Tracks largest attained pool size.
         */
        private int largestPoolSize;
    
        /**
         * 已经完成的任务数.仅仅在工作线程被终结的时候这个数字才会被更新 
         */
        private long completedTaskCount;
    
        /**
         * 默认的回绝处理器(回绝任务并抛出异常)
         */
        private static final RejectedExecutionHandler defaultHandler =
            new AbortPolicy();
    View Code

    说明:因为属性不多,这里列出了全部属性。

    2.2、构造器:

        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), defaultHandler);
        }
    
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 threadFactory, defaultHandler);
        }
    
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  RejectedExecutionHandler handler) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), handler);
        }
    
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            /*
             * 检查参数
             */
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            /*
             * 初始化值
             */
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);//转成纳秒
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    View Code

    说明:4个构造器(1个5参+2个6参+1个7参)

    注意:默认情况下,构造器只会初始化参数,不会提前构建好线程

    建议:构造器参数众多,建议使用构建器模式,关于构建器模式的实际使用范例,请参照《第二章 Google guava cache源码解析1--构建缓存器

    构造器中默认线程工厂的创建:Executors中的方法

        public static ThreadFactory defaultThreadFactory() {
            return new DefaultThreadFactory();
        }
    
        /**
         * 默认的线程工厂
         */
        static class DefaultThreadFactory implements ThreadFactory {
            static final AtomicInteger poolNumber = new AtomicInteger(1);//池数量
            final ThreadGroup group;//线程组
            final AtomicInteger threadNumber = new AtomicInteger(1);//线程数量
            final String namePrefix;
    
            /*
             * 创建默认的线程工厂
             */
            DefaultThreadFactory() {
                SecurityManager s = System.getSecurityManager();
                group = (s != null)? s.getThreadGroup() :
                                     Thread.currentThread().getThreadGroup();
                namePrefix = "pool-" +
                              poolNumber.getAndIncrement() +
                             "-thread-";
            }
    
            /*
             * 创建一个新的线程
             */
            public Thread newThread(Runnable r) {
                Thread t = new Thread(group, r,
                                      namePrefix + threadNumber.getAndIncrement(),//新线程的名字
                                      0);
                /*
                 * 将后台线程设置为应用线程
                 */
                if (t.isDaemon())
                    t.setDaemon(false);
                /*
                 * 将线程的优先级全部设置为NORM_PRIORITY
                 */
                if (t.getPriority() != Thread.NORM_PRIORITY)
                    t.setPriority(Thread.NORM_PRIORITY);
                return t;
            }
        }
    View Code

    说明,其中的newThread()方法会在第三部分用到。

    3、提交任务的线程池去执行execute(Runnable command)

        public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            /**
             * 这一块儿就是整个工作机理的部分(代码比较精致)
             * 1、addIfUnderCorePoolSize
             * 1)如果当前线程数poolSize<核心线程数corePoolSize并且pool的状态为RUNNING,
             * 1.1)先获取锁
             * 1.2)根据传入的任务firstTask创建一个Work对象,在该对象中编写了run()方法,在该run()方法中会真正的去执行firstTask的run()
             * 说明:关于Work对象run部分的内容,查看Work内部类的run()方法上边的注释以及与其相关方法的注释
             * 1.3)通过线程工厂与上边创建出来的work对象w创建新的线程t,将w加入工作线程集合,
             * 然后启动线程t,之后就会自动执行w中的run(),w中的run()又会调用firstTask的run(),即处理真正的业务逻辑
             * 
             * 2、如果poolSize>=corePoolSize或者上边的执行失败了
             * 1)如果pool的状态处于RUNNING,将该任务入队(offer(command))
             * 如果入队后,pool的状态不是RUNNING了或者池中的线程数为0了,下边的逻辑具体去查看注释
             * 2)addIfUnderMaximumPoolSize(同addIfUnderCorePoolSize)
             * 如果增加线程也不成功,则回绝任务。
             * 
             */
            if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
                if (runState == RUNNING && workQueue.offer(command)) {
                    if (runState != RUNNING || poolSize == 0)
                        ensureQueuedTaskHandled(command);
                }
                else if (!addIfUnderMaximumPoolSize(command))
                    reject(command); // is shutdown or saturated
            }
        }
    View Code

    3.1、addIfUnderCorePoolSize(Runnable firstTask)

        /**
         * 创建并且启动一个新的线程来处理任务
         * 1、其第一个任务就是传入的firstTask参数
         * 2、该方法仅仅用于当前线程数小于核心线程数并且pool没有被关掉的时候
         */
        private boolean addIfUnderCorePoolSize(Runnable firstTask) {
            Thread t = null;
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();//获取锁
            try {
                if (poolSize < corePoolSize && runState == RUNNING)
                    t = addThread(firstTask);//创建新线程
            } finally {
                mainLock.unlock();//释放锁
            }
            return t != null;
        }
    View Code

    addThread(Runnable firstTask)

        private Thread addThread(Runnable firstTask) {
            Worker w = new Worker(firstTask);//构造一个work
            Thread t = threadFactory.newThread(w);//创建线程
            boolean workerStarted = false;
            if (t != null) {//
                if (t.isAlive()) //如果t线程已经启动了,而且还没有死亡
                    throw new IllegalThreadStateException();
                w.thread = t;
                workers.add(w);//将w工作线程加入workers线程池
                int nt = ++poolSize;//当前的池数量+1
                if (nt > largestPoolSize)
                    largestPoolSize = nt;
                try {
                    t.start();//启动线程
                    workerStarted = true;
                }
                finally {
                    if (!workerStarted)//启动线程没有成功
                        workers.remove(w);//将w从workers集合中删除
                }
            }
            return t;
        }
    View Code

    newThread(Runnable r)

    该方法在构建上边的默认线程工厂部分已经说过了。

    Work内部类:

    /**
         * 工作线程。
         */
        private final class Worker implements Runnable {
            /**
             * 在每一个任务的执行前后都会获取和释放runLock。
             * 该锁只要是为了防止中断正在执行任务的work线程
             */
            private final ReentrantLock runLock = new ReentrantLock();
    
            /**
             * Initial task to run before entering run loop. 
             * 1、Possibly null.
             */
            private Runnable firstTask;
    
            /**
             * 每个work线程完成的任务总量
             * accumulated into completedTaskCount upon termination.
             */
            volatile long completedTasks;
    
            Thread thread;
    
            /**
             * 该work中的线程是不是确实正在执行了run()
             */
            volatile boolean hasRun = false;
    
            Worker(Runnable firstTask) {
                this.firstTask = firstTask;
            }
    
            /*
             * true:已经有线程持有了该锁
             */
            boolean isActive() {
                return runLock.isLocked();
            }
    
            private void runTask(Runnable task) {
                final ReentrantLock runLock = this.runLock;
                runLock.lock();//获取锁runLock
                try {
                    /*
                     * 如果pool状态为STOP或TERMINATED,确保线程被打断;
                     * 如果不是,确保线程不要被打断
                     */
                    if ((runState >= STOP ||
                        (Thread.interrupted() && runState >= STOP)) &&
                        hasRun)
                        thread.interrupt();
                    /*
                     * 确保afterExecute会被执行仅仅当任务完成了(try)或抛出了异常(catch)
                     */
                    boolean ran = false;
                    beforeExecute(thread, task);//执行任务的run()方法之前要执行的操作
                    try {
                        task.run();//执行线程的run()方法
                        ran = true;
                        afterExecute(task, null);//执行任务的run()方法之后要执行的操作
                        ++completedTasks;
                    } catch (RuntimeException ex) {
                        if (!ran)
                            afterExecute(task, ex);
                        throw ex;
                    }
                } finally {
                    runLock.unlock();//释放锁runLock
                }
            }
    
            /**
             * Main run loop
             * 运行当前任务task,运行结束后,尝试获取队列中的其他任务,
             * 如果最后通过各种方式都获取不到,就回收该线程,如果获取到了,就用该线程继续执行接下来的任务
             * 最后,当获取不到任何任务去执行时,就将该线程从works线程集合中删除掉
             */
            public void run() {
                try {
                    hasRun = true;
                    Runnable task = firstTask;
                    firstTask = null;
                    while (task != null || (task = getTask()) != null) {
                        runTask(task);//运行该任务
                        task = null;
                    }
                } finally {
                    workerDone(this);//将该线程从works集合中删除
                }
            }
        }
    View Code

    说明:这里列出了该内部类的全部属性和常用方法。

    getTask()

        /**
         * 获取下一个worker线程将要运行的任务
         * Gets the next task for a worker thread to run.  
         */
        Runnable getTask() {
            for (;;) {//无限循环
                try {
                    int state = runState;
                    if (state > SHUTDOWN)
                        return null;
                    Runnable r;
                    if (state == SHUTDOWN)  // Help drain queue
                        r = workQueue.poll();//处理queue中的任务
                    //下面的runState==RUNNING
                    else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                        //从队头获取任务,如果没有任务,等待keepAliveTime的时间
                        r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
                    else
                        //从队头获取任务,如果没有任务,阻塞等待
                        r = workQueue.take();
                    if (r != null)
                        return r;
                    if (workerCanExit()) {//允许回收获取任务失败的线程
                        if (runState >= SHUTDOWN) // Wake up others
                            interruptIdleWorkers();//中断闲置的work线程
                        return null;
                    }
                    // Else retry
                } catch (InterruptedException ie) {
                    // On interruption, re-check runState
                }
            }
        }
    View Code

    workerCanExit()

        /**
         * 检测一个获取任务失败的work线程是否可以退出了。
         * 出现下面三种情况,work线程就会死亡。
         * 1、如果pool的状态为STOP或TERMINATED
         * 2、队列为空
         * 3、允许回收核心线程并且池中的线程数大于1和corePoolSize的最大值
         */
        private boolean workerCanExit() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            boolean canExit;
            try {
                canExit = runState >= STOP ||
                    workQueue.isEmpty() ||
                    (allowCoreThreadTimeOut &&
                     poolSize > Math.max(1, corePoolSize));
            } finally {
                mainLock.unlock();
            }
            return canExit;
        }
    View Code

    workerDone(Worker w)

        void workerDone(Worker w) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                completedTaskCount += w.completedTasks;
                workers.remove(w);//从workers集合中删除该线程
                if (--poolSize == 0)//如果池中的线程数为0
                    tryTerminate();
            } finally {
                mainLock.unlock();
            }
        }
    View Code

    3.2、ensureQueuedTaskHandled(Runnable command)

        /**
         * 在一个task入队之后重新检查state。
         * 当一个task入队后,pool的state发生了变化,该方法就会被调用。
         * 如果一个task入队的同时,shutdownNow方法发生了调用,该方法就必须从队列中移除并回绝
         * 否则该方法会保证至少有一个线程来处理入队的task
         */
        private void ensureQueuedTaskHandled(Runnable command) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            boolean reject = false;
            Thread t = null;
            try {
                int state = runState;
                if (state != RUNNING && workQueue.remove(command))
                    reject = true;
                else if (state < STOP &&
                         poolSize < Math.max(corePoolSize, 1) &&
                         !workQueue.isEmpty())
                    t = addThread(null);
            } finally {
                mainLock.unlock();
            }
            if (reject)
                reject(command);
        }
    View Code

    3.3、addIfUnderMaximumPoolSize(Runnable firstTask)

        private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
            Thread t = null;
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (poolSize < maximumPoolSize && runState == RUNNING)
                    t = addThread(firstTask);
            } finally {
                mainLock.unlock();
            }
            return t != null;
        }
    View Code

    说明:该方法的其他方法与addIfUnderCorePoolSize(Runnable firstTask)一样。

    3.4、reject(Runnable command)

        void reject(Runnable command) {
            handler.rejectedExecution(command, this);
        }
    View Code
        public static class AbortPolicy implements RejectedExecutionHandler {
            
            public AbortPolicy() { }
            /** 直接抛异常 */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                throw new RejectedExecutionException();
            }
        }
    View Code

    说明:明白了上一章将的线程池机理,按着这个机理去看源代码是非常容易的事情。

    总结:

    • 上一章的工作机理
    • 上一章的参数详细说明
  • 相关阅读:
    Python range() 函数用法
    substr
    disable enable 所有其他表关联的外键
    sql with multiply where
    小米开源文件管理器MiCodeFileExplorer-源码研究(7)-Favorite收藏管理和SQLite数据库CRUD
    Centos安装FastDFS+Nginx(一天时间搞定)
    Centos安装FastDFS+Nginx(一天时间搞定)
    小米开源文件管理器MiCodeFileExplorer-源码研究(6)-媒体文件MediaFile和文件类型MimeUtils
    小米开源文件管理器MiCodeFileExplorer-源码研究(6)-媒体文件MediaFile和文件类型MimeUtils
    小米开源文件管理器MiCodeFileExplorer-源码研究(5)-AsyncTask异步任务
  • 原文地址:https://www.cnblogs.com/java-zhao/p/5147811.html
Copyright © 2011-2022 走看看