1):线程池中几个成员变量的说明。
ctl:记录了"线程池中的任务数量"和"线程池状态"2个信息,包括32位。其中,高3位表示"线程池状态",低29位表示"线程池中的任务数量"。
在 ThreadPoolExecutor 类中的定义如下:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1;
线程池中的运行状态的存储如下:
private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
RUNNING -- 对应的高3位值是111。
SHUTDOWN -- 对应的高3位值是000。
STOP -- 对应的高3位值是001。
TIDYING -- 对应的高3位值是010。
TERMINATED -- 对应的高3位值是011。
2):接下来分析类中代码的具体逻辑。线程池的中方法的执行入口是 execute()
下面是 execute(Runnable command) 执行过程:
1:判断 command 任务是否为空,任务为空则抛出异常
if (command == null) throw new NullPointerException();
2 :command 任务不为空时:
int c = ctl.get(); // 高3位为线程运行的状态 低29位:线程池中运行线程的数量 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); }
3:若满足线程池中运行的线程数量小于核心池数,则进入 addWorker() 方法:这是一个自旋的方式来实现逻辑
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { // 进入自旋 int c = ctl.get(); int rs = runStateOf(c); // 获取线程池的运行状态 如 上述的 running shutdown stop 等 //...... for (;;) { // 进入自旋 int wc = workerCountOf(c); // 获取线程池中线程数 if (wc >= CAPACITY || //判断 线程池中线程数大于等于最大整数 或者大于等于核心池数则return false wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) //cas的方式将ctl中的表示线程数的值加 1 break retry; //cas操作成功后 退出最外层的自旋 c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; } }
4:接下来将任务封装成一个 Worker 对象,绕后开启线程执行
w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //...... workers.add(w); //将worker 对象添加到set中 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); // 开始执行线程的run方法 workerStarted = true; } }
5:下面看看 Worker 类中的run() 方法。
public void run() { runWorker(this); }
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; //..... try { while (task != null || (task = getTask()) != null) { w.lock(); //..... try { //...... beforeExecute(wt, task); Throwable thrown = null; try { // 开始执行task任务 task.run(); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } }
从以上的逻辑可以看出run() 方法首先执行的是构造器传进来的任务 w.firstTask 如果这个任务为空,则从队列里获取任务然后执行 task = getTask()
下面看看 getTask() 的核心实现方式,主要是通过poll的方式从queue中获取任务,是一个生产消费的模式。这里是消费方
try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; }
6:下面看看 线程池中的线程数大于等于核心池数:这里的主要逻辑是:判断线程池的状态是否运行,如果运行则将任务添加到队列中,这里是生产和上面分析的消费相关
if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); }
7:如果添加到队列中失败,这时会将任务添加到线程中,线程池新建一个线程来执行这个任务,如果添加任务失败则拒绝这个任务。
else if (!addWorker(command, false)) reject(command);
到这里为止,任务的添加和执行过程已经分析完成