ThreadPoolExecutor使用方式、工作机理以及参数的详细介绍,请参照《第十二章 ThreadPoolExecutor使用与工作机理 》
1、源代码主要掌握两个部分
- 线程池的创建:构造器
- 提交任务到线程池去执行:execute()
2、构造器
2.1、一些属性:
/** * runState provides the main lifecyle control, taking on values: * * RUNNING -> SHUTDOWN * On invocation of shutdown(), perhaps implicitly in finalize() * (RUNNING or SHUTDOWN) -> STOP * On invocation of shutdownNow() * SHUTDOWN -> TERMINATED * When both queue and pool are empty * STOP -> TERMINATED * When pool is empty */ volatile int runState; static final int RUNNING = 0;//接收新的任务,会处理队列中的任务 static final int SHUTDOWN = 1;//不接收新的任务,但是会处理队列中的任务 static final int STOP = 2;//不接收新的任务,也不会处理队列中的任务,而且还会中断正在执行的任务 static final int TERMINATED = 3;//STOP+中止所有线程 private final BlockingQueue<Runnable> workQueue;//队列 /** * 对poolSize, corePoolSize, maximumPoolSize, runState, and workers set上锁 */ private final ReentrantLock mainLock = new ReentrantLock(); /** * 支持awaitTermination的等待条件 */ private final Condition termination = mainLock.newCondition(); /** * pool中的所有工作线程集合;仅仅在持有mainLock的时候才允许被访问 */ private final HashSet<Worker> workers = new HashSet<Worker>(); private volatile long keepAliveTime; /** * false(默认):当核心线程处于闲置状态时,也会存活 * true:核心线程使用keepAliveTime来决定自己的存活状态 */ private volatile boolean allowCoreThreadTimeOut; /** * Core pool size,仅仅在持有mainLock的时候才允许被更新, * 因为是volatile允许并发读(即使是在更新的过程中) */ private volatile int corePoolSize; /** * Maximum pool size, 其他同上 */ private volatile int maximumPoolSize; /** * Current pool size, 其他同上 */ private volatile int poolSize; /** * 回绝处理器 */ private volatile RejectedExecutionHandler handler; /** * 所有的线程都通过这个线程工厂的addThread方法来创建。 */ private volatile ThreadFactory threadFactory; /** * Tracks largest attained pool size. */ private int largestPoolSize; /** * 已经完成的任务数.仅仅在工作线程被终结的时候这个数字才会被更新 */ private long completedTaskCount; /** * 默认的回绝处理器(回绝任务并抛出异常) */ private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
说明:因为属性不多,这里列出了全部属性。
2.2、构造器:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { /* * 检查参数 */ if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); /* * 初始化值 */ this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime);//转成纳秒 this.threadFactory = threadFactory; this.handler = handler; }
说明:4个构造器(1个5参+2个6参+1个7参)
注意:默认情况下,构造器只会初始化参数,不会提前构建好线程
建议:构造器参数众多,建议使用构建器模式,关于构建器模式的实际使用范例,请参照《第二章 Google guava cache源码解析1--构建缓存器》
构造器中默认线程工厂的创建:Executors中的方法
public static ThreadFactory defaultThreadFactory() { return new DefaultThreadFactory(); } /** * 默认的线程工厂 */ static class DefaultThreadFactory implements ThreadFactory { static final AtomicInteger poolNumber = new AtomicInteger(1);//池数量 final ThreadGroup group;//线程组 final AtomicInteger threadNumber = new AtomicInteger(1);//线程数量 final String namePrefix; /* * 创建默认的线程工厂 */ DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null)? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } /* * 创建一个新的线程 */ public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(),//新线程的名字 0); /* * 将后台线程设置为应用线程 */ if (t.isDaemon()) t.setDaemon(false); /* * 将线程的优先级全部设置为NORM_PRIORITY */ if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }
说明,其中的newThread()方法会在第三部分用到。
3、提交任务的线程池去执行execute(Runnable command)
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /** * 这一块儿就是整个工作机理的部分(代码比较精致) * 1、addIfUnderCorePoolSize * 1)如果当前线程数poolSize<核心线程数corePoolSize并且pool的状态为RUNNING, * 1.1)先获取锁 * 1.2)根据传入的任务firstTask创建一个Work对象,在该对象中编写了run()方法,在该run()方法中会真正的去执行firstTask的run() * 说明:关于Work对象run部分的内容,查看Work内部类的run()方法上边的注释以及与其相关方法的注释 * 1.3)通过线程工厂与上边创建出来的work对象w创建新的线程t,将w加入工作线程集合, * 然后启动线程t,之后就会自动执行w中的run(),w中的run()又会调用firstTask的run(),即处理真正的业务逻辑 * * 2、如果poolSize>=corePoolSize或者上边的执行失败了 * 1)如果pool的状态处于RUNNING,将该任务入队(offer(command)) * 如果入队后,pool的状态不是RUNNING了或者池中的线程数为0了,下边的逻辑具体去查看注释 * 2)addIfUnderMaximumPoolSize(同addIfUnderCorePoolSize) * 如果增加线程也不成功,则回绝任务。 * */ if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { if (runState == RUNNING && workQueue.offer(command)) { if (runState != RUNNING || poolSize == 0) ensureQueuedTaskHandled(command); } else if (!addIfUnderMaximumPoolSize(command)) reject(command); // is shutdown or saturated } }
3.1、addIfUnderCorePoolSize(Runnable firstTask)
/** * 创建并且启动一个新的线程来处理任务 * 1、其第一个任务就是传入的firstTask参数 * 2、该方法仅仅用于当前线程数小于核心线程数并且pool没有被关掉的时候 */ private boolean addIfUnderCorePoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock();//获取锁 try { if (poolSize < corePoolSize && runState == RUNNING) t = addThread(firstTask);//创建新线程 } finally { mainLock.unlock();//释放锁 } return t != null; }
addThread(Runnable firstTask)
private Thread addThread(Runnable firstTask) { Worker w = new Worker(firstTask);//构造一个work Thread t = threadFactory.newThread(w);//创建线程 boolean workerStarted = false; if (t != null) {// if (t.isAlive()) //如果t线程已经启动了,而且还没有死亡 throw new IllegalThreadStateException(); w.thread = t; workers.add(w);//将w工作线程加入workers线程池 int nt = ++poolSize;//当前的池数量+1 if (nt > largestPoolSize) largestPoolSize = nt; try { t.start();//启动线程 workerStarted = true; } finally { if (!workerStarted)//启动线程没有成功 workers.remove(w);//将w从workers集合中删除 } } return t; }
newThread(Runnable r)
该方法在构建上边的默认线程工厂部分已经说过了。
Work内部类:
/** * 工作线程。 */ private final class Worker implements Runnable { /** * 在每一个任务的执行前后都会获取和释放runLock。 * 该锁只要是为了防止中断正在执行任务的work线程 */ private final ReentrantLock runLock = new ReentrantLock(); /** * Initial task to run before entering run loop. * 1、Possibly null. */ private Runnable firstTask; /** * 每个work线程完成的任务总量 * accumulated into completedTaskCount upon termination. */ volatile long completedTasks; Thread thread; /** * 该work中的线程是不是确实正在执行了run() */ volatile boolean hasRun = false; Worker(Runnable firstTask) { this.firstTask = firstTask; } /* * true:已经有线程持有了该锁 */ boolean isActive() { return runLock.isLocked(); } private void runTask(Runnable task) { final ReentrantLock runLock = this.runLock; runLock.lock();//获取锁runLock try { /* * 如果pool状态为STOP或TERMINATED,确保线程被打断; * 如果不是,确保线程不要被打断 */ if ((runState >= STOP || (Thread.interrupted() && runState >= STOP)) && hasRun) thread.interrupt(); /* * 确保afterExecute会被执行仅仅当任务完成了(try)或抛出了异常(catch) */ boolean ran = false; beforeExecute(thread, task);//执行任务的run()方法之前要执行的操作 try { task.run();//执行线程的run()方法 ran = true; afterExecute(task, null);//执行任务的run()方法之后要执行的操作 ++completedTasks; } catch (RuntimeException ex) { if (!ran) afterExecute(task, ex); throw ex; } } finally { runLock.unlock();//释放锁runLock } } /** * Main run loop * 运行当前任务task,运行结束后,尝试获取队列中的其他任务, * 如果最后通过各种方式都获取不到,就回收该线程,如果获取到了,就用该线程继续执行接下来的任务 * 最后,当获取不到任何任务去执行时,就将该线程从works线程集合中删除掉 */ public void run() { try { hasRun = true; Runnable task = firstTask; firstTask = null; while (task != null || (task = getTask()) != null) { runTask(task);//运行该任务 task = null; } } finally { workerDone(this);//将该线程从works集合中删除 } } }
说明:这里列出了该内部类的全部属性和常用方法。
getTask()
/** * 获取下一个worker线程将要运行的任务 * Gets the next task for a worker thread to run. */ Runnable getTask() { for (;;) {//无限循环 try { int state = runState; if (state > SHUTDOWN) return null; Runnable r; if (state == SHUTDOWN) // Help drain queue r = workQueue.poll();//处理queue中的任务 //下面的runState==RUNNING else if (poolSize > corePoolSize || allowCoreThreadTimeOut) //从队头获取任务,如果没有任务,等待keepAliveTime的时间 r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); else //从队头获取任务,如果没有任务,阻塞等待 r = workQueue.take(); if (r != null) return r; if (workerCanExit()) {//允许回收获取任务失败的线程 if (runState >= SHUTDOWN) // Wake up others interruptIdleWorkers();//中断闲置的work线程 return null; } // Else retry } catch (InterruptedException ie) { // On interruption, re-check runState } } }
workerCanExit()
/** * 检测一个获取任务失败的work线程是否可以退出了。 * 出现下面三种情况,work线程就会死亡。 * 1、如果pool的状态为STOP或TERMINATED * 2、队列为空 * 3、允许回收核心线程并且池中的线程数大于1和corePoolSize的最大值 */ private boolean workerCanExit() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); boolean canExit; try { canExit = runState >= STOP || workQueue.isEmpty() || (allowCoreThreadTimeOut && poolSize > Math.max(1, corePoolSize)); } finally { mainLock.unlock(); } return canExit; }
workerDone(Worker w)
void workerDone(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w);//从workers集合中删除该线程 if (--poolSize == 0)//如果池中的线程数为0 tryTerminate(); } finally { mainLock.unlock(); } }
3.2、ensureQueuedTaskHandled(Runnable command)
/** * 在一个task入队之后重新检查state。 * 当一个task入队后,pool的state发生了变化,该方法就会被调用。 * 如果一个task入队的同时,shutdownNow方法发生了调用,该方法就必须从队列中移除并回绝 * 否则该方法会保证至少有一个线程来处理入队的task */ private void ensureQueuedTaskHandled(Runnable command) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); boolean reject = false; Thread t = null; try { int state = runState; if (state != RUNNING && workQueue.remove(command)) reject = true; else if (state < STOP && poolSize < Math.max(corePoolSize, 1) && !workQueue.isEmpty()) t = addThread(null); } finally { mainLock.unlock(); } if (reject) reject(command); }
3.3、addIfUnderMaximumPoolSize(Runnable firstTask)
private boolean addIfUnderMaximumPoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (poolSize < maximumPoolSize && runState == RUNNING) t = addThread(firstTask); } finally { mainLock.unlock(); } return t != null; }
说明:该方法的其他方法与addIfUnderCorePoolSize(Runnable firstTask)一样。
3.4、reject(Runnable command)
void reject(Runnable command) { handler.rejectedExecution(command, this); }
public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } /** 直接抛异常 */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException(); } }
说明:明白了上一章将的线程池机理,按着这个机理去看源代码是非常容易的事情。
总结:
- 上一章的工作机理
- 上一章的参数详细说明