zoukankan      html  css  js  c++  java
  • 并发编程01-Executor框架分析

    Executor框架

         在实际开发工作中,或多或少我们都用到过多线程。一般很少使用 new Thread(){run(){}}.start()这种创建线程。一方面影响代码规范性,另外一方面,也是不方便多线程管理。当大量任务时,线程的创建销毁都会占用很多计算资源。这也是为什么使用线程池的原因。

    • 在日常工作中,我们一般使用线程池进行管理线程。

    • 后面我们接触到的Fork/Join的线程管理ForkJoinPool也是基于Executor进行实现的。

    综合以上,在学习更多多线程相关之前,有必要了解Executor框架的相关知识;

    框架的使用示意图

    • 主线程通过创建线程任务,在Executor框架容器执行,根据需要返回执行结果。

    Executor框架构成

    • 任务:要执行的任务。线程任务需要实现:Runnable或Callable接口。(这一块的内容已经了解过了,不需要深入分析)

    • 任务执行:主要是Executor的继承几口ExecutorService的实现类完成(主要需要分析的内容)

    • 异步实现:FutureTask类(前面了解过了FutureTask类的相关知识,不需要再深入)

    源码分析:

         任务执行相关已经在之前了解过,不做更多深入。主要源码分析的是任务执行的抽象类,和异步实现类。

    ExecutorService的实现类(AbstractExecutorService)

    submit方法

    • 方法提交一个线程任务,并返回一个任务的执行结果Future;
    /**
     * 提交任务
     */  
      public Future<?> submit(Runnable task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<Void> ftask = newTaskFor(task, null);//生成一个新的任务
            execute(ftask);//执行新的任务,这个由子类实现
            return ftask;//返回执行结果
        }
    
     //创建一个新的任务
     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new FutureTask<T>(runnable, value);
        }
    

    子类需要实现的方法

    • shutdown 方法:在此之前的任务都会执行,但是不会接收新的任务;

    • shutdownNow 方法:暂停所有激活的方法,并返回任务列表;

    • awaitTermination方法:出现终端,暂停,或者限时。阻塞等待现有任务完成

    • execute方法,任务的执行过程;

    ThreadPoolExecutor

    线程池执行过程:

    • 接收到新的线程,如果小于核心线程数,则创建核心线程

    • 当超过核心线程数,多的线程任务存放到队列中

    • 只有当队列中的存满后,再判断是否小于最大线程数,如果小于最大线程数才创建新的线程

    • 当线程任务超过最大线程数,则根据策略进行抛出异常,或者任务舍弃等操作

    注意点:

    • 核心线程,初始是在有新任务的时候创建;如果初始化有非空队列,核心线程需要提前启动,调用prestartAllCoreThreads方法;

    • ThreadFactory用于创建新的线程,也可以自定义设置线程名称等;

    • 超过keep-alive的非核心线程会被回收;

    源码分析

    任务管理

    1.状态管理

    • 1.状态间的转换
         RUNNING -> SHUTDOWN //调用ShutDown方法后
    
         (RUNNING or SHUTDOWN) -> STOP//调用ShuntDownNow方法
    
         SHUTDOWN -> TIDYING //没有线程任务并且队列为空的时候
    
         STOP -> TIDYING //线程池空了之后
    
         TIDYING -> TERMINATED //当钩子方法(execute)结束后更改状态
    
    • 2.状态对应的数据
    /**
     * 初始化用到的一些静态变量
     */
        private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));//线程池状态
        private static final int COUNT_BITS = Integer.SIZE - 3;//29
        private static final int CAPACITY   = (1 << COUNT_BITS) - 1;//00001111 11111111 111111111 11111111
        private static final int RUNNING    = -1 << COUNT_BITS;//11100000 00000000 00000000 00000000
        private static final int SHUTDOWN   =  0 << COUNT_BITS;//00000000 00000000 00000000 00000000
        private static final int STOP       =  1 << COUNT_BITS;//00100000 00000000 00000000 00000000
        private static final int TIDYING    =  2 << COUNT_BITS;//01000000 00000000 00000000 00000000
        private static final int TERMINATED =  3 << COUNT_BITS;//01100000 00000000 00000000 00000000
        private static int runStateOf(int c)     { return c & ~CAPACITY; }//获取高位三位,获取线程状态
        private static int workerCountOf(int c)  { return c & CAPACITY; }//获取后29位,获取线程个数
        private static int ctlOf(int rs, int wc) { return rs | wc; }//进行或运算
          
    
    • 3.状态含义
    RUNNING: 接受新任务并运行队列中的任务
    
    SHUTDOWN: 不接受新的任务,但是能运行队列中的任务
    
    STOP:  不接受新的任务,不运行队列中的任务 ,并且中断正在运行中的任务 
    
    TIDYING: 所有任务都完成了,存活的线程为0, 当前线程池活动线程为0,将要调用terminated方法
    
    TERMINATED: 终止状态。terminated方法调用完成以后的状态
    
    

    2.存储线程任务结构

    • 重点看到的参数是workers,线程池存储线程是通过Set集合存储所有线程节点。所以下面我们要重点了解Worker类
    //存储任务的队列
    private final BlockingQueue<Runnable> workQueue;
    
    //用于shutdown或者shuntdownNow关闭线程池的锁
     private final ReentrantLock mainLock = new ReentrantLock();
    
    //存放线程池中所有的线程节点
    private final HashSet<Worker> workers = new HashSet<Worker>();
    
    //支持条件队列
    private final Condition termination = mainLock.newCondition();
    
    //线程池最大容量
    private int largestPoolSize;
    
    //统计执行的任务数,当线程回收时才进行统计 
    private long completedTaskCount;
    
    //线程创建工厂
    private volatile ThreadFactory threadFactory;
    
    //是否允许核心线程过时
    private volatile boolean allowCoreThreadTimeOut;
    
    //默认的拒绝策略
    private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
    
    

    Worker内部子类

    Worker类继承AbstractQueuedSynchroner并实现Runnable接口

    • 内部变量Thread保存,根据ThreadFactory创建的当前线程;

    • firstTask保存的是要执行的线程任务

    • run方法提供了节点任务执行的方法

     private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable
        {
            final Thread thread;
            Runnable firstTask; 
            volatile long completedTasks;
            //构造方法    
            Worker(Runnable firstTask) {
                setState(-1); 
                this.firstTask = firstTask;//保存任务
                this.thread = getThreadFactory().newThread(this);//当前线程
            }
    
         /**
          * 获取锁,将状态0改成1
          */  
         protected boolean tryAcquire(int unused) {
                if (compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }
    
          protected boolean tryRelease(int unused) {
                setExclusiveOwnerThread(null);
                setState(0);
                return true;
            }     
           /**
            * 执行当前节点的任务,后面会进行分析
            */ 
          public void run() {
                runWorker(this);
            }
    
    }
    
    

    execute方法

    主要是三个步骤:

    • 1.少于核心线程数时,增加新的线程,如果增加新的线程节点,并执行线程任务(注意:初始,小于核心线程是没新增一个线程任务,才新增一个线程节点,并不是线程启动的时候一次性创建完毕

      • runWorker方法两种情况,首先执行当前线程的任务task,执行完成后,循环遍历queue获取头节点任务;队列任务执行完后跳出队列;

      • 判断线程池状态是否为STOP状态,跳出循环;

      • 如果不为STOP状态,则执行任务,记录当前线程节点执行任务数量。执行完毕,遍历queue反复

      • 跳出循环后,线程池workers线程集合,移除当前线程节点,统计线程池执行的总任务数,判断现有线程数量是否小于核心线程数,再增加新的空任务线程节点。(注意:所有线程节点执行完后,先移除,在判断是否需要生成新的节点

    • 2.如果大于核心线程数,再尝试增加到队列,如果增加到队列中成功。

      • 2.1检查线程池是否正常运行,如果异常则移除队列中的线程任务,并执行拒绝策略

      • 2.2检查线程池线程数量,线程池现有线程数量为0,则新建线程任务

    • 3.判断线程数是否超过了最大的线程数,如果超过了,则直接执行拒绝策略

    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            int c = ctl.get();//获取线程池状态
            if (workerCountOf(c) < corePoolSize) {//判断运行中的线程数量是否少于核心数量,创建新的线程数
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            if (isRunning(c) && workQueue.offer(command)) {//增加到队列中
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))//如果线程池不是运行状态,则移除当前线程任务
                    reject(command);//执行拒绝策略
                else if (workerCountOf(recheck) == 0)//现有线程数量为0时,增加新的核心线程
                    addWorker(null, false);//增加一个空闲线程
            }
            else if (!addWorker(command, false))//判断增加的线程数是否超过了最大的线程数量,如果超过了,则执行拒绝策略
                reject(command);
        }
    
    /**
     * 作用:判断现有线程worker超过了核心线程数(或最大线程数),超过了返回false,否则增加新的线程worker
     */ 
     private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (;;) {
                int c = ctl.get();//获取当前状态
                int rs = runStateOf(c);//获取状态高位3位
    
                /**
                 * 1. 当前线程池状态为STOP,TIDYING,TERMINATED
                 * 2. 当前线程池状态为SHUTDOWN并且已经有了第一个任务     
                 * 3. 当前线程池状态为SHUTDOWN并且任务队列为空
                 * 也就是说,在加入任务之前调用了SHUTDOWN方法,或者SHUTDOWNNOW方法后就不能再加入节点。
                 */ 
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
                /**
                 * 判断现有线程数是否超过最大线程数,如果超过最大线程数返回false
                 */ 
                for (;;) {
                    int wc = workerCountOf(c);//获取当前线程数
                    //现有线程数超过最大的数量返回false  
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    if (compareAndIncrementWorkerCount(c))//线程个数增加1
                        break retry;
                    c = ctl.get();  // 重新获取执行数量
    
                    // 查看线程池状态是否变更,如果改变则重试  
                    if (runStateOf(c) != rs)
                        continue retry;
                }
            }
    
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
           /**
            * 1.获取独占锁;
            * 2.给线程池增加新的线程任务;
            */    
            try {
                w = new Worker(firstTask);//新键任务节点
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        int rs = runStateOf(ctl.get());
    
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            workers.add(w); // 增加线程任务,创建新的线程
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        t.start();//这里会调用Work的run方法,下面我会具体分析
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
    
    

    Worker类的run方法(后续抽时间补充执行流程图)

    主要作用是运行线程任务,具体步骤如下:

    • 线程节点有任务,或队列中有任务(获取头节点任务)

      • 返回空的几种情况:

        • 1.现有线程数超过最大线程数时;
        • 2.线程池时暂停状态
        • 3.线程池时关闭状态并且队列为空
        • 4.线程等待任务超时时
    • 如果当前线程节点有线程任务,或者循环返回的队列头节点有任务继续执行任务,如果没有可执行任务,则调用processWorkerExit方法,统计线程执行的任务,并清除线程池中空闲的线程;

    • 获取线程锁,并判断是否STOP状态,如果是STOP状态,说明调用了ShutdownNow方法,中断当前线程;调用processWorkerExit方法,进行任务统计及线程回收。

    • 执行线程中task的线程任务。并记录当前线程任务执行数量增加1,释放锁。

    • 最后清除线程中空闲的线程,并汇总线程任务执行总数

    /**
     * 作用:运行节点的任务;
     * 1.获取到任务后,释放锁,成为可中断的状态;
     * 2.获取到任务,或者获取队列中第一个任务不为空时;
     * 3.判断线程池状态,如果正常则执行运行节点任务;
     */
     final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;//
            w.firstTask = null;
            w.unlock(); //先释放锁,允许任务在执行之前被中断(如果调用ShutDown方法,会有锁竞争) 
            boolean completedAbruptly = true;
            try {
                /**
                 * 条件:当前节点的任务不为空,或者队列中的任务不为空;
                 * 如果任务为空,则获取头部节点的任务 
                 */     
                while (task != null || (task = getTask()) != null) {
                    w.lock();//获取锁,AQS中同步队列获取执行权
                    /**
                     *  如果线程池停止了,查看当前线程是否中断,
                     *  如果中断了,则清除当前线程中端状态。并中断当前线程中任务执行
                     *  (这里时因为ShuntDownNow方法要立即停止所有线程任务,但是只用ShutDown的话,当前线程的任务还是会继续执行。)
                     */     
                    
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        beforeExecute(wt, task);//这里是抽象方法,留出扩展的空间,如果抛出异常直接进行捕获
                        Throwable thrown = null;
                        try {
                            task.run();//执行线程的任务
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            /**
                             * 如果线程任务在执行过程中出现错误,在这里进行处理。
                             * 这里提供的是抽象方法,继承类可以进行修改,用于处理异常情况,线程任务处理
                             * 这里出现异常也可能会造成线程中断     
                             */
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;//执行完成,进行释放
                        w.completedTasks++;//完成任务++
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                /**
                 * 作用:1.统计任务执行数量;2.多余线程节点回收
                 */
                processWorkerExit(w, completedAbruptly);
            }
        }
    
       /** 有以下几种情况会返回null;
         * 1.现有线程数超过最大线程数时;
         * 2.线程池时暂停状态
         * 3.线程池时关闭状态并且队列为空
         * 4.线程等待任务超时时     
         * 主要作用: 除了以上非null的情况之外,从队列的头部获取到线程任务;
         */        
        private Runnable getTask() {
            boolean timedOut = false; 
            for (;;) {//自旋
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // 检查队列是不是为空,为空则返回null;
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    decrementWorkerCount();
                    return null;
                }
                //线程池现有线程数
                int wc = workerCountOf(c);
    
                // 判断线程池现有线程数是否大于核心线程数,并且核心线程数是可过时的
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
                /**
                 * 满足以下两个条件
                 * 1.现有线程数大于最大线程数
                 * 2.或现有线程数大于1且超时时
                 * 减少线程数
                 */      
                if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    continue;
                }
    
                try {
                   // 获取头节点,如果头节点不为空,则返回线程任务
                    Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                    if (r != null)
                        return r;
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
                }
            }
        }
    /**
     * 1.记录完成的任务数,并清除完成任务的线程
     * 2.判断是否要终止线程池;
     * 3.节点回收
     */
      private void processWorkerExit(Worker w, boolean completedAbruptly) {
            if (completedAbruptly) //如果是中断状态
                decrementWorkerCount();//线程数量减1
    
            final ReentrantLock mainLock = this.mainLock;//获取线程池主锁
            mainLock.lock();
            try {
                completedTaskCount += w.completedTasks;//记录完成的任务数
                /**
                 * workers中移除线程执行节点
                 * 不管是否小于核心线程数,线程节点都要先移除     
                 */
                workers.remove(w);
            } finally {
                mainLock.unlock();//释放线程池主锁
            }
    
            tryTerminate();//判断是否需要更改线程池状态
    
            int c = ctl.get();
            /**
             * 满足条件再增加一个空的线程节点; 
             * 1. 如果当前时SHUNTDOWM状态。当任务完成正常执行完毕后
             * 2. 判断核心线程是否需要回收,并且核心线程数量是否达标
             * 3. 如果核心线程数量满足则返回,否则,增加一个空任务的线程节点
             * 这里也是节点循环执行任务的根本所在
             */             
            if (runStateLessThan(c, STOP)) {
                if (!completedAbruptly) {
                    int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                    if (min == 0 && ! workQueue.isEmpty())
                        min = 1;
                    if (workerCountOf(c) >= min)
                        return; // replacement not needed
                }
                addWorker(null, false);//增加一个节点
            }
        }
    
     /**
      * 作用:清除空闲的线程节点,判断是否将线程池的状态变更成TERMINATED状态
      */
     final void tryTerminate() {
            for (;;) {
                int c = ctl.get();
                /**
                 * 运行状态,或者中断状态但是有任务要执行 
                 * 还有任务执行时返回
                 */
                if (isRunning(c) ||
                    runStateAtLeast(c, TIDYING) ||
                    (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                    return;
                if (workerCountOf(c) != 0) { // 清除空闲的线程
                    interruptIdleWorkers(ONLY_ONE);
                    return;
                }
    
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                        try {
                            terminated();//终止操作由子类进行操作
                        } finally {
                            ctl.set(ctlOf(TERMINATED, 0));//将终止状态变TERMINATED状态
                            termination.signalAll();//唤醒条件队列
                        }
                        return;
                    }
                } finally {
                    mainLock.unlock();
                }
            }
        }
    
    

    shutDown方法

    • 核心点就是,逐个中断节点时,如果节点还没中断,则先让该节点获取独占锁,然后再中断
        public void shutdown() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                checkShutdownAccess();//检测是否有关闭线程池的权限
                advanceRunState(SHUTDOWN);//变更线程池状态
                /**
                 * 逐个中断线程节点
                 *(注意这里判断,线程节点如果没有中断,先让节点的独占锁,保证线程任务执行完之后才进行中断操作)     
                 */
                interruptIdleWorkers();
                onShutdown(); // hook for ScheduledThreadPoolExecutor(钩子方法,等待实现类实现)
            } finally {
                mainLock.unlock();
            }
            tryTerminate();//尝试变更线程池状态为TERMIATED,这个方法上面已经分析过了
        }
    
    //中断线程节点
     private void interruptIdleWorkers(boolean onlyOne) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                for (Worker w : workers) {
                    Thread t = w.thread;
                    if (!t.isInterrupted() && w.tryLock()) {//如果线程没有中断,先获取锁之后才中断,也即是等任务执行完毕后才中断
                        try {
                            t.interrupt();
                        } catch (SecurityException ignore) {
                        } finally {
                            w.unlock();
                        }
                    }
                    if (onlyOne)
                        break;
                }
            } finally {
                mainLock.unlock();
            }
        }
    
    

    shutDownNow 方法

    • 中断所有线程节点,不考虑节点有任务与否;

    • 返回队列中剩余的任务数;

    public List<Runnable> shutdownNow() {
            List<Runnable> tasks;
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                checkShutdownAccess();//检查权限
                advanceRunState(STOP);//更改线程任务为STOP状态
                interruptWorkers();//直接中断线程
                tasks = drainQueue();//获取队列中剩余的任务,并清空队列
            } finally {
                mainLock.unlock();
            }
            tryTerminate();//尝试变更线程池状态
            return tasks;
        }
    
     private void interruptWorkers() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                for (Worker w : workers)
                    w.interruptIfStarted();//这里进行的操作是,只要没中断就进行中断,不会有获取锁的操作
            } finally {
                mainLock.unlock();
            }
        }
    
    

    几种拒绝策略

    • AbortPolicy 总是抛出异常(这种是默认的)
     public static class AbortPolicy implements RejectedExecutionHandler {
    
            public AbortPolicy() { }
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                throw new RejectedExecutionException("Task " + r.toString() +
                                                     " rejected from " +
                                                     e.toString());
            }
        }
    
    • CallerRunsPolicy 只要线程池没关闭就执行线程任务
      public static class CallerRunsPolicy implements RejectedExecutionHandler {
            
            public CallerRunsPolicy() { }
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    r.run();
                }
            }
        }
    
    • DiscardPolicy 静默处理,也就是啥都不干
      public static class DiscardPolicy implements RejectedExecutionHandler {
    
            public DiscardPolicy() { }
    
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            }
        }
    
    
    • DiscardOldestPolicy 把队列中的第一个任务给抛掉,然后执行当前的这个任务
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    
            public DiscardOldestPolicy() { }
    
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    e.getQueue().poll();
                    e.execute(r);
                }
            }
        }
    
    

    ThreadPoolExecutor方法扩展

    • 上面源码分析中,我们也看到了ThreadPoolExecutor提供的几个钩子方法,供子类去扩展。具体如下面这个类:

    • 一般我们对线程firstTask任务执行失败后的处理也可以用提供的钩子方法进行相关处理。

     class PausableThreadPoolExecutor extends ThreadPoolExecutor {
        private boolean isPaused;
        private ReentrantLock pauseLock = new ReentrantLock();
        private Condition unpaused = pauseLock.newCondition();
     
        public PausableThreadPoolExecutor(...) { super(...); }
     
        protected void beforeExecute(Thread t, Runnable r) {//在线程任务执行之前进行的操作;
          super.beforeExecute(t, r);
          pauseLock.lock();
          try {
            while (isPaused) unpaused.await();//加入到条件队列
          } catch (InterruptedException ie) {
            t.interrupt();
          } finally {
            pauseLock.unlock();
          }
        }
     
        public void pause() {//
          pauseLock.lock();
          try {
            isPaused = true;
          } finally {
            pauseLock.unlock();
          }
        }
     
        public void resume() {
          pauseLock.lock();
          try {
            isPaused = false;
            unpaused.signalAll();//全部唤醒
          } finally {
            pauseLock.unlock();
          }
        }
      }}
    

    总结归纳

    1.ThreadPoolExecutor中线程如何管理的?超过核心线程的线程是如何被调用的?

    线程管理的主要是通过两个参数管理的,
    一个是ctl的原子变量,标识的是现存线程的个数;
    另一个参数是workers,HashSet进行存储的,具体的线程worker实例;
    
    从前面代码分析可以看出:
    
    1.未超过核心线程数,会创建一个新的worker,存储到workers集合中,并且执行worker中firstTask的任务;
    
    2.超过线程池的任务被放入到queue;
    
    3.worker执行完当前任务和队列中任务后,当队列中没有可执行任务,直接会把当前的worker移除,并统计当前worker执行的任务;
    
    4.如果workers中的线程节点数少于核心线程数,那么会调用增加addWork的方法,新增一个worker,firstTask为空去执行queue中的任务;
    
    5.循环3,4步骤,直到线程池调用shutdown方法或shutdownNow方法;
    
    从上面也可以看出,执行完任务的worker节点在队列没有任务的时候都会从workers数组中remove掉,然后新增空任务worker去处理queue中的新任务。
    
    
  • 相关阅读:
    day23 笔记
    iframe子页面与父页面通信
    js格式化时间
    自定义滚动条样式
    表格隔行换色
    css除第一个子元素的其他子元素的4种方法,超实用!
    子div在父div里居中
    红橙黄绿蓝靛紫-RGB-十六进制
    阿里巴巴矢量图标库 字体图标的下载与使用
    calc属性不生效
  • 原文地址:https://www.cnblogs.com/perferect/p/13665664.html
Copyright © 2011-2022 走看看