zoukankan      html  css  js  c++  java
  • 线程池ThreadPoolExecutor——Worker源码解析

    线程池任务运行的主流程如下:

    线程池调用execute提交任务
    —>创建Worker(设置属性thead、firstTask)
    —>worker.thread.start()
    —>实际上调用的是worker.run()
    —>线程池的runWorker(worker)
    —>worker.firstTask.run();

    可以看到,在ThreadPoolExecutor中以Worker为单位对工作线程进行管理,下面分析一下Worker的执行原理:

    1. Worker源码

    private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable
        {
            /**
             * This class will never be serialized, but we provide a
             * serialVersionUID to suppress a javac warning.
             */
            private static final long serialVersionUID = 6138294804551838833L;
    
            /** Thread this worker is running in.  Null if factory fails. */
            final Thread thread;//执行任务的线程
            /** Initial task to run.  Possibly null. */
            Runnable firstTask;//要执行的任务
            /** Per-thread task counter */
            volatile long completedTasks;//完成任务的数量
    
            /**
             * Creates with given first task and thread from ThreadFactory.
             * @param firstTask the first task (null if none)
             */
            Worker(Runnable firstTask) {
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
                //调用线程工厂创建线程
                this.thread = getThreadFactory().newThread(this);
            }
    
            /** Delegates main run loop to outer runWorker  */
            public void run() {
                //实际是调用 ThreadPoolExecutor.runWorker()方法
                runWorker(this);
            }
    
            // Lock methods
            //
            // The value 0 represents the unlocked state.
            // The value 1 represents the locked state.
    
            protected boolean isHeldExclusively() {
                return getState() != 0;
            }
    
            protected boolean tryAcquire(int unused) {
                //CAS获取锁,不会有阻塞
                if (compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }
    
            protected boolean tryRelease(int unused) {
                setExclusiveOwnerThread(null);
                setState(0);
                return true;
            }
    
            public void lock()        { acquire(1); }
            public boolean tryLock()  { return tryAcquire(1); }
            public void unlock()      { release(1); }
            public boolean isLocked() { return isHeldExclusively(); }
    
            void interruptIfStarted() {
                Thread t;
                if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    }
                }
            }
        }

    java.util.concurrent.ThreadPoolExecutor.Worker就是线程池中执行任务的类,其继承了AQS并实现Runnable,所以它可以拥有AQS与Runnable的作用。

    1.1 AQS作用

    Worker继承了AbstractQueuedSynchronizer,主要目的有两个:

    • 将锁的粒度细化到每个工Worker。
      • 如果多个Worker使用同一个锁,那么一个Worker Running持有锁的时候,其他Worker就无法执行,这显然是不合理的。
    • 直接使用CAS获取,避免阻塞。
      • 如果这个锁使用阻塞获取,那么在多Worker的情况下执行shutDown。如果这个Worker此时正在Running无法获取到锁,那么执行shutDown()线程就会阻塞住了,显然是不合理的。

    1.2 Runnable作用

    Worker还实现了Runnable,它有两个属性thead、firstTask。根据整体流程:

    线程池调用execute—>创建Worker(设置属性thead、firstTask)—>worker.thread.start()—>实际上调用的是worker.run()—>线程池的runWorker(worker)—>worker.firstTask.run()(如果firstTask为null就从等待队列中拉取一个)。

    转了一大圈最终调用最开始传进来的任务的run方法,不过通过等待队列可以重复利用worker与worker中的线程,变化的只是firstTask。下面我们对线程池的runWorker方法进行探究。

    2. Worker.run源码

    2.1 runWorker方法

    Worker实现了Runnable,其run()方法中最终是走到了线程池的runWorker()方法。

    public void run() {
        //实际是调用 ThreadPoolExecutor.runWorker()方法
        runWorker(this);
    }
        final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            //任务是否正常执行完成
            boolean completedAbruptly = true;
            try {
                //如果task为null就通过getTask方法获取阻塞队列中的下一个任务
                //getTask方法一般不会返回null,所以这个while类似于一个无限循环
                //worker对象就通过这个方法的持续运行来不断处理新的任务
                while (task != null || (task = getTask()) != null) {
                    //每一次任务的执行都必须获取锁来保证下方临界区代码的线程安全
                    w.lock();
                    // If pool is stopping, ensure thread is interrupted;
                    // if not, ensure thread is not interrupted.  This
                    // requires a recheck in second case to deal with
                    // shutdownNow race while clearing interrupt
                    //如果状态值大于等于STOP(状态值是有序的,即STOP、TIDYING、TERMINATED)且当前线程还没有被中断,则主动中断线程
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        //执行任务前处理操作,默认是一个空实现;在子类中可以通过重写来改变任务执行前的处理行为
                        beforeExecute(wt, task);
                        //保存任务执行过程中抛出的异常,提供给下面finally块中的afterExecute方法使用
                        Throwable thrown = null;
                        try {
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            //异常包装为Error
                            thrown = x; throw new Error(x);
                        } finally {
                            //任务后处理,同beforeExecute
                            afterExecute(task, thrown);
                        }
                    } finally {
                        //将循环变量task设置为null,表示已处理完成
                        task = null;
                        //加当前worker已经完成的任务数
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                //将completedAbruptly变量设置为false,表示任务正常处理完成
                completedAbruptly = false;
            } finally {
                //销毁当前的worker对象,并完成一些诸如完成任务数量统计之类的辅助性工作
                //在线程池当前状态小于STOP的情况下会创建一个新的worker来替换被销毁的worker
                processWorkerExit(w, completedAbruptly);
            }
        }

    runWorker方法的源代码中有两个比较重要的方法调用,一个是while条件中对getTask方法的调用,一个是在方法的最后对processWorkerExit方法的调用。

    2.2 getTask方法

    private Runnable getTask() {
        // 通过timeOut变量表示线程是否空闲时间超时了
        boolean timedOut = false;
    
        // 无限循环
        for (;;) {
            // 获取线程池状态
            int c = ctl.get();
            int rs = runStateOf(c);
    
            // Check if queue empty only if necessary.
            // 如果 线程池状态>=STOP
            //    或者 (线程池状态==SHUTDOWN && 阻塞队列为空)
            // 则直接减少一个worker计数并返回null(返回null会导致当前worker被销毁)
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
    
            // 获取线程池中的worker计数
            int wc = workerCountOf(c);
    
            // 判断当前线程是否会被超时销毁
            // 会被超时销毁的情况:线程池允许核心线程超时 或 当前线程数大于核心线程数
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
            // 如果 (当前线程数大于最大线程数 或 (允许超时销毁 且 当前发生了空闲时间超时))
            //   且 (当前线程数大于1 或 阻塞队列为空) —— 该条件在阻塞队列不为空的情况下保证至少会保留一个线程继续处理任务
            // 则 减少worker计数并返回null(返回null会导致当前worker被销毁)
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
    
            try {
                // 从阻塞队列中取出一个任务(如果队列为空会进入阻塞等待状态)
                // 如果允许空闲超时销毁线程的话则带有一个等待的超时时间
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                // 如果获取到了任务就直接返回该任务,返回后会开始执行该任务
                if (r != null)
                    return r;
                // 如果任务为null,则说明发生了等待超时,将空闲时间超时标志设置为true
                timedOut = true;
            } catch (InterruptedException retry) {
                // 如果等待被中断了,那说明空闲时间(等待任务的时间)还没有超时
                timedOut = false;
            }
        }
    }

    getTask方法在阻塞队列中有待执行的任务时会从队列中弹出一个任务并返回,如果阻塞队列为空,那么就会阻塞等待新的任务提交到队列中直到超时(在一些配置下会一直等待而不超时),如果在超时之前获取到了新的任务,那么就会将这个任务作为返回值返回。所以一般getTask方法是不会返回null的,只会阻塞等待下一个任务并在之后将这个新任务作为返回值返回。

    当getTask方法返回null时会导致当前Worker退出,当前线程被销毁。在以下情况下getTask方法才会返回null:

    1. 当前线程池中的线程数超过了最大线程数。这是因为运行时通过调用setMaximumPoolSize修改了最大线程数而导致的结果;
    2. 线程池处于STOP状态。这种情况下所有线程都应该被立即回收销毁;
    3. 线程池处于SHUTDOWN状态,且阻塞队列为空。这种情况下已经不会有新的任务被提交到阻塞队列中了,所以线程应该被销毁;
    4. 线程可以被超时回收的情况下等待新任务超时。线程被超时回收一般有以下两种情况:
      • 超出核心线程数部分的线程等待任务超时
      • 允许核心线程超时(线程池配置)的情况下线程等待任务超时

    2.3 processWorkerExit方法

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        // 如果completedAbruptly为true则表示任务执行过程中抛出了未处理的异常
        // 所以还没有正确地减少worker计数,这里需要减少一次worker计数
        if (completedAbruptly)
            decrementWorkerCount();
    
        // 获取线程池的主锁
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 把将被销毁的线程已完成的任务数累计到线程池的完成任务总数上
            completedTaskCount += w.completedTasks;
            // 从worker集合中去掉将会销毁的worker
            workers.remove(w);
        } finally {
            // 释放线程池主锁
            mainLock.unlock();
        }
    
        // 尝试结束线程池
        // 这里是为了在关闭线程池时等到所有worker都被回收后再结束线程池
        tryTerminate();
    
        int c = ctl.get();
        // 如果线程池状态 < STOP,即RUNNING或SHUTDOWN
        // 则需要考虑创建新线程来代替被销毁的线程
        if (runStateLessThan(c, STOP)) {
            // 如果worker是正常执行完的,则要判断一下是否已经满足了最小线程数要求
            // 否则直接创建替代线程
            if (!completedAbruptly) {
                // 如果允许核心线程超时则最小线程数是0,否则最小线程数等于核心线程数
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                // 如果阻塞队列非空,则至少要有一个线程继续执行剩下的任务
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                // 如果当前线程数已经满足最小线程数要求
                // 那么就不创建替代线程了
                if (workerCountOf(c) >= min)
                    return;
            }
    
            // 重新创建一个worker来代替被销毁的线程
            addWorker(null, false);
        }
    }

    processWorkerExit方法会销毁当前线程对应的Worker对象,并执行一些累加总处理任务数等辅助操作,但在线程池当前状态小于STOP的情况下会创建一个新的Worker来替换被销毁的Worker。

     

     

     

    参考:

    https://segmentfault.com/a/1190000018630751

     

  • 相关阅读:
    未能加载包“Microsoft SQL Server Data Tools”
    SharePoint 客户端对象模型共用ClientContext的坑
    安装VisualStudio 2015 x64 中文企业版失败
    Could not load file or assembly 'Microsoft.SqlServer.Management.Sdk.Sfc, Version=11.0.0.0 系统找不到指定的文件。
    为Sharepoint 2010 批量创建SharePoint测试用户
    User Profile Service Application 配置同步连接时,报 MOSS MA not found
    SharePoint 2010 系统账户没完全控制权限了
    百度编辑器 UEditor 报错汇总
    更改SharePoint 2007/2010/2013 Web 应用程序端口号
    SharePoint 2013 报:网站在改进过程中处于只读状态,对此给您带来的不便,我们深表歉意
  • 原文地址:https://www.cnblogs.com/zjfjava/p/13909285.html
Copyright © 2011-2022 走看看