zoukankan      html  css  js  c++  java
  • 通用线程池

    通用线程池

    1. 架构模型

    2. 核心参数

    3. 继承体系

    • Executor: 顶级接口,任务执行器
    • ExecutorService:即Executor Service,跟我们正常写方法比较类似,定义了线程池的通用方法
    • AbstractExecutorService: 典型的模版方法模式实现,主流程有抽象类实现,提供钩子方法,由子类实现。

    4. AbstractExecutorService实现

    4.1 submit

    将Runable和Callable包装成RunnableFuture对象,调用子类实现的execute(RunableFuture)防范】

        /**
         * @throws RejectedExecutionException {@inheritDoc}
         * @throws NullPointerException       {@inheritDoc}
         */
        public Future<?> submit(Runnable task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<Void> ftask = newTaskFor(task, null);
            execute(ftask);
            return ftask;
        }
    
        /**
         * @throws RejectedExecutionException {@inheritDoc}
         * @throws NullPointerException       {@inheritDoc}
         */
        public <T> Future<T> submit(Runnable task, T result) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task, result);
           // 钩子函数,由子类实现具体的调度逻辑
            execute(ftask);
            return ftask;
        }
    
        /**
         * @throws RejectedExecutionException {@inheritDoc}
         * @throws NullPointerException       {@inheritDoc}
         */
        public <T> Future<T> submit(Callable<T> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task);
            execute(ftask);
            return ftask;
        }
    

    4.2 invokeAll

    • 首先将集合中的所有Callable包装成RunnableFuture,并调用execute(Runable)方法
    • 依次调用Future.get(),注意主动cancel和执行异常会被吞掉
    • 超时后,会尝试中断未执行完的线程

    4.3 invokeAny

    • 有任何一个执行完成就结束
    • 完成后,会尝试中断正在执行的任务(不一定能取消掉)
    • 超时会中断所有正在执行的任务

    5. ThreadPoolExecutorService实现

    • 线程池状态:

    • 运行状态
      • RUNNING:接收新的任务,处理队列中的任务
      • SHUTDOWN:不接受新任务,但是处理队列中的任务
      • STOP:不接受新任务,不处理队列中的任务,同时打断队列中的任务
      • TIDYING:所有任务都终止,工作线程数量为0,在转换为TIDYING状态后会执行钩子函数terminated()
      • TERMINATED: terminated() 执行结束
    // 用高3位表示线程池的状态, 总共5个状态,3位正好可以表示
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
    
    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
    

    5.1 shutdown()

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // CAS 设置保证执行状态为SHUTDOWN
            advanceRunState(SHUTDOWN);
            // 中断空闲线程
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        // 这里会尝试终止,实际不一定能终止,最后一个线程会调用终止
        tryTerminate();
    }
    

    5.2 shutdownNow()

    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // CAS 设置保证执行状态为STOP
            advanceRunState(STOP);
            // 中断所有的执行线程
            interruptWorkers();
            // 取出所有未执行的任务返回,给业务线程机会是否处理该线程
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        // 这里会尝试终止,实际不一定能终止,最后一个线程会调用终止
        tryTerminate();
        return tasks;
    }
    

    5.3 awaitTerminated() & tryTerminate()

        public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
            long nanos = unit.toNanos(timeout);
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                for (;;) {
                  // 线程池为Terminated才会正常结束
                    if (runStateAtLeast(ctl.get(), TERMINATED))
                        return true;
                    if (nanos <= 0)
                        return false;
                   // 利用条件变量,类似wait notify,但是这里支持等待时长
                    nanos = termination.awaitNanos(nanos);
                }
            } finally {
                mainLock.unlock();
            }
        }
    
        final void tryTerminate() {
            for (;;) {
                int c = ctl.get();
                if (isRunning(c) ||
                    runStateAtLeast(c, TIDYING) ||
                    (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                    return;
                if (workerCountOf(c) != 0) { // Eligible to terminate
                    interruptIdleWorkers(ONLY_ONE);
                    return;
                }
    
             	  // 只有一个线程会执行到下面的代码,其他线程在上面return了
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // tidying状态才能终止,线程数为0,队列是空
                    if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                        try {
                            // 钩子方法
                            terminated();
                        } finally {
                            // 状态标记为终止
                            ctl.set(ctlOf(TERMINATED, 0));
                            // 条件标量通知等待结束的线程可以放行了,之所以是signall->多个线程等待都会被放行
                            termination.signalAll();
                        }
                        return;
                    }
                } finally {
                    mainLock.unlock();
                }
                // else retry on failed CAS
            }
        }
    

    5.4 核心方法:execute

    执行下面的操作:

    • 一言以蔽之:先添加核心线程,然后添加到队列,队列满了后创建非核心线程。最后执行拒绝策略。
        public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
          
            int c = ctl.get();
            // 小于核心线程数,需要启动新任务
            if (workerCountOf(c) < corePoolSize) {
                // 会自动检测runState和workerCount, 
                // 如果添加失败,如果返回false,要么线程数超过核心线程数,要么runState已经变更,执行后续的处理
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
          
            // 如果是运行状态说明,添加失败的原因是超过核心线线程数,先添加到队列中
            if (isRunning(c) && workQueue.offer(command)) {
                // 多线程场景,double-check
                int recheck = ctl.get();
                // 不在运行态,直接回滚
                if (! isRunning(recheck) && remove(command))
                  // 执行拒绝策略  
                  reject(command);
                // 运行态,运行线程数等于0
                else if (workerCountOf(recheck) == 0)
                  // 第一个任务为null, 会从队列中取1个任务作为第一个任务执行  
                  addWorker(null, false);
            }
            // 添加到队列失败,创建非核心线程,执行任务
            else if (!addWorker(command, false))
                // 执行拒绝策略
                reject(command);
        }
    

    5.5 核心方法addWorker

    Worker(Runnable firstTask) {
       setState(-1); // inhibit interrupts until runWorker
       this.firstTask = firstTask;
       // 创建线程的runable对象传的是this,即worker对象,t.start会执行worker的run方法,调用runWorker(this)
       this.thread = getThreadFactory().newThread(this);
    }
    /**
     * firstTask: 第一个需要执行的任务
     * core: 是否创建核心线程数
     **/
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (int c = ctl.get();;) {
            // 如果线程池runState 为Stop状态,直接返回false
            // shutDown状态,会执行队列中的任务,但不会执行新的任务,所以不需要创建新线程
            if (runStateAtLeast(c, SHUTDOWN)
                && (runStateAtLeast(c, STOP)
                    || firstTask != null
                    || workQueue.isEmpty()))
                return false;
    
            for (;;) {
                // 大于核心线程数还是大雨最大线程数,取决于core参数,超过了就不能创建新线程了,返回false,外层调用者会执行拒绝策略
                if (workerCountOf(c)
                    >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                    return false;
                // CAS保证线程安全+1,即workerCount+1
                if (compareAndIncrementWorkerCount(c))
                    // break跳出循环,执行循环后面的内容,continue不会跳出循环
                    break retry;
                // CAS失败,说明workerCount已经被其他线程变更, 重新取值判断
                c = ctl.get();  // Re-read ctl
                // 运行状态到了SHUT DOWN以后(STOP, TIDYing)重新跳出到外层循环
                if (runStateAtLeast(c, SHUTDOWN))
                    continue retry;
                // 其他情况运行状态不变,只需要重新执行下内层循环判断数量
            }
        }
        
        // 工作线程数已经+1, 如果真正启动失败,会回滚 
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
    
                    if (isRunning(c) ||
                        (runStateLessThan(c, STOP) && firstTask == null)) {
                        if (t.getState() != Thread.State.NEW)
                            throw new IllegalThreadStateException();
                        // 向容器中添加工作对象
                        workers.add(w);
                        workerAdded = true;
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    // 启动的时候,会执行Worker对象的run方法
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                // 添加失败,这里会回滚线程数
                addWorkerFailed(w);
        }
        return workerStarted;
    }
    

    5.6 Woker.runWork(Worker w)方法

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
           // keepAliveTime, getTask会调用阻塞队列的poll方法一直到取到为止,
           // 如果超时(keepAliveTime)未取到,会抛出中断异常,processWorkerExit会执行,删除工作线程,由GC回收
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // shutdown的时候需要清除中断标志位,因为当前线程还要执行线程中的任务
                // shutDownNow, 需要确保处于中断状态, 所以在任务中调用中断后,下一次任务会清除中断标志位
                // 中断的时候,join,wait, notify等都可以响应中断标志位
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    //子类实现钩子函数
                    beforeExecute(wt, task);
                    try {
                        // 执行任务
                        task.run();
                        // 子类实现
                        afterExecute(task, null);
                    } catch (Throwable ex) {
                        afterExecute(task, ex);
                        throw ex;
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            // 有异常的情况下,该值位true
            completedAbruptly = false;
        } finally {
            // 有异常的情况下,该值位true, 会减少workerCount, 
            // 以便能重新创建线程,所有抛出异常并不会导致没有线程可用
            processWorkerExit(w, completedAbruptly);
        }
    }
    
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();
    
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();  
        }
    
        tryTerminate();
    
        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                // 保证至少一个线程运行
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }
    
    没有智能的代码,源码面前了无秘密
  • 相关阅读:
    一个简单的knockout.js 和easyui的绑定
    knockoutjs + easyui.treegrid 可编辑的自定义绑定插件
    Knockout自定义绑定my97datepicker
    去除小数后多余的0
    Windows Azure Web Site (15) 取消Azure Web Site默认的IIS ARR
    Azure ARM (1) UI初探
    Azure Redis Cache (3) 创建和使用P级别的Redis Cache
    Windows Azure HandBook (7) 基于Azure Web App的企业官网改造
    Windows Azure Storage (23) 计算Azure VHD实际使用容量
    Windows Azure Virtual Network (11) 创建VNet-to-VNet的连接
  • 原文地址:https://www.cnblogs.com/dragonfei/p/15790390.html
Copyright © 2011-2022 走看看