线程

协程
线程池
线程池介绍
在web开发中,服务器需要接受并处理请求,所以会为一个请求来分配一个线程来进行处理。如果每次请求都新创建一个线程的话实现起来非常简便,但是存在一个问题:
* 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
线程的实现方式
Runnable,Thread,Callable
1 // 实现Runnable接口的类将被Thread执行,表示一个基本的任务 2 public interface Runnable { 3 // run方法就是它所有的内容,就是实际执行的任务 4 public abstract void run(); 5 } 6 //Callable同样是任务,与Runnable接口的区别在于它接收泛型,同时它执行任务后带有返回内容 7 public interface Callable<V> { 8 // 相对于run方法的带有返回值的call方法 9 V call() throws Exception; 10 }
Executor框架

线程池重点属性
1 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 2 private static final int COUNT_BITS = Integer.SIZE - 3; 3 private static final int CAPACITY = (1 << COUNT_BITS) - 1; 4 ctl 是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它包含两部分的信息: 线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),
这里可以看到,使用了Integer类型来保存,高3位保存runState,低29位保存workerCount。COUNT_BITS 就是29,CAPACITY就是1左移29位减1(29个1),这个常量表示workerCount的上限值,大约是5亿
ctl相关方法
1 private static int runStateOf(int c) { return c & ~CAPACITY; } 2 private static int workerCountOf(int c) { return c & CAPACITY; } 3 private static int ctlOf(int rs, int wc) { return rs | wc; } 4 runStateOf:获取运行状态; 5 workerCountOf:获取活动线程数; 6 ctlOf:获取运行状态和活动线程数的值。
线程池存在5种状态
RUNNING = -1 << COUNT_BITS; //高3位为111 SHUTDOWN = 0 << COUNT_BITS; //高3位为000 STOP = 1 << COUNT_BITS; //高3位为001 TIDYING = 2 << COUNT_BITS; //高3位为010 TERMINATED = 3 << COUNT_BITS; //高3位为011
* 线程池不是RUNNING状态;
* 线程池状态不是TIDYING状态或TERMINATED状态;
* 如果线程池状态是SHUTDOWN并且workerQueue为空;
* workerCount为0;
* 设置TIDYING状态成功。

线程池的具体实现
ThreadPoolExecutor
线程池的创建
1 public ThreadPoolExecutor(int corePoolSize, 2 int maximumPoolSize, 3 long keepAliveTime, 4 TimeUnit unit, 5 BlockingQueue<Runnable> workQueue, 6 ThreadFactory threadFactory, 7 RejectedExecutionHandler handler)
任务提交
1 1、public void execute() //提交任务无返回值 2 2、public Future<?> submit() //任务执行完成后有返回值
参数解释
* 1、ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;
* 2、LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene;
* 3、SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene;
* 4、priorityBlockingQuene:具有优先级的无界阻塞队列;
* 1、AbortPolicy:直接抛出异常,默认策略;
* 2、CallerRunsPolicy:用调用者所在的线程来执行任务;
* 3、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
* 4、DiscardPolicy:直接丢弃任务;
线程池监控
1 public long getTaskCount() //线程池已执行与未执行的任务总数 2 public long getCompletedTaskCount() //已完成的任务数 3 public int getPoolSize() //线程池当前的线程数 4 public int getActiveCount() //线程池中正在执行任务的线程数量
线程池原理
源码分析
execute方法
1 public void execute(Runnable command) { 2 if (command == null) 3 throw new NullPointerException(); 4 /* 5 * clt记录着runState和workerCount 6 */ 7 int c = ctl.get(); 8 /* 9 * workerCountOf方法取出低29位的值,表示当前活动的线程数; 10 * 如果当前活动线程数小于corePoolSize,则新建一个线程放入线程池中; 11 * 并把任务添加到该线程中。 12 */ 13 if (workerCountOf(c) < corePoolSize) { 14 /* 15 * addWorker中的第二个参数表示限制添加线程的数量是根据corePoolSize来判断还是maximumPoolSize来判断; 16 * 如果为true,根据corePoolSize来判断; 17 * 如果为false,则根据maximumPoolSize来判断 18 */ 19 if (addWorker(command, true)) 20 return; 21 /* 22 * 如果添加失败,则重新获取ctl值 23 */ 24 c = ctl.get(); 25 } 26 /* 27 * 如果当前线程池是运行状态并且任务添加到队列成功 28 */ 29 if (isRunning(c) && workQueue.offer(command)) { 30 // 重新获取ctl值 31 int recheck = ctl.get(); 32 // 再次判断线程池的运行状态,如果不是运行状态,由于之前已经把command添加到workQueue中了, 33 // 这时需要移除该command 34 // 执行过后通过handler使用拒绝策略对该任务进行处理,整个方法返回 35 if (! isRunning(recheck) && remove(command)) 36 reject(command); 37 /* 38 * 获取线程池中的有效线程数,如果数量是0,则执行addWorker方法 39 * 这里传入的参数表示: 40 * 1. 第一个参数为null,表示在线程池中创建一个线程,但不去启动; 41 * 2. 第二个参数为false,将线程池的有限线程数量的上限设置为maximumPoolSize,添加线程时根据maximumPoolSize来判断; 42 * 如果判断workerCount大于0,则直接返回,在workQueue中新增的command会在将来的某个时刻被执行。 43 */ 44 else if (workerCountOf(recheck) == 0) 45 addWorker(null, false); 46 } 47 /* 48 * 如果执行到这里,有两种情况: 49 * 1. 线程池已经不是RUNNING状态; 50 * 2. 线程池是RUNNING状态,但workerCount >= corePoolSize并且workQueue已满。 51 * 这时,再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize; 52 * 如果失败则拒绝该任务 53 */ 54 else if (!addWorker(command, false)) 55 reject(command); 56 }
1.如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务;
2.如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中;
3.如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务;
4.如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。

1 private boolean addWorker(Runnable firstTask, boolean core) { 2 retry: 3 for (;;) { 4 int c = ctl.get(); 5 // 获取运行状态 6 int rs = runStateOf(c); 7 /* 8 * 这个if判断 9 * 如果rs >= SHUTDOWN,则表示此时不再接收新任务; 10 * 接着判断以下3个条件,只要有1个不满足,则返回false: 11 * 1. rs == SHUTDOWN,这时表示关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务 12 * 2. firsTask为空 13 * 3. 阻塞队列不为空 14 * 15 * 首先考虑rs == SHUTDOWN的情况 16 * 这种情况下不会接受新提交的任务,所以在firstTask不为空的时候会返回false; 17 * 然后,如果firstTask为空,并且workQueue也为空,则返回false, 18 * 因为队列中已经没有任务了,不需要再添加线程了 19 */ 20 // Check if queue empty only if necessary. 21 if (rs >= SHUTDOWN && 22 ! (rs == SHUTDOWN && 23 firstTask == null && 24 ! workQueue.isEmpty())) 25 return false; 26 for (;;) { 27 // 获取线程数 28 int wc = workerCountOf(c); 29 // 如果wc超过CAPACITY,也就是ctl的低29位的最大值(二进制是29个1),返回false; 30 // 这里的core是addWorker方法的第二个参数,如果为true表示根据corePoolSize来比较, 31 // 如果为false则根据maximumPoolSize来比较。 32 // 33 if (wc >= CAPACITY || 34 wc >= (core ? corePoolSize : maximumPoolSize)) 35 return false; 36 // 尝试增加workerCount,如果成功,则跳出第一个for循环 37 if (compareAndIncrementWorkerCount(c)) 38 break retry; 39 // 如果增加workerCount失败,则重新获取ctl的值 40 c = ctl.get(); // Re-read ctl 41 // 如果当前的运行状态不等于rs,说明状态已被改变,返回第一个for循环继续执行 42 if (runStateOf(c) != rs) 43 continue retry; 44 // else CAS failed due to workerCount change; retry inner loop 45 } 46 } 47 boolean workerStarted = false; 48 boolean workerAdded = false; 49 Worker w = null; 50 try { 51 // 根据firstTask来创建Worker对象 52 w = new Worker(firstTask); 53 // 每一个Worker对象都会创建一个线程 54 final Thread t = w.thread; 55 if (t != null) { 56 final ReentrantLock mainLock = this.mainLock; 57 mainLock.lock(); 58 try { 59 int rs = runStateOf(ctl.get()); 60 // rs < SHUTDOWN表示是RUNNING状态; 61 // 如果rs是RUNNING状态或者rs是SHUTDOWN状态并且firstTask为null,向线程池中添加线程。 62 // 因为在SHUTDOWN时不会在添加新的任务,但还是会执行workQueue中的任务 63 if (rs < SHUTDOWN || 64 (rs == SHUTDOWN && firstTask == null)) { 65 if (t.isAlive()) // precheck that t is startable 66 throw new IllegalThreadStateException(); 67 // workers是一个HashSet 68 workers.add(w); 69 int s = workers.size(); 70 // largestPoolSize记录着线程池中出现过的最大线程数量 71 if (s > largestPoolSize) 72 largestPoolSize = s; 73 workerAdded = true; 74 } 75 } finally { 76 mainLock.unlock(); 77 } 78 if (workerAdded) { 79 // 启动线程 80 t.start(); 81 workerStarted = true; 82 } 83 } 84 } finally { 85 if (! workerStarted) 86 addWorkerFailed(w); 87 } 88 return workerStarted; 89 }
1.lock方法一旦获取了独占锁,表示当前线程正在执行任务中;
2.如果正在执行任务,则不应该中断线程;
3.如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断;
4.线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;
5.之所以设置为不可重入,是因为我们不希望任务在调用像setCorePoolSize这样的线程池控制方法时重新获取锁。如果使用ReentrantLock,它是可重入的,这样如果在任务中调用了如setCorePoolSize这类线程池控制的方法,会中断正在运行的线程。
1 protected boolean tryAcquire(int unused) { 2 //cas修改state,不可重入 3 if (compareAndSetState(0, 1)) { 4 setExclusiveOwnerThread(Thread.currentThread()); 5 return true; 6 } 7 return false; 8 }
1 final void runWorker(Worker w) { 2 Thread wt = Thread.currentThread(); 3 // 获取第一个任务 4 Runnable task = w.firstTask; 5 w.firstTask = null; 6 // 允许中断 7 w.unlock(); // allow interrupts 8 // 是否因为异常退出循环 9 boolean completedAbruptly = true; 10 try { 11 // 如果task为空,则通过getTask来获取任务 12 while (task != null || (task = getTask()) != null) { 13 w.lock(); 14 if ((runStateAtLeast(ctl.get(), STOP) || 15 (Thread.interrupted() && 16 runStateAtLeast(ctl.get(), STOP))) && 17 !wt.isInterrupted()) 18 wt.interrupt(); 19 try { 20 beforeExecute(wt, task); 21 Throwable thrown = null; 22 try { 23 task.run(); 24 } catch (RuntimeException x) { 25 thrown = x; throw x; 26 } catch (Error x) { 27 thrown = x; throw x; 28 } catch (Throwable x) { 29 thrown = x; throw new Error(x); 30 } finally { 31 afterExecute(task, thrown); 32 } 33 } finally { 34 task = null; 35 w.completedTasks++; 36 w.unlock(); 37 } 38 } 39 completedAbruptly = false; 40 } finally { 41 processWorkerExit(w, completedAbruptly); 42 } 43 }
* 如果线程池正在停止,那么要保证当前线程是中断状态;
* 如果不是的话,则要保证当前线程不是中断状态;
1.while循环不断地通过getTask()方法获取任务;
2.getTask()方法从阻塞队列中取任务;
3.如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态;
4.调用task.run()执行任务;
5.如果task为null则跳出循环,执行processWorkerExit()方法;
6.runWorker方法执行完毕,也代表着Worker中的run方法执行完毕,销毁线程。
1 private Runnable getTask() { 2 // timeOut变量的值表示上次从阻塞队列中取任务时是否超时 3 boolean timedOut = false; // Did the last poll() time out? 4 for (;;) { 5 int c = ctl.get(); 6 int rs = runStateOf(c); 7 // Check if queue empty only if necessary. 8 /* 9 * 如果线程池状态rs >= SHUTDOWN,也就是非RUNNING状态,再进行以下判断: 10 * 1. rs >= STOP,线程池是否正在stop; 11 * 2. 阻塞队列是否为空。 12 * 如果以上条件满足,则将workerCount减1并返回null。 13 * 因为如果当前线程池状态的值是SHUTDOWN或以上时,不允许再向阻塞队列中添加任务。 14 */ 15 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { 16 decrementWorkerCount(); 17 return null; 18 } 19 int wc = workerCountOf(c); 20 // Are workers subject to culling? 21 // timed变量用于判断是否需要进行超时控制。 22 // allowCoreThreadTimeOut默认是false,也就是核心线程不允许进行超时; 23 // wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量; 24 // 对于超过核心线程数量的这些线程,需要进行超时控制 25 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; 26 27 /* 28 * wc > maximumPoolSize的情况是因为可能在此方法执行阶段同时执行了setMaximumPoolSize方法; 29 * timed && timedOut 如果为true,表示当前操作需要进行超时控制,并且上次从阻塞队列中获取任务发生了超时 30 * 接下来判断,如果有效线程数量大于1,或者阻塞队列是空的,那么尝试将workerCount减1; 31 * 如果减1失败,则返回重试。 32 * 如果wc == 1时,也就说明当前线程是线程池中唯一的一个线程了。 33 */ 34 if ((wc > maximumPoolSize || (timed && timedOut)) 35 && (wc > 1 || workQueue.isEmpty())) { 36 if (compareAndDecrementWorkerCount(c)) 37 return null; 38 continue; 39 } 40 try { 41 /* 42 * 根据timed来判断,如果为true,则通过阻塞队列的poll方法进行超时控制,如果在keepAliveTime时间内没有获取到任务,则返回null; 43 * 否则通过take方法,如果这时队列为空,则take方法会阻塞直到队列不为空。 44 * 45 */ 46 Runnable r = timed ? 47 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : 48 workQueue.take(); 49 if (r != null) 50 return r; 51 // 如果 r == null,说明已经超时,timedOut设置为true 52 timedOut = true; 53 } catch (InterruptedException retry) { 54 // 如果获取任务时当前线程发生了中断,则设置timedOut为false并返回循环重试 55 timedOut = false; 56 } 57 } 58 }
1 private void processWorkerExit(Worker w, boolean completedAbruptly) { 2 // 如果completedAbruptly值为true,则说明线程执行时出现了异常,需要将workerCount减1; 3 // 如果线程执行时没有出现异常,说明在getTask()方法中已经已经对workerCount进行了减1操作,这里就不必再减了。 4 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted 5 decrementWorkerCount(); 6 final ReentrantLock mainLock = this.mainLock; 7 mainLock.lock(); 8 try { 9 //统计完成的任务数 10 completedTaskCount += w.completedTasks; 11 // 从workers中移除,也就表示着从线程池中移除了一个工作线程 12 workers.remove(w); 13 } finally { 14 mainLock.unlock(); 15 } 16 // 根据线程池状态进行判断是否结束线程池 17 tryTerminate(); 18 int c = ctl.get(); 19 /* 20 * 当线程池是RUNNING或SHUTDOWN状态时,如果worker是异常结束,那么会直接addWorker; 21 * 如果allowCoreThreadTimeOut=true,并且等待队列有任务,至少保留一个worker; 22 * 如果allowCoreThreadTimeOut=false,workerCount不少于corePoolSize。 23 */ 24 if (runStateLessThan(c, STOP)) { 25 if (!completedAbruptly) { 26 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; 27 if (min == 0 && ! workQueue.isEmpty()) 28 min = 1; 29 if (workerCountOf(c) >= min) 30 return; // replacement not needed 31 } 32 addWorker(null, false); 33 } 34 }

总结:
1.分析了线程的创建,任务的提交,状态的转换以及线程池的关闭;
2.这里通过execute方法来展开线程池的工作流程,execute方法通过corePoolSize,maximumPoolSize以及阻塞队列的大小来判断决定传入的任务应该被立即执行,还是应该添加到阻塞队列中,还是应该拒绝任务。
3.介绍了线程池关闭时的过程,也分析了shutdown方法与getTask方法存在竞态条件;
4.在获取任务时,要通过线程池的状态来判断应该结束工作线程还是阻塞线程等待新的任务,也解释了为什么关闭线程池时要中断工作线程以及为什么每一个worker都需要lock。