zoukankan      html  css  js  c++  java
  • Java 1.7 ThreadPoolExecutor源码解析

    Java中使用线程池技术一般都是使用Executors这个工厂类,它提供了非常简单方法来创建各种类型的线程池:

    public static ExecutorService newFixedThreadPool(int nThreads)
    public static ExecutorService newSingleThreadExecutor() 
    public static ExecutorService newCachedThreadPool() 
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

    thread-pool-executor-overview.png

    核心的接口其实是Executor,它只有一个execute方法抽象为对任务(Runnable接口)的执行, ExecutorService接口在Executor的基础上提供了对任务执行的生命周期的管理,主要是submitshutdown方法, AbstractExecutorServiceExecutorService一些方法做了默认的实现,主要是submit和invoke方法,而真正的任务执行 的Executor接口execute方法是由子类实现,就是ThreadPoolExecutor,它实现了基于线程池的任务执行框架,所以要了解 JDK的线程池,那么就得先看这个类。

    再看execute方法之前需要先介几个变量或类。

    ctl

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    这个变量是整个类的核心,AtomicInteger保证了对这个变量的操作是原子的,通过巧妙的操作,ThreadPoolExecutor用这一个变量保存了两个内容:

    • 所有有效线程的数量
    • 各个线程的状态(runState)

    低29位存线程数,高3位存runState,这样runState有5个值:

    • RUNNING:-536870912
    • SHUTDOWN:0
    • STOP:536870912
    • TIDYING:1073741824
    • TERMINATED:1610612736

    线程池中各个状态间的转换比较复杂,主要记住下面内容就可以了:

    • RUNNING状态:线程池正常运行,可以接受新的任务并处理队列中的任务;
    • SHUTDOWN状态:不再接受新的任务,但是会执行队列中的任务;
    • STOP状态:不再接受新任务,不处理队列中的任务

    围绕ctl变量有一些操作,了解这些方法是看懂后面一些晦涩代码的基础:

    View Code

     

    corePoolSize

    核心线程池大小,活动线程小于corePoolSize则直接创建,大于等于则先加到workQueue中,队列满了才创建新的线程。

     

    keepAliveTime

    线程从队列中获取任务的超时时间,也就是说如果线程空闲超过这个时间就会终止。

     

    Worker

    private final class Worker extends AbstractQueuedSynchronizer implements Runnable ...

    内部类Worker是对任务的封装,所有submit的Runnable都被封装成了Worker,它本身也是一个Runnable, 然后利用AQS框架(关于AQS可以看我这篇文章)实现了一个简单的非重入的互斥锁, 实现互斥锁主要目的是为了中断的时候判断线程是在空闲还是运行,可以看后面shutdownshutdownNow方法的分析。

    复制代码
    // state只有0和1,互斥
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;// 成功获得锁
        }
        // 线程进入等待队列
        return false;
    }
    
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }
    复制代码

    之所以不用ReentrantLock是为了避免任务执行的代码中修改线程池的变量,如setCorePoolSize,因为ReentrantLock是可重入的。

     

    execute

    execute方法主要三个步骤:

    • 活动线程小于corePoolSize的时候创建新的线程;
    • 活动线程大于corePoolSize时都是先加入到任务队列当中;
    • 任务队列满了再去启动新的线程,如果线程数达到最大值就拒绝任务。
    复制代码
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
    
        int c = ctl.get();
        // 活动线程数 < corePoolSize
        if (workerCountOf(c) < corePoolSize) {
            // 直接启动新的线程。第二个参数true:addWorker中会重新检查workerCount是否小于corePoolSize
            if (addWorker(command, true))
                // 添加成功返回
                return;
            c = ctl.get();
        }
        // 活动线程数 >= corePoolSize
        // runState为RUNNING && 队列未满
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // double check
            // 非RUNNING状态 则从workQueue中移除任务并拒绝
            if (!isRunning(recheck) && remove(command))
                reject(command);// 采用线程池指定的策略拒绝任务
            // 线程池处于RUNNING状态 || 线程池处于非RUNNING状态但是任务移除失败
            else if (workerCountOf(recheck) == 0)
                // 这行代码是为了SHUTDOWN状态下没有活动线程了,但是队列里还有任务没执行这种特殊情况。
                // 添加一个null任务是因为SHUTDOWN状态下,线程池不再接受新任务
                addWorker(null, false);
    
            // 两种情况:
            // 1.非RUNNING状态拒绝新的任务
            // 2.队列满了启动新的线程失败(workCount > maximumPoolSize)
        } else if (!addWorker(command, false))
            reject(command);
    }
    复制代码

    注释比较清楚了就不再解释了,其中比较难理解的应该是addWorker(null, false);这一行,这要结合addWorker一起来看。 主要目的是防止HUTDOWN状态下没有活动线程了,但是队列里还有任务没执行这种特殊情况。

     

    addWorker

    这个方法理解起来比较费劲。

    View Code

     

    runWorker

    任务添加成功后实际执行的是runWorker这个方法,这个方法非常重要,简单来说它做的就是:

    • 第一次启动会执行初始化传进来的任务firstTask;
    • 然后会从workQueue中取任务执行,如果队列为空则等待keepAliveTime这么长时间。
    View Code

     

    getTask

    View Code

     

    processWorkerExit

    线程退出会执行这个方法做一些清理工作。

    View Code

     

    tryTerminate

    processWorkerExit方法中会尝试调用tryTerminate来终止线程池。这个方法在任何可能导致线程池终止的动作后执行:比如减少wokerCount或SHUTDOWN状态下从队列中移除任务。

    View Code

     

    shutdown和shutdownNow

    shutdown这个方法会将runState置为SHUTDOWN,会终止所有空闲的线程。

    复制代码
    public void shutdown() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                checkShutdownAccess();
                // 线程池状态设为SHUTDOWN,如果已经至少是这个状态那么则直接返回
                advanceRunState(SHUTDOWN);
                // 注意这里是中断所有空闲的线程:runWorker中等待的线程被中断 → 进入processWorkerExit →
                // tryTerminate方法中会保证队列中剩余的任务得到执行。
                interruptIdleWorkers();
                onShutdown(); // hook for ScheduledThreadPoolExecutor
            } finally {
                mainLock.unlock();
            }
            tryTerminate();
        }
    复制代码

    shutdownNow方法将runState置为STOP。和shutdown方法的区别,这个方法会终止所有的线程。

    复制代码
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // STOP状态:不再接受新任务且不再执行队列中的任务。
            advanceRunState(STOP);
            // 中断所有线程
            interruptWorkers();
            // 返回队列中还没有被执行的任务。
            tasks = drainQueue();
        }
        finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }
    复制代码

    主要区别在于shutdown调用的是interruptIdleWorkers这个方法,而shutdownNow实际调用的是Worker类的interruptIfStarted方法:

    复制代码
    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                // w.tryLock能获取到锁,说明该线程没有在运行,因为runWorker中执行任务会先lock,
                // 因此保证了中断的肯定是空闲的线程。
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        }
        finally {
            mainLock.unlock();
        }
    }
    复制代码
    复制代码
    void interruptIfStarted() {
        Thread t;
        // 初始化时state == -1
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
    复制代码

    这就是前面提到的Woker类实现AQS的主要作用。

    注意:shutdown方法可能会在finalize被隐式的调用。

    这篇博客基本都是代码跟注释,所以如果不是分析ThreadPoolExecutor源码的话看起来会非常无聊。

    总结:

    • int corePoolSize:核心线程数

    • int maximumPoolSize:最大线程数

    • BlockingQueue workQueue:任务队列

    • long keepAliveTime:和TimeUnit unit一起构成线程的最大空闲时间,一旦超过该时间还没有任务处理,该线程就走向结束了。它是针对当前线程数已经超过corePoolSize核心线程数了或者核心线程数也开启超时策略,即属性allowCoreThreadTimeOut=true

    • ThreadFactory threadFactory:线程工厂

    • RejectedExecutionHandler handler:拒绝策略,当任务太多来不及处理,可拒绝该任务

    先简单描述下ThreadPoolExecutor的execute(futureTask)过程的大概情况:

    1 如果当前线程数小于corePoolSize,则直接创建出一个线程,用于执行新加进来的任务

    2 如果当前线程数已经超过corePoolSize,则将该任务放到BlockingQueue workQueue任务队列中,该任务队列可以是有限容量也可以是无限容量的。每个线程处理完一个任务后,都会不断的从BlockingQueue workQueue任务队列中取出任务并执行

    3 如果BlockingQueue workQueue是有限容量的,已满无法放进新的任务了,如果此时的线程数小于maximumPoolSize,则直接创建一个线程执行该任务

    4 如果线程数已达到maximumPoolSize不能再创建线程了,则直接使用RejectedExecutionHandler handler拒绝该任务

    原文转载:http://www.cnblogs.com/zhanjindong/p/java-concurrent-package-ThreadPoolExecutor.html

  • 相关阅读:
    Bootstrap3基础 thumbnail 圆角类型的div块
    Bootstrap3基础 text-muted/success... 辅助类样式 情景文本颜色
    Bootstrap3基础 text-right/left/center 设置标题右对齐、左对齐、居中
    Bootstrap3基础 table-striped 表格实现隔行换色(浅灰色与白色交替)
    Bootstrap3基础 table-condensed 表格中的单元格紧凑一些
    Bootstrap3基础 table-responsive 响应式表格
    Bootstrap3基础 table-bordered/hover 表格加外边框和鼠标悬停对应行的背景色加深
    Bootstrap3基础 page-header 标题下加分割线
    iOS Swift编程语言
    【强烈推荐】XCODE的插件之王
  • 原文地址:https://www.cnblogs.com/AndyAo/p/8135063.html
Copyright © 2011-2022 走看看