环境
jdk version:jdk1.8.0_171
线程池的接口、类关系图:
线程池的引入,主要解决以下问题:
减少系统因为频繁创建和销毁线程所带来的开销;
自动管理线程,对使用方透明,使其可以专注于任务的构建。
一、ThreadPoolExecutor
1、构造线程池
/** * 使用给定的参数创建ThreadPoolExecutor. * * @param corePoolSize 核心线程池中的最大线程数 * @param maximumPoolSize 总线程池中的最大线程数 * @param keepAliveTime 空闲线程的存活时间 * @param unit keepAliveTime的单位 * @param workQueue 任务队列, 保存已经提交但尚未被执行的线程 * @param threadFactory 线程工厂(用于指定如果创建一个线程) * @param handler 拒绝策略 (当任务太多导致工作队列满时的处理策略) */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); // 使用纳秒保存存活时间 this.threadFactory = threadFactory; this.handler = handler; }
关于ThreadPoolExecutor这个线程池,最重要的是根据系统实际情况,合理进行线程池参数的设置以及阻塞队列的选择。
现实情况下,一般会自己通过ThreadPoolExecutor的构造器去构建线程池,而非直接使用Executors工厂创建,因为这样更利于对参数的控制和调优。
另外,根据任务的特点,要有选择的配置核心线程池的大小:
如果任务是 CPU 密集型(需要进行大量计算、处理),则应该配置尽量少的线程,比如 CPU 个数 + 1,这样可以避免出现每个线程都需要使用很长时间但是有太多线程争抢资源的情况;
如果任务是 IO密集型(主要时间都在 I/O,CPU 空闲时间比较多),则应该配置多一些线程,比如 CPU 数的两倍,这样可以更高地压榨 CPU。
1.1 工作线程集合
private final HashSet<Worker> workers = new HashSet<Worker>();//存放工作线程
工作线程-Worker
ThreadPoolExecutor中只有一种类型的线程,名叫Worker,它是ThreadPoolExecutor定义的内部类,
同时封装着Runnable任务和执行该任务的Thread对象,我们称它为【工作线程】,它也是ThreadPoolExecutor唯一需要进行维护的线程;
/** * Worker表示线程池中的一个工作线程, 可以与任务相关联. * 由于实现了AQS框架, 其同步状态值的定义如下: * -1: 初始状态 * 0: 无锁状态 * 1: 加锁状态 */ private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * 与该Worker关联的线程. */ final Thread thread; /** * Initial task to run. Possibly null. */ Runnable firstTask; /** * Per-thread task counter */ volatile long completedTasks; Worker(Runnable firstTask) { setState(-1); // 初始的同步状态值 this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this);//真正创建线程的地方 } /** * 执行任务 */ public void run() { runWorker(this); } /** * 是否加锁 */ protected boolean isHeldExclusively() { return getState() != 0; } /** * 尝试获取锁 */ protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } /** * 尝试释放锁 */ protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } /** * 中断线程(仅任务非初始状态) */ void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
1.2 任务队列
private final BlockingQueue<Runnable> workQueue;//存放任务
1.3 核心线程数
private volatile int corePoolSize;
1.4 最大线程数
private volatile int maximumPoolSize;
1.5 拒绝策略
private volatile RejectedExecutionHandler handler;
(1)拒绝的时机
workers.size() > maximumPoolSize && workQueue已满,拒绝新来的任务;
线程池已关闭,对新提交的任务拒绝;
(2)4种拒绝策略
AbortPolicy(默认)
AbortPolicy策略其实就是抛出一个RejectedExecutionException异常:
public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
DiscardPolicy
DiscardPolicy策略其实就是无为而治,什么都不做,等任务自己被回收:
public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
DiscardOldestPolicy
DiscardOldestPolicy策略是丢弃任务队列中的最近一个任务,并执行当前任务:
public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { // 线程池未关闭(RUNNING) e.getQueue().poll(); // 丢弃任务队列中的最近任务 e.execute(r); // 执行当前任务 } } }
CallerRunsPolicy
CallerRunsPolicy策略相当于以自身线程来执行任务,这样可以减缓新任务提交的速度。
public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { // 线程池未关闭(RUNNING) r.run(); // 执行当前任务 } } }
2、线程池状态
/** * 保存线程池状态和工作线程数: * 低29位: 工作线程数 * 高3位 : 线程池状态 */ private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3;//29 // 最大线程数: 2^29-1 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 00011111 11111111 11111111 11111111 // 线程池状态 private static final int RUNNING = -1 << COUNT_BITS; // 11100000 00000000 00000000 00000000 private static final int SHUTDOWN = 0 << COUNT_BITS; // 00000000 00000000 00000000 00000000 private static final int STOP = 1 << COUNT_BITS; // 00100000 00000000 00000000 00000000 private static final int TIDYING = 2 << COUNT_BITS; // 01000000 00000000 00000000 00000000 private static final int TERMINATED = 3 << COUNT_BITS; // 01100000 00000000 00000000 00000000
2.1 ThreadPoolExecutor定义了5种线程池状态:
RUNNING : 接受新任务, 且处理已经进入阻塞队列的任务
SHUTDOWN : 不接受新任务, 但处理已经进入阻塞队列的任务
STOP : 不接受新任务, 且不处理已经进入阻塞队列的任务, 同时中断正在运行的任务
TIDYING : 所有任务都已终止, 工作线程数为0, 线程转化为TIDYING状态并准备调用terminated方法
TERMINATED : terminated方法已经执行完成
2.2 状态流转图示
3、线程池的调度流程
示例:
package test; import java.util.concurrent.*; public class PoolTest { public static void main(String[] args){ //创建1个工作线程的线程池 ExecutorService pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1)); //提交第一个任务 验证addWorker pool.submit(new Runnable() { @Override public void run() { try { Thread.sleep(60*60*1000);//保证任务长期占用 方便debug } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "-任务1"); } }); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } //提交第二个任务 验证入队workQueue workQueue.offer(command) pool.submit(new Runnable() { @Override public void run() { try { Thread.sleep(60*60*1000);//保证任务长期占用 方便debug } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "-任务2"); } }); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } //提交第三个任务 验证拒绝策略 //Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@77f03bb1 rejected from java.util.concurrent.ThreadPoolExecutor@27abe2cd[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 0] // at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063) // at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830) // at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379) // at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112) // at test.PoolTest.main(PoolTest.java:54) pool.submit(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName() + "-任务3"); } }); //验证shutdown pool.shutdown(); //pool.shutdownNow(); } }
3.1 submit用来提交任务,内部调用execute方法
ThreadPoolExecutor使用父类AbstractExecutorService的subimit方法,subimit调用子类ThreadPoolExecutor的execute方法:
public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; }
3.2 execute用来执行任务
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { // CASE1: 工作线程数 < 核心线程池上限 if (addWorker(command, true)) // 添加工作线程并执行 return; c = ctl.get(); } // 执行到此处, 说明工作线程创建失败 或 工作线程数≥核心线程池上限 if (isRunning(c) && workQueue.offer(command)) { // CASE2: 插入任务至队列 // 再次检查线程池状态 int recheck = ctl.get(); if (!isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) // CASE3: 插入队列失败, 判断工作线程数 < 总线程池上限 reject(command); // 执行拒绝策略 }
execute执行流程图:
execute执行小结:
(1)workers.size()<CorePoolSize,则直接新建一个工作线程并执行任务;
(2)CorePoolSize ≤ workers.size() < maximumPoolSize && workQueue未满,则尝试将任务加入到workQueue等待以后执行;
(3)CorePoolSize ≤ workers.size() < maximumPoolSize && workQueue已满,建一个工作线程立即执行任务;
(4)其他情况,执行拒绝策略;
5、线程调度过程分析
5.1 工作线程创建-addWorker
/** * 添加工作线程并执行任务,主要由两部分组成: * 第一部分是一个自旋操作,主要是对线程池的状态进行一些判断,如果状态不适合接受新任务,或者工作线程数超出了限制,则直接返回false。 * 第二部分,经过第一部分的过滤,才真正去创建工作线程并执行任务:首先将Runnable任务包装成一个Worker对象,然后加入到一个工作线程集合中(名为workers的HashSet),最后调用工作线程中的Thread对象的start方法执行任务,其实最终是委托到Worker的run方法执行 * @param firstTask 如果指定了该参数, 表示将立即创建一个新工作线程执行该firstTask任务; 否则复用已有的工作线程,从工作队列中获取任务并执行 * @param core 执行任务的工作线程归属于哪个线程池: true-核心线程池 false-非核心线程池 * core为true时表示新建的工作线程在逻辑上归属于核心线程池,所以需要判断条件 工作线程数 < corePoolSize 是否满足 * core为false时表示在新增的工作线程逻辑上属于非核心线程池,所以需要判断条件 工作线程数 < maximumPoolSize是否满足 */ private boolean addWorker(Runnable firstTask, boolean core) { //第一部分 retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c);// 获取线程池状态 /** * 这个if主要是判断哪些情况下, 线程池不再接受新任务执行, 而是直接返回.总结下, 有以下几种情况: * 1. 线程池状态为 STOP 或 TIDYING 或 TERMINATED: 线程池状态为上述任一一种时, 都不会再接受任务,所以直接返回 * 2. 线程池状态≥ SHUTDOWN 且 firstTask != null: 因为当线程池状态≥ SHUTDOWN时, 不再接受新任务的提交,所以直接返回 * 3. 线程池状态≥ SHUTDOWN 且 队列为空: 队列中已经没有任务了, 所以也就不需要执行任何任务了,可以直接返回 */ // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c);// 获取工作线程数 /** * 这个if主要是判断工作线程数是否超限, 以下任一情况属于属于超限, 直接返回: * 1. 工作线程数超过最大工作线程数(2^29-1) * 2. 工作线程数超过核心线程池上限(入参core为true, 表示归属核心线程池) * 3. 工作线程数超过总线程池上限(入参core为false, 表示归属非核心线程池) */ if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c))// 工作线程数加1 break retry;// 跳出最外层循环 c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs)// 线程池状态发生变化, 重新自旋判断 continue retry; // else CAS failed due to workerCount change; retry inner loop } } //第二部分 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask);// 将任务包装成工作线程 final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 重新检查线程池状态 // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); // 加入工作线程集合 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start();//如果新增工作线程成功 就启动线程 workerStarted = true; } } } finally { if (! workerStarted)// 创建/启动工作线程失败, 需要执行回滚操作 addWorkerFailed(w); } return workerStarted; }
5.2 工作线程执行-runWorker
/** * 1.while循环不断地通过getTask()方法从队列中获取任务(如果工作线程自身携带着任务,则执行携带的任务); * 2.控制执行线程的中断状态,保证如果线程池正在停止,则线程必须是中断状态,否则线程必须不是中断状态; * 3.调用task.run()执行任务; * 4.处理工作线程的退出工作。 */ final void runWorker(Worker w) { Thread wt = Thread.currentThread();// 执行任务的线程 Runnable task = w.firstTask; // 任务, 如果是null则从队列取任务 w.firstTask = null; w.unlock(); // allow interrupts// 允许执行线程被中断 //正常情况下,工作线程会存活着,不断从任务队列获取任务执行,如果获取不到任务了(getTask返回null),会置completedAbruptly 为false,然后执行清理工作——processWorkerExit(worker,false); //异常情况下,工作线程在执行过程中被中断或出现其它异常,会置completedAbruptly 为true,也会执行清理工作——processWorkerExit(worker,true); boolean completedAbruptly = true;// 表示是否因为中断而导致退出 try { while (task != null || (task = getTask()) != null) {// 当task==null时会通过getTask从队列取任务 w.lock(); /** * 下面这个if判断的作用如下: * 1.保证当线程池状态为STOP/TIDYING/TERMINATED时,当前执行任务的线程wt是中断状态(因为线程池处于上述任一状态时,均不能再执行新任务) * 2.保证当线程池状态为RUNNING/SHUTDOWN时,当前执行任务的线程wt不是中断状态 */ // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task);// 钩子方法,由子类自定义实现 Throwable thrown = null; try { task.run();// 执行任务 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); // 钩子方法,由子类自定义实现 } } finally { task = null; w.completedTasks++;// 完成任务数+1 w.unlock(); } } // 执行到此处, 说明该工作线程自身既没有携带任务, 也没从任务队列中获取到任务 completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly);// 处理工作线程的退出工作 } }
5.3 获取任务-getTask
//通过自旋,不断地尝试从阻塞队列中获取一个任务,如果获取失败则返回null。 private Runnable getTask() { boolean timedOut = false; // 表示上次从阻塞队列中取任务时是否超时 for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 获取线程池状态 /** * 以下IF用于判断哪些情况下不允许再从队列获取任务: * 1. 线程池进入停止状态(STOP/TIDYING/TERMINATED), 此时即使队列中还有任务未执行, 也不再执行 * 2. 线程池非RUNNING状态, 且队列为空 */ if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount();// 工作线程数减1 return null; } int wc = workerCountOf(c);// 获取工作线程数 // Are workers subject to culling? /** * timed变量用于判断是否需要进行超时控制: * 对于核心线程池中的工作线程, 除非设置了allowCoreThreadTimeOut==true, 否则不会超时回收; * 对于非核心线程池中的工作线程, 都需要超时控制 */ boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 这里主要是当外部通过setMaximumPoolSize方法重新设置了最大线程数时,需要回收多出的工作线程 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true;// 超时仍未获取到任务 } catch (InterruptedException retry) { timedOut = false; } } }
任务在阻塞队列中排队一共有三种情况:
1.直接提交 即直接将任务提交给等待的工作线程,这时可以选择SynchronousQueue。因为SynchronousQueue是没有容量的,而且采用了无锁算法,所以性能较好,但是每个入队操作都要等待一个出队操作,反之亦然。 使用SynchronousQueue时,当核心线程池满了以后,如果不存在空闲的工作线程,则试图把任务加入队列将立即失败(execute方法中使用了队列的offer方法进行入队操作,而SynchronousQueue在调用offer时如果没有另一个线程等待出队操作,则会立即返回false),因此会构造一个新的工作线程(未超出最大线程池容量时)。 由于,核心线程池是很容易满的,所以当使用SynchronousQueue时,一般需要将maximumPoolSizes 设置得比较大,否则入队很容易失败,最终导致执行拒绝策略,这也是为什么Executors工作默认提供的缓存线程池使用SynchronousQueue作为任务队列的原因。 2.无界任务队列 无界任务队列我们的选择主要有LinkedTransferQueue、LinkedBlockingQueue(近似无界,构造时不指定容量即可),从性能角度来说LinkedTransferQueue采用了无锁算法,高并发环境下性能相对更好,但如果只是做任务队列使用相差并不大。 使用无界队列需要特别注意系统资源的消耗情况,因为当核心线程池满了以后,会首先尝试将任务放入队列,由于是无界队列所以几乎一定会成功,那么系统瓶颈其实就是硬件了。如果任务的创建速度远快于工作线程处理任务的速度,那么最终会导致系统资源耗尽。Executors工厂中创建固定线程池的方法内部就是用了LinkedBlockingQueue。
3.有界任务队列 有界任务队列,比如ArrayBlockingQueue ,可以防止资源耗尽的情况。当核心线程池满了以后,如果队列也满了,则会创建归属于非核心线程池的工作线程,如果非核心线程池也满了 ,才会执行拒绝策略。
5.4 工作线程清理-processWorkerExit
//将该退出的工作线程清理掉,然后看下线程池是否需要终止 private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // 工作线程因异常情况而退出 decrementWorkerCount();//工作线程数减1(如果工作线程执行时没有出现异常, 在getTask()方法中已经对线程数减1了) final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks;// completedTaskCount记录线程池完成的总任务数 workers.remove(w);// 从工作线程集合中移除(该工作线程会自动被GC回收) } finally { mainLock.unlock(); } tryTerminate();// 根据线程池状态, 判断是否需要终止线程池 int c = ctl.get(); if (runStateLessThan(c, STOP)) {// 如果线程池状态为RUNNING/SHUTDOWN if (!completedAbruptly) {// 工作线程为正常退出 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false);// 新建一个工作线程 } }
小结:
工作线程生命周期
6、线程池的关闭
6.1 shutdown
/** * shutdown方法将线程池切换到SHUTDOWN状态(如果已经停止,则不用切换), * 并调用interruptIdleWorkers方法中断所有空闲的工作线程, * 最后调用tryTerminate尝试结束线程池 * 这里要注意,如果执行Runnable任务的线程本身不响应中断,那么也就没有办法终止任务。 */ public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN);// 如果线程池为RUNNING状态, 则切换为SHUTDOWN状态 interruptIdleWorkers();// 中断所有空闲线程 onShutdown(); //钩子方法, 由子类实现 } finally { mainLock.unlock(); } tryTerminate(); }
6.2 shutdownNow
/** * 它会将线程池的状态至少置为STOP, * 同时中断所有工作线程(无论该线程是空闲还是运行中), * 同时返回任务队列中的所有任务 */ public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP);// 如果线程池为RUNNING或SHUTDOWN状态, 则切换为STOP状态 interruptWorkers();// 中断所有工作线程 tasks = drainQueue();// 抽空任务队列中的所有任务 } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
二、ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor继承普通线程池ThreadPoolExecutor,实现ScheduledExecutorService接口,大部分细节就是ThreadPoolExecutor实现的,区别:
(1)ScheduledExecutorService在普通执行器接口(ExecutorService)的基础上引入了Future模式,使得可以限时或周期性地调度任务。
(2)ScheduledThreadPoolExecutor中的阻塞任务队列是DelayedWorkQueue,其任务元素是实现RunnableScheduledFuture接口,RunnableScheduledFuture是Future模式中的一个接口,作用就是可以异步地执行【延时/周期任务】。
1、线程池构造
使用的是ThreadPoolExecutor的构造器,只不过阻塞队列是DelayedWorkQueue
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); } public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory); } public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), handler); } public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler); }
注意:maximumPoolSize这个参数对ScheduledThreadPoolExecutor其实并没有作用,除非把corePoolSize设置为0,这种情况下ScheduledThreadPoolExecutor只会创建一个属于非核心线程池的工作线程;否则,ScheduledThreadPoolExecutor只会新建归属于核心线程池的工作线程,一旦核心线程池满了,就不再新建工作线程。工作线程worker、工作线程集合workers还是用的ThreadPoolExecutor。
2、ScheduledFutureTask
是RunnableScheduledFuture接口的实现类,任务通过period字段来表示任务类型(0: 非周期任务, >0: fixed-rate任务,<0: fixed-delay任务)
ScheduledFutureTask(Runnable r, V result, long ns) { super(r, result); this.time = ns;//首次执行任务的时间点 this.period = 0;//任务类型 this.sequenceNumber = sequencer.getAndIncrement(); } ScheduledFutureTask(Runnable r, V result, long ns, long period) { super(r, result); this.time = ns; this.period = period; this.sequenceNumber = sequencer.getAndIncrement(); } ScheduledFutureTask(Callable<V> callable, long ns) { super(callable); this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement(); }
3、延时队列
DelayedWorkQueue保存的元素就是ScheduledFutureTask。DelayedWorkQueue是一种堆结构,time最小的任务会排在堆顶(表示最早过期),每次出队都是取堆顶元素,这样最快到期的任务就会被先执行。如果两个ScheduledFutureTask的time相同,就比较它们的序号——sequenceNumber,序号小的代表先被提交,所以就会先执行。
3.1 add入队,调用offer
public boolean offer(Runnable x) { if (x == null) throw new NullPointerException(); RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x; final ReentrantLock lock = this.lock; lock.lock(); try { int i = size; if (i >= queue.length) grow(); size = i + 1; if (i == 0) { queue[0] = e; setIndex(e, 0); } else { siftUp(i, e); } if (queue[0] == e) { leader = null; available.signal(); } } finally { lock.unlock(); } return true; }
3.2 take出队
public RunnableScheduledFuture<?> take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { RunnableScheduledFuture<?> first = queue[0]; if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return finishPoll(first); first = null; // don't retain ref while waiting if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } }
4、线程池的调度
ScheduledThreadPoolExecutor的核心调度方法有4个:
/** * 延迟delay时间 后 执行 Runnable任务 * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit))); delayedExecute(t); return t; } /** * 延迟delay时间 后 执行 Callable任务 * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { if (callable == null || unit == null) throw new NullPointerException(); //decorateTask方法把Runnable任务包装成ScheduledFutureTask,用户可以根据自己的需要覆写该方法 RunnableScheduledFuture<V> t = decorateTask(callable, new ScheduledFutureTask<V>(callable, triggerTime(delay, unit))); delayedExecute(t); return t; } /** * 延迟 initialDelay时间后开始每隔 period时间周期执行。 * 如果中间任务遇到异常,则禁止后续执行。 * 固定的频率来执行某项任务,它不受任务执行时间的影响,到时间,就执行。 * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} * @throws IllegalArgumentException {@inheritDoc} */ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; } /** * 延迟 initialDelay时间 后,每个任务结束后再延迟 delay时间 后再执行下个任务。 * 如果中间任务遇到异常,则禁止后续执行。 * 受任务执行时间的影响,等待任务执行结束后才开始计算延迟。 * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} * @throws IllegalArgumentException {@inheritDoc} */ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (delay <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; }
测试示例:
package test; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.*; public class PoolTest { public static void main(String[] args){ //创建1个工作线程的线程池 ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1); /** * 测试 schedule --> delayedExecute --> ensurePrestart * 首先入队 add(task) 然后addWorker * 创建并执行在给定延迟后启用的一次性操作。 */ pool.schedule(new Runnable() { public void run() { System.out.println("The thread can only run once!"); } }, 60*60*1000, TimeUnit.MILLISECONDS); /** * 测试 scheduleAtFixedRate * 每隔一段时间打印系统时间,互不影响的 * 创建并执行一个在给定初始延迟后首次启用的定期操作,后续操作具有给定的周期; * 也就是将在 initialDelay 后开始执行,然后在initialDelay+period 后执行, * 接着在 initialDelay + 2 * period 后执行,依此类推。 */ pool.scheduleAtFixedRate(new Runnable() { public void run() { System.out.println(new SimpleDateFormat().format(new Date())); } }, 1000, 5000, TimeUnit.MILLISECONDS); /** * 测试scheduleWithFixedDelay * 创建并执行一个在给定初始延迟后首次启用的定期操作, * 随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟。 */ pool.scheduleWithFixedDelay(new Runnable() { public void run() { System.out.println("scheduleWithFixedDelay:begin," + new SimpleDateFormat().format(new Date())); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("scheduleWithFixedDelay:end," + new SimpleDateFormat().format(new Date())); } }, 1000, 5000, TimeUnit.MILLISECONDS); } }
参考: