zoukankan      html  css  js  c++  java
  • Java 线程池(ThreadPoolExecutor)原理解析

    在我们的开发中“池”的概念并不罕见,有数据库连接池、线程池、对象池、常量池等等。下面我们主要针对线程池来一步一步揭开线程池的面纱。

    有关java线程技术文章还可以推荐阅读:《关于java多线程wait 和sleep方法》、《java 核心编程——线程之线程的基本概念》、《上海尚学堂:40个Java多线程问题总结》、《java多线程的内存模型

    一、使用线程池的好处

    1、降低资源消耗

    可以重复利用已创建的线程降低线程创建和销毁造成的消耗。

    2、提高响应速度

    当任务到达时,任务可以不需要等到线程创建就能立即执行。

    3、提高线程的可管理性

    线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控

    二、线程池的工作原理

    首先我们看下当一个新的任务提交到线程池之后,线程池是如何处理的

    1、线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程都在执行任务,则执行第二步。

    2、线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里进行等待。如果工作队列满了,则执行第三步

    3、线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理这个任务

    三、线程池饱和策略

    这里提到了线程池的饱和策略,那我们就简单介绍下有哪些饱和策略:

    AbortPolicy

    为Java线程池默认的阻塞策略,不执行此任务,而且直接抛出一个运行时异常,切记ThreadPoolExecutor.execute需要try catch,否则程序会直接退出。

    DiscardPolicy

    直接抛弃,任务不执行,空方法

    DiscardOldestPolicy

    从队列里面抛弃head的一个任务,并再次execute 此task。

    CallerRunsPolicy

    在调用execute的线程里面执行此command,会阻塞入口

    用户自定义拒绝策略(最常用)

    实现RejectedExecutionHandler,并自己定义策略模式

    下我们以ThreadPoolExecutor为例展示下线程池的工作流程图


    1、如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。

    2、如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。

    3、如果无法将任务加入BlockingQueue(队列已满),则在非corePool中创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。

    4、如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。

    ThreadPoolExecutor采取上述步骤的总体设计思路,是为了在执行execute()方法时,尽可能地避免获取全局锁(那将会是一个严重的可伸缩瓶颈)。在ThreadPoolExecutor完成预热之后(当前运行的线程数大于等于corePoolSize),几乎所有的execute()方法调用都是执行步骤2,而步骤2不需要获取全局锁。

    关键方法源码分析

    我们看看核心方法添加到线程池方法execute的源码如下:

         //
         //Executes the given task sometime in the future.  The task
         //may execute in a new thread or in an existing pooled thread.
         //
         // If the task cannot be submitted for execution, either because this
         // executor has been shutdown or because its capacity has been reached,
         // the task is handled by the current {@code RejectedExecutionHandler}.
         //
         // @param command the task to execute
         // @throws RejectedExecutionException at discretion of
         //         {@code RejectedExecutionHandler}, if the task
         //         cannot be accepted for execution
         // @throws NullPointerException if {@code command} is null
         //
        public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            //
             // Proceed in 3 steps:
             //
             // 1. If fewer than corePoolSize threads are running, try to
             // start a new thread with the given command as its first
             // task.  The call to addWorker atomically checks runState and
             // workerCount, and so prevents false alarms that would add
             // threads when it shouldn't, by returning false.
             // 翻译如下:
             // 判断当前的线程数是否小于corePoolSize如果是,使用入参任务通过addWord方法创建一个新的线程,
             // 如果能完成新线程创建exexute方法结束,成功提交任务
             // 2. If a task can be successfully queued, then we still need
             // to double-check whether we should have added a thread
             // (because existing ones died since last checking) or that
             // the pool shut down since entry into this method. So we
             // recheck state and if necessary roll back the enqueuing if
             // stopped, or start a new thread if there are none.
             // 翻译如下:
             // 在第一步没有完成任务提交;状态为运行并且能否成功加入任务到工作队列后,再进行一次check,如果状态
             // 在任务加入队列后变为了非运行(有可能是在执行到这里线程池shutdown了),非运行状态下当然是需要
             // reject;然后再判断当前线程数是否为0(有可能这个时候线程数变为了0),如是,新增一个线程;
             // 3. If we cannot queue task, then we try to add a new
             // thread.  If it fails, we know we are shut down or saturated
             // and so reject the task.
             // 翻译如下:
             // 如果不能加入任务到工作队列,将尝试使用任务新增一个线程,如果失败,则是线程池已经shutdown或者线程池
             // 已经达到饱和状态,所以reject这个他任务
             //
            int c = ctl.get();
            // 工作线程数小于核心线程数
            if (workerCountOf(c) < corePoolSize) {
                // 直接启动新线程,true表示会再次检查workerCount是否小于corePoolSize
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            // 如果工作线程数大于等于核心线程数
            // 线程的的状态未RUNNING并且队列notfull
            if (isRunning(c) && workQueue.offer(command)) {
                // 再次检查线程的运行状态,如果不是RUNNING直接从队列中移除
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    // 移除成功,拒绝该非运行的任务
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    // 防止了SHUTDOWN状态下没有活动线程了,但是队列里还有任务没执行这种特殊情况。
                    // 添加一个null任务是因为SHUTDOWN状态下,线程池不再接受新任务
                    addWorker(null, false);
            }
            // 如果队列满了或者是非运行的任务都拒绝执行
            else if (!addWorker(command, false))
                reject(command);
        }

    下面我们继续看看addWorker是如何实现的:

      private boolean addWorker(Runnable firstTask, boolean core) {
            // java标签
            retry:
            // 死循环
            for (;;) {
                int c = ctl.get();
                // 获取当前线程状态
                int rs = runStateOf(c);
                // Check if queue empty only if necessary.
                // 这个逻辑判断有点绕可以改成 
                // rs >= shutdown && (rs != shutdown || firstTask != null || workQueue.isEmpty())
                // 逻辑判断成立可以分为以下几种情况均不接受新任务
                // 1、rs > shutdown:--不接受新任务
                // 2、rs >= shutdown && firstTask != null:--不接受新任务
                // 3、rs >= shutdown && workQueue.isEmppty:--不接受新任务
                // 逻辑判断不成立
                // 1、rs==shutdown&&firstTask != null:此时不接受新任务,但是仍会执行队列中的任务
                // 2、rs==shotdown&&firstTask == null:会执行addWork(null,false)
                //  防止了SHUTDOWN状态下没有活动线程了,但是队列里还有任务没执行这种特殊情况。
                //  添加一个null任务是因为SHUTDOWN状态下,线程池不再接受新任务
                if (rs >= SHUTDOWN &&! (rs == SHUTDOWN && firstTask == null &&! workQueue.isEmpty()))
                    return false;
                // 死循环
                // 如果线程池状态为RUNNING并且队列中还有需要执行的任务
                for (;;) {
                    // 获取线程池中线程数量
                    int wc = workerCountOf(c);
                    // 如果超出容量或者最大线程池容量不在接受新任务
                    if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    // 线程安全增加工作线程数
                    if (compareAndIncrementWorkerCount(c))
                        // 跳出retry
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    // 如果线程池状态发生变化,重新循环
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    
            // 走到这里说明工作线程数增加成功
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                final ReentrantLock mainLock = this.mainLock;
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    // 加锁
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int c = ctl.get();
                        int rs = runStateOf(c);
                        // RUNNING状态 || SHUTDONW状态下清理队列中剩余的任务
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            // 检查线程状态
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            // 将新启动的线程添加到线程池中
                            workers.add(w);
                            // 更新线程池线程数且不超过最大值
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    // 启动新添加的线程,这个线程首先执行firstTask,然后不停的从队列中取任务执行
                    if (workerAdded) {
                        //执行ThreadPoolExecutor的runWoker方法
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                // 线程启动失败,则从wokers中移除w并递减wokerCount
                if (! workerStarted)
                    // 递减wokerCount会触发tryTerminate方法
                    addWorkerFailed(w);
            }
            return workerStarted;
        }

    addWorker之后是runWorker,第一次启动会执行初始化传进来的任务firstTask;然后会从workQueue中取任务执行,如果队列为空则等待keepAliveTime这么长时间

     final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            // 允许中断
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
                // 如果getTask返回null那么getTask中会将workerCount递减,如果异常了这个递减操作会在processWorkerExit中处理
                while (task != null || (task = getTask()) != null) {
                    w.lock();
                    // If pool is stopping, ensure thread is interrupted;
                    // if not, ensure thread is not interrupted.  This
                    // requires a recheck in second case to deal with
                    // shutdownNow race while clearing interrupt
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
        }

    我们看下getTask是如何执行的

    private Runnable getTask() {
            boolean timedOut = false; // Did the last poll() time out?
            // 死循环
            retry: for (;;) {
                // 获取线程池状态
                int c = ctl.get();
                int rs = runStateOf(c);
                // Check if queue empty only if necessary.
                // 1.rs > SHUTDOWN 所以rs至少等于STOP,这时不再处理队列中的任务
                // 2.rs = SHUTDOWN 所以rs>=STOP肯定不成立,这时还需要处理队列中的任务除非队列为空
                // 这两种情况都会返回null让runWoker退出while循环也就是当前线程结束了,所以必须要decrement
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    // 递减workerCount值
                    decrementWorkerCount();
                    return null;
                }
                // 标记从队列中取任务时是否设置超时时间
                boolean timed; // Are workers subject to culling?
                // 1.RUNING状态
                // 2.SHUTDOWN状态,但队列中还有任务需要执行
                for (;;) {
                    int wc = workerCountOf(c);
                    // 1.core thread允许被超时,那么超过corePoolSize的的线程必定有超时
                    // 2.allowCoreThreadTimeOut == false && wc >
                    // corePoolSize时,一般都是这种情况,core thread即使空闲也不会被回收,只要超过的线程才会
                    timed = allowCoreThreadTimeOut || wc > corePoolSize;
                    // 从addWorker可以看到一般wc不会大于maximumPoolSize,所以更关心后面半句的情形:
                    // 1. timedOut == false 第一次执行循环, 从队列中取出任务不为null方法返回 或者
                    // poll出异常了重试
                    // 2.timeOut == true && timed ==
                    // false:看后面的代码workerQueue.poll超时时timeOut才为true,
                    // 并且timed要为false,这两个条件相悖不可能同时成立(既然有超时那么timed肯定为true)
                    // 所以超时不会继续执行而是return null结束线程。
                    if (wc <= maximumPoolSize && !(timedOut && timed))
                        break;
                    // workerCount递减,结束当前thread
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    c = ctl.get(); // Re-read ctl
                    // 需要重新检查线程池状态,因为上述操作过程中线程池可能被SHUTDOWN
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
                try {
                    // 1.以指定的超时时间从队列中取任务
                    // 2.core thread没有超时
                    Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
                    if (r != null)
                        return r;
                    timedOut = true;// 超时
                } catch (InterruptedException retry) {
                    timedOut = false;// 线程被中断重试
                }
            }
        }

    下面我们看下processWorkerExit是如何工作的

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
            // 正常的话再runWorker的getTask方法workerCount已经被减一了
            if (completedAbruptly)
                decrementWorkerCount();
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 累加线程的completedTasks
                completedTaskCount += w.completedTasks;
                // 从线程池中移除超时或者出现异常的线程
                workers.remove(w);
            } finally {
                mainLock.unlock();
            }
            // 尝试停止线程池
            tryTerminate();
            int c = ctl.get();
            // runState为RUNNING或SHUTDOWN
            if (runStateLessThan(c, STOP)) {
                // 线程不是异常结束
                if (!completedAbruptly) {
                    // 线程池最小空闲数,允许core thread超时就是0,否则就是corePoolSize
                    int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                    // 如果min == 0但是队列不为空要保证有1个线程来执行队列中的任务
                    if (min == 0 && !workQueue.isEmpty())
                        min = 1;
                    // 线程池还不为空那就不用担心了
                    if (workerCountOf(c) >= min)
                        return; // replacement not needed
                }
                // 1.线程异常退出
                // 2.线程池为空,但是队列中还有任务没执行,看addWoker方法对这种情况的处理
                addWorker(null, false);
            }
        }

    tryTerminate

    processWorkerExit方法中会尝试调用tryTerminate来终止线程池。这个方法在任何可能导致线程池终止的动作后执行:比如减少wokerCount或SHUTDOWN状态下从队列中移除任务。

    final void tryTerminate() {
            for (;;) {
                int c = ctl.get();
                // 以下状态直接返回:
                // 1.线程池还处于RUNNING状态
                // 2.SHUTDOWN状态但是任务队列非空
                // 3.runState >= TIDYING 线程池已经停止了或在停止了
                if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
                    return;
                // 只能是以下情形会继续下面的逻辑:结束线程池。
                // 1.SHUTDOWN状态,这时不再接受新任务而且任务队列也空了
                // 2.STOP状态,当调用了shutdownNow方法
                // workerCount不为0则还不能停止线程池,而且这时线程都处于空闲等待的状态
                // 需要中断让线程“醒”过来,醒过来的线程才能继续处理shutdown的信号。
                if (workerCountOf(c) != 0) { // Eligible to terminate
                    // runWoker方法中w.unlock就是为了可以被中断,getTask方法也处理了中断。
                    // ONLY_ONE:这里只需要中断1个线程去处理shutdown信号就可以了。
                    interruptIdleWorkers(ONLY_ONE);
                    return;
                }
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // 进入TIDYING状态
                    if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                        try {
                            // 子类重载:一些资源清理工作
                            terminated();
                        } finally {
                            // TERMINATED状态
                            ctl.set(ctlOf(TERMINATED, 0));
                            // 继续awaitTermination
                            termination.signalAll();
                        }
                        return;
                    }
                } finally {
                    mainLock.unlock();
                }
                // else retry on failed CAS
            }
        }

    shutdown这个方法会将runState置为SHUTDOWN,会终止所有空闲的线程。shutdownNow方法将runState置为STOP。和shutdown方法的区别,这个方法会终止所有的线程。主要区别在于shutdown调用的是interruptIdleWorkers这个方法,而shutdownNow实际调用的是Worker类的interruptIfStarted方法:

    他们的实现如下:

    public void shutdown() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                checkShutdownAccess();
                // 线程池状态设为SHUTDOWN,如果已经至少是这个状态那么则直接返回
                advanceRunState(SHUTDOWN);
                // 注意这里是中断所有空闲的线程:runWorker中等待的线程被中断 → 进入processWorkerExit →
                // tryTerminate方法中会保证队列中剩余的任务得到执行。
                interruptIdleWorkers();
                onShutdown(); // hook for ScheduledThreadPoolExecutor
            } finally {
                mainLock.unlock();
            }
            tryTerminate();
        }
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // STOP状态:不再接受新任务且不再执行队列中的任务。
            advanceRunState(STOP);
            // 中断所有线程
            interruptWorkers();
            // 返回队列中还没有被执行的任务。
            tasks = drainQueue();
        }
        finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }
    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                // w.tryLock能获取到锁,说明该线程没有在运行,因为runWorker中执行任务会先lock,
                // 因此保证了中断的肯定是空闲的线程。
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        }
        finally {
            mainLock.unlock();
        }
    }
    void interruptIfStarted() {
        Thread t;
        // 初始化时state == -1
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
  • 相关阅读:
    memcached 在windows下安装及启动
    细说 ASP.NET Cache 及其高级用法
    asp.net MVC helper 和自定义函数@functions小结
    log4net 总结
    紧跟时代步伐,让我们拥抱MVC 3
    关于node-sass安装失败的解决办法
    table自适应
    获取select选中的值
    省市三级联动
    git bush 代码提交
  • 原文地址:https://www.cnblogs.com/shsxt/p/7997782.html
Copyright © 2011-2022 走看看