目录:
- execute()
- addWorker()
execute()
1 public void execute(Runnable command) { 2 if (command == null) 3 throw new NullPointerException(); 4 /* 5 * 处理过程分为以下三步: 6 * 7 * 1. 线程池线程数小于corePoolSize: 8 * 则创建新的核心线程执行任务,command=新线程的第一个执行的任务。 9 * 否则进入步骤2。 10 * 11 * 2. 若任务可以成功进入阻塞队列,则重新检查线程池状态。 12 * 如果线程池已经停止了,则将刚刚入队的任务做处理。 13 * 如果线程池内没有线程了,则创建一个新的线程 14 * 15 * 3. 如果任务入队失败,则创建一个新的非核心线程。 16 * 如果创建非核心线程失败了,则线程池可能已经关闭或饱和了。 17 * 因此需要拒绝这个任务。 18 */ 19 int c = ctl.get(); 20 // 工作线程小于corePoolSize 21 if (workerCountOf(c) < corePoolSize) { 22 // 创建新的核心线程执行任务 23 if (addWorker(command, true)) 24 return; 25 c = ctl.get(); 26 } 27 // 重新检查线程状态,并调用offer方法入队 28 if (isRunning(c) && workQueue.offer(command)) { 29 int recheck = ctl.get(); 30 // 线程处于非running状态,并且成功删除刚刚入队的任务 31 if (!isRunning(recheck) && remove(command)) 32 // 执行reject方法,拒绝此次提交的任务 33 reject(command); 34 // 线程池内没有任务了 35 else if (workerCountOf(recheck) == 0) 36 // 创建一个非核心线程任务 37 addWorker(null, false); 38 } 39 // 线程处于非running状态,或是running状态但入队失败了 40 // 尝试通过addWorker方法创建一个非核心线程 41 // 创建失败则拒绝任务 42 else if (!addWorker(command, false)) 43 reject(command); 44 }
addWorker()
1 private boolean addWorker(Runnable firstTask, boolean core) { 2 // 外层循环标记 3 retry: 4 for (;;) { 5 int c = ctl.get(); 6 // 获取运行状态 7 int rs = runStateOf(c); 8 9 // 当线程池状态为SHUTDOWN、STOP、TIDYING、TERMINATED且 10 // 状态不为SHUTDOWN 或 firstTask!=null 或 队列为空时返回false 11 if (rs >= SHUTDOWN && 12 ! (rs == SHUTDOWN && 13 firstTask == null && 14 ! workQueue.isEmpty())) 15 // 添加工作线程失败 16 return false; 17 18 for (;;) { 19 // 获取工作线程数 20 int wc = workerCountOf(c); 21 // 工作线程数大于最大容量 22 // 或在创建核心线程时,工作线程数大于核心线程数 23 // 或在创建非核心线程时,工作线程数大于最大线程数 24 if (wc >= CAPACITY || 25 wc >= (core ? corePoolSize : maximumPoolSize)) 26 // 添加工作线程失败 27 return false; 28 // CAS自增工作线程数,成功则调到最外层循环 29 if (compareAndIncrementWorkerCount(c)) 30 break retry; 31 c = ctl.get(); // Re-read ctl 32 // 如果内存循环获取到的线程工作状态与外层循环的不一致(在执行内存循环时状态被修改) 33 if (runStateOf(c) != rs) 34 // 则进入下一次内存循环 35 continue retry; 36 // else CAS failed due to workerCount change; retry inner loop 37 } 38 } 39 40 // 标记工作线程是否被启动 41 boolean workerStarted = false; 42 // 标记工作线程是否被添加成功 43 boolean workerAdded = false; 44 // 工作线程 45 Worker w = null; 46 try { 47 w = new Worker(firstTask); 48 final Thread t = w.thread; 49 if (t != null) { 50 final ReentrantLock mainLock = this.mainLock; 51 // 使用可重入锁加锁 52 mainLock.lock(); 53 try { 54 // 获取线程池工作状态 55 int rs = runStateOf(ctl.get()); 56 57 // rs < SHUTDOWN:RUNNING 58 // rs == SHUTDOWN && firstTask为空 59 if (rs < SHUTDOWN || 60 (rs == SHUTDOWN && firstTask == null)) { 61 if (t.isAlive()) // precheck that t is startable 62 throw new IllegalThreadStateException(); 63 // 添加此工作线程 64 workers.add(w); 65 int s = workers.size(); 66 // 如果工作线程数超过largestPoolSize 67 if (s > largestPoolSize) 68 // 则设置largestPoolSize=5 69 largestPoolSize = s; 70 workerAdded = true; 71 } 72 } finally { 73 mainLock.unlock(); 74 } 75 if (workerAdded) { 76 // 当满足workerAdded时,启动线程逻辑 77 t.start(); 78 workerStarted = true; 79 } 80 } 81 } finally { 82 if (! workerStarted) 83 addWorkerFailed(w); 84 } 85 return workerStarted; 86 }
其实大多数的逻辑都是对线程状态的卡控,当满足workerAdded时,启动线程逻辑(thread.start())。