zoukankan      html  css  js  c++  java
  • ThreadPoolExecutor浅谈

    在了解线程池之前,希望你已了解 Java 内存模型AQS CAS

         /**					
    	 * The runState provides the main lifecycle control, taking on values:
         *
         *   RUNNING:  Accept new tasks and process queued tasks
         *   SHUTDOWN: Don't accept new tasks, but process queued tasks
         *   STOP:     Don't accept new tasks, don't process queued tasks,
         *             and interrupt in-progress tasks
         *   TIDYING:  All tasks have terminated, workerCount is zero,
         *             the thread transitioning to state TIDYING
         *             will run the terminated() hook method
         *   TERMINATED: terminated() has completed
         *
         * The numerical order among these values matters, to allow
         * ordered comparisons. The runState monotonically increases over
         * time, but need not hit each state. The transitions are:
         *
         * RUNNING -> SHUTDOWN
         *    On invocation of shutdown(), perhaps implicitly in finalize()
         * (RUNNING or SHUTDOWN) -> STOP
         *    On invocation of shutdownNow()
         * SHUTDOWN -> TIDYING
         *    When both queue and pool are empty
         * STOP -> TIDYING
         *    When pool is empty
         * TIDYING -> TERMINATED
         *    When the terminated() hook method has completed terminated()
    	 */
    	
    	// 前三位表示运行状态,后面存储当前运行 workerCount
    	private static final int COUNT_BITS = Integer.SIZE - 3; // 32 - 3
    	
    	// 最大容量
    	private static final int CAPACITY   = (1 << COUNT_BITS) - 1; // 00011111111111111111111111111111
    	
    	/**
         * Maximum pool size. Note that the actual maximum is internally
         * bounded by CAPACITY. 实际线程池大小还是由 CAPACITY 决定
         */
        private volatile int maximumPoolSize;
        
    	// 线程池的几个状态 官方注释在最上方
    	// 接受新的任务
    	private static final int RUNNING    = -1 << COUNT_BITS; // 11100000000000000000000000000000
    	
    	// 不接受新的任务,但是已在队列中的任务,还会继续处理
    	private static final int SHUTDOWN   =  0 << COUNT_BITS; // 00000000000000000000000000000000
    	
    	// 不接受,不处理新的任务,且中断正在进行中的任务
    	private static final int STOP       =  1 << COUNT_BITS; // 00100000000000000000000000000000
    	
    	// 所有任务已停止,workerCount 清零,注意 workerCount 是由 workerCountOf(int c) 计算得出的
    	private static final int TIDYING    =  2 << COUNT_BITS; // 01000000000000000000000000000000
    	
    	// 所有任务已完成
    	private static final int TERMINATED =  3 << COUNT_BITS; // 01100000000000000000000000000000
    	
    	// 线程池运行状态和已工作的 workerCount 初始化为 RUNNING 和 0
    	private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    	
    	// 计算当前 state
    	// ~CAPACITY 为 11100000000000000000000000000000 & c(假如前三位为 000)说明线程池已经 SHUTDOWN
    	private static int runStateOf(int c)     { return c & ~CAPACITY; }
    	
    	// 同时拿到 state workerCount
    	private static int ctlOf(int rs, int wc) { return rs | wc; }
    	
    	// & 可以计算出当前工作的 workerCount
    	private static int workerCountOf(int c)  { return c & CAPACITY; }
    	
    	// 线程入列
    	public void execute(Runnable command) {
    	        if (command == null)
    	            throw new NullPointerException();
    	   
    	   		// 获取线程池 state 和 workerCount
    	   		// 判断是否满足加入核心线程
    	        int c = ctl.get();
    	        if (workerCountOf(c) < corePoolSize) {
    	        	// 以核心线程的方式加入队列
    	            if (addWorker(command, true))
    	                return;
    	            // 添加失败 获取最新的线程池 state 和 workerCount
    	            c = ctl.get();
    	        }
    	        // 在运行且成功加入队列
    	        if (isRunning(c) && workQueue.offer(command)) {
    	            int recheck = ctl.get();
    	            // 再检查一次,不在运行就拒绝任务
    	            if (!isRunning(recheck) && remove(command))
    	                reject(command);
    	            else if (workerCountOf(recheck) == 0)
    	            	// 加入一个 null
    	                addWorker(null, false);
    	        }
    	        // 加入失败就拒绝任务
    	        else if (!addWorker(command, false))
    	            reject(command);
    	    }
    	
    	// 实际的操作
    	private boolean addWorker(Runnable firstTask, boolean core) {
    	        retry:
    	        for (;;) {
    	        	// 获得当前 state 和 workerCount
    	            int c = ctl.get();
    	            int rs = runStateOf(c);
    	
    				// 大于 SHUTDOWN 即 STOP TIDYING TERMINATED
    	            // Check if queue empty only if necessary.
    	            if (rs >= SHUTDOWN &&
    	                ! (rs == SHUTDOWN &&
    	                   firstTask == null &&
    	                   ! workQueue.isEmpty()))
    	                return false;
    	
    	            for (;;) {
    	            	// 计算 workerCount
    	                int wc = workerCountOf(c);
    	                if (wc >= CAPACITY ||
    	                    wc >= (core ? corePoolSize : maximumPoolSize))
    	                    return false;
    	                // 成功了就退出
    	                if (compareAndIncrementWorkerCount(c))
    	                    break retry;
    	                c = ctl.get();  // Re-read ctl
    	                if (runStateOf(c) != rs)
    	                	// 走到这一步说明 rs 为 RUNNING 或 SHUTDOWN 可以重新尝试加入
    	                    continue retry;
    	                // else CAS failed due to workerCount change; retry inner loop
    	            }
    	        }
    	
    	        boolean workerStarted = false;
    	        boolean workerAdded = false;
    	        Worker w = null;
    	        try {
    	        	// 统一线程的名字
    	        	// 设置 daemon 和 priority
    	            w = new Worker(firstTask);
    	            final Thread t = w.thread;
    	            if (t != null) {
    	                final ReentrantLock mainLock = this.mainLock;
    	                mainLock.lock();
    	                try {
    	                    // Recheck while holding lock.
    	                    // Back out on ThreadFactory failure or if
    	                    // shut down before lock acquired.
    	                    int rs = runStateOf(ctl.get());
    
    						// 异常检查
    	                    if (rs < SHUTDOWN ||
    	                        (rs == SHUTDOWN && firstTask == null)) {
    	                        if (t.isAlive()) // precheck that t is startable
    	                            throw new IllegalThreadStateException();
    	                        workers.add(w);
    	                        int s = workers.size();
    	                        if (s > largestPoolSize)
    	                            largestPoolSize = s;
    	                        workerAdded = true;
    	                    }
    	                } finally {
    	                    mainLock.unlock();
    	                }
    	                // 添加成功 启动线程
    	                if (workerAdded) {
    	                    t.start();
    	                    workerStarted = true;
    	                }
    	            }
    	        } finally {
    	        	// 加入失败 
    	            if (! workerStarted)
    	                addWorkerFailed(w);
    	        }
    	        return workerStarted;
    	    }
    	    
    	// 加入失败 做一些扫尾清理
    	private void addWorkerFailed(Worker w) {
    	        final ReentrantLock mainLock = this.mainLock;
    	        mainLock.lock();
    	        try {
    	            if (w != null)
    	                workers.remove(w);
    	            // workerCount-1
    	            decrementWorkerCount();
    	            // 尝试更新状态 何为尝试,即需要满足一定条件,而不是冒然去做某事
    	            tryTerminate();
    	        } finally {
    	            mainLock.unlock();
    	        }
    	    }
    
  • 相关阅读:
    ASP输出成n列的表格形式显示的方法,多行多列
    1003
    1005
    Linq to sql 迭代器bug?
    通过global.asax向所有文件注册js
    asp.net任务调度之Quartz.net
    SQL Server 批量插入数据的两种方法
    C#通过反射实例化对象
    linq to sql 学习
    C#全角和半角转换
  • 原文地址:https://www.cnblogs.com/eahau/p/11003102.html
Copyright © 2011-2022 走看看