1、线程池的好处
- 降低资源消耗(重复利用已创建的线程减少创建和销毁线程的开销)
- 提高响应速度(无须创建线程)
- 提高线程的可管理性
2、相关类图
JDK5以后将工作单元和执行机制分离开来,工作单元包括Runnable和Callable;执行机制由Executor框架提供,管理线程的生命周期,将任务的提交和如何执行进行解耦。Executors是一个快速得到线程池的工具类,相关的类图如下所示:
3、Executor框架接口
Executor接口
Executor接口只有一个execute方法,用来替代通常创建或启动线程的方法。
public interface Executor { void execute(Runnable command); }
ExecutorService接口
ExecutorService接口继承自Executor接口,加入了关闭方法、submit方法和对Callable、Future的支持。
ScheduledExecutorService接口
ScheduledExecutorService扩展ExecutorService接口并加入了对定时任务的支持。
4、ThreadPoolExecutor分析
ThreadPoolExecutor继承自AbstractExecutorService,也是实现了ExecutorService接口。
4.1 内部状态
1 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 2 private static final int COUNT_BITS = Integer.SIZE - 3; 3 private static final int CAPACITY = (1 << COUNT_BITS) - 1; 4 5 // runState is stored in the high-order bits 6 private static final int RUNNING = -1 << COUNT_BITS; 7 private static final int SHUTDOWN = 0 << COUNT_BITS; 8 private static final int STOP = 1 << COUNT_BITS; 9 private static final int TIDYING = 2 << COUNT_BITS; 10 private static final int TERMINATED = 3 << COUNT_BITS; 11 12 // Packing and unpacking ctl 13 private static int runStateOf(int c) { return c & ~CAPACITY; } 14 private static int workerCountOf(int c) { return c & CAPACITY; } 15 private static int ctlOf(int rs, int wc) { return rs | wc; }
ctl是对线程池的运行状态(高3位)和线程池中有效线程的数量(低29位)进行控制的一个字段。线程池有五种状态,分别是:
- RUNNING:-1 << COUNT_BITS,即高3位为111,该状态的线程池会接收新任务,并处理阻塞队列中的任务;
- SHUTDOWN: 0 << COUNT_BITS,即高3位为000,该状态的线程池不会接收新任务,但会处理阻塞队列中的任务;
- STOP : 1 << COUNT_BITS,即高3位为001,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务;
- TIDYING : 2 << COUNT_BITS,即高3位为010, 所有的任务都已经终止;
- TERMINATED: 3 << COUNT_BITS,即高3位为011, terminated()方法已经执行完成。
4.2 构造方法
构造方法有4个,这里只列出其中最基础的一个。
1 public ThreadPoolExecutor(int corePoolSize, 2 int maximumPoolSize, 3 long keepAliveTime, 4 TimeUnit unit, 5 BlockingQueue<Runnable> workQueue, 6 ThreadFactory threadFactory, 7 RejectedExecutionHandler handler) { 8 if (corePoolSize < 0 || 9 maximumPoolSize <= 0 || 10 maximumPoolSize < corePoolSize || 11 keepAliveTime < 0) 12 throw new IllegalArgumentException(); 13 if (workQueue == null || threadFactory == null || handler == null) 14 throw new NullPointerException(); 15 this.corePoolSize = corePoolSize; 16 this.maximumPoolSize = maximumPoolSize; 17 this.workQueue = workQueue; 18 this.keepAliveTime = unit.toNanos(keepAliveTime); 19 this.threadFactory = threadFactory; 20 this.handler = handler; 21 }
构造方法中参数的含义如下:
- corePoolSize:核心线程数量,线程池中应该常驻的线程数量
- maximumPoolSize:线程池允许的最大线程数,非核心线程在超时之后会被清除
- keepAliveTime:线程没有任务执行时可以保持的时间
- unit:时间单位
- workQueue:阻塞队列,存储等待执行的任务。JDK提供了如下4种阻塞队列:
- ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;
- LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene;
- SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene;
- PriorityBlockingQuene:具有优先级的无界阻塞队列;
- threadFactory:线程工厂,来创建线程
- handler:线程池的饱和策略。如果阻塞队列满了并且没有空闲的线程,这时如果继续提交任务,就需要采取一种策略处理该任务。线程池提供了4种策略:
- AbortPolicy:直接抛出异常,这是默认策略;
- CallerRunsPolicy:用调用者所在的线程来执行任务;
- DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
- DiscardPolicy:直接丢弃任务。
4.3 execute方法
ThreadPoolExecutor.execute(task)实现了Executor.execute(task),用来提交任务,不能获取返回值,代码如下:
1 public void execute(Runnable command) { 2 if (command == null) 3 throw new NullPointerException(); 4 /* 5 * Proceed in 3 steps: 6 * 7 * 1. If fewer than corePoolSize threads are running, try to 8 * start a new thread with the given command as its first 9 * task. The call to addWorker atomically checks runState and 10 * workerCount, and so prevents false alarms that would add 11 * threads when it shouldn't, by returning false. 12 * 13 * 2. If a task can be successfully queued, then we still need 14 * to double-check whether we should have added a thread 15 * (because existing ones died since last checking) or that 16 * the pool shut down since entry into this method. So we 17 * recheck state and if necessary roll back the enqueuing if 18 * stopped, or start a new thread if there are none. 19 * 20 * 3. If we cannot queue task, then we try to add a new 21 * thread. If it fails, we know we are shut down or saturated 22 * and so reject the task. 23 */ 24 int c = ctl.get(); 25 /* 26 * workerCountOf方法取出低29位的值,表示当前活动的线程数; 27 * 如果当前活动线程数小于corePoolSize,则新建一个线程放入线程池中; 28 * 并把任务添加到该线程中。 29 */ 30 31 if (workerCountOf(c) < corePoolSize) { 32 /* 33 * addWorker中的第二个参数表示限制添加线程的数量是根据corePoolSize来判断还是maximumPoolSize来判断; 34 * 如果为true,根据corePoolSize来判断; 35 * 如果为false,则根据maximumPoolSize来判断 36 */ 37 if (addWorker(command, true)) 38 return; 39 /* 40 * 如果添加失败,则重新获取ctl值 41 */ 42 c = ctl.get(); 43 } 44 /* 45 * 线程池处于RUNNING状态,把提交的任务成功放入阻塞队列中 46 */ 47 if (isRunning(c) && workQueue.offer(command)) { 48 // 重新获取ctl值 49 int recheck = ctl.get(); 50 // 再次判断线程池的运行状态,如果不是运行状态,由于之前已经把command添加到workQueue中了, 51 // 这时需要移除该command 52 // 执行过后通过handler使用拒绝策略对该任务进行处理,整个方法返回 53 if (! isRunning(recheck) && remove(command)) 54 reject(command); 55 /* 56 * 获取线程池中的有效线程数,如果数量是0,则执行addWorker方法 57 * 这里传入的参数表示: 58 * 1. 第一个参数为null,表示在线程池中创建一个线程,但不去启动; 59 * 2. 第二个参数为false,将线程池的有限线程数量的上限设置为maximumPoolSize,添加线程时根据maximumPoolSize来判断; 60 * 如果判断workerCount大于0,则直接返回,在workQueue中新增的command会在将来的某个时刻被执行。 61 */ 62 else if (workerCountOf(recheck) == 0) 63 addWorker(null, false); 64 } 65 /* 66 * 如果执行到这里,有两种情况: 67 * 1. 线程池已经不是RUNNING状态; 68 * 2. 线程池是RUNNING状态,但workerCount >= corePoolSize并且workQueue已满。 69 * 这时,再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize; 70 * 如果失败则拒绝该任务 71 */ 72 else if (!addWorker(command, false)) 73 reject(command); 74 }
如果线程池状态一直是RUNNING,则执行过程如下:
- 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务;
- 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中;
- 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务;
- 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。
4.4 addWorker方法
从executor的方法实现可以看出,addWorker主要负责创建新的线程并执行任务。线程池创建新线程执行任务时,需要获取全局锁:
1 private boolean addWorker(Runnable firstTask, boolean core) { 2 retry: 3 for (;;) { 4 int c = ctl.get(); 5 // 获取运行状态 6 int rs = runStateOf(c); 7 /* 8 * 这个if判断 9 * 如果rs >= SHUTDOWN,则表示此时不再接收新任务; 10 * 接着判断以下3个条件,只要有1个不满足,则返回false: 11 * 1. rs == SHUTDOWN,这时表示关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务 12 * 2. firsTask为空 13 * 3. 阻塞队列不为空 14 * 15 * 首先考虑rs == SHUTDOWN的情况 16 * 这种情况下不会接受新提交的任务,所以在firstTask不为空的时候会返回false; 17 * 然后,如果firstTask为空,并且workQueue也为空,则返回false, 18 * 因为队列中已经没有任务了,不需要再添加线程了 19 */ 20 // Check if queue empty only if necessary. 21 if (rs >= SHUTDOWN && 22 ! (rs == SHUTDOWN && 23 firstTask == null && 24 ! workQueue.isEmpty())) 25 return false; 26 27 for (;;) { 28 // 获取线程数 29 int wc = workerCountOf(c); 30 // 如果wc超过CAPACITY,也就是ctl的低29位的最大值(二进制是29个1),返回false; 31 // 这里的core是addWorker方法的第二个参数,如果为true表示根据corePoolSize来比较, 32 // 如果为false则根据maximumPoolSize来比较。 33 if (wc >= CAPACITY || 34 wc >= (core ? corePoolSize : maximumPoolSize)) 35 return false; 36 // 尝试增加workerCount,如果成功,则跳出第一个for循环 37 if (compareAndIncrementWorkerCount(c)) 38 break retry; 39 // 如果增加workerCount失败,则重新获取ctl的值 40 c = ctl.get(); // Re-read ctl 41 // 如果当前的运行状态不等于rs,说明状态已被改变,返回第一个for循环继续执行 42 if (runStateOf(c) != rs) 43 continue retry; 44 // else CAS failed due to workerCount change; retry inner loop 45 } 46 } 47 48 boolean workerStarted = false; 49 boolean workerAdded = false; 50 Worker w = null; 51 try { 52 // 根据firstTask来创建Worker对象 53 w = new Worker(firstTask); 54 // 每一个Worker对象都会创建一个线程 55 final Thread t = w.thread; 56 if (t != null) { 57 final ReentrantLock mainLock = this.mainLock; 58 mainLock.lock(); 59 try { 60 // Recheck while holding lock. 61 // Back out on ThreadFactory failure or if 62 // shut down before lock acquired. 63 int rs = runStateOf(ctl.get()); 64 // rs < SHUTDOWN表示是RUNNING状态; 65 // 如果rs是RUNNING状态或者rs是SHUTDOWN状态并且firstTask为null,向线程池中添加线程。 66 // 因为在SHUTDOWN时不会在添加新的任务,但还是会执行workQueue中的任务 67 if (rs < SHUTDOWN || 68 (rs == SHUTDOWN && firstTask == null)) { 69 if (t.isAlive()) // precheck that t is startable 70 throw new IllegalThreadStateException(); 71 // workers是一个HashSet 72 workers.add(w); 73 int s = workers.size(); 74 // largestPoolSize记录着线程池中出现过的最大线程数量 75 if (s > largestPoolSize) 76 largestPoolSize = s; 77 workerAdded = true; 78 } 79 } finally { 80 mainLock.unlock(); 81 } 82 if (workerAdded) { 83 // 启动线程,执行任务(Worker.thread(firstTask).start()); 84 //启动时会调用Worker类中的run方法,Worker本身实现了Runnable接口,所以一个Worker类型的对象也是一个线程。 85 t.start(); 86 workerStarted = true; 87 } 88 } 89 } finally { 90 if (! workerStarted) 91 addWorkerFailed(w); 92 } 93 return workerStarted; 94 }
4.5 Worker类
线程池中的每一个线程被封装成一个Worker对象,ThreadPool维护的其实就是一组Worker对象。Worker类设计如下:
- 继承了AQS类,用于判断线程是否空闲以及是否可以被中断,可以方便的实现工作线程的中止操作;
- 实现了Runnable接口,可以将自身作为一个任务在工作线程中执行;
- 当前提交的任务firstTask作为参数传入Worker的构造方法;
1 private final class Worker 2 extends AbstractQueuedSynchronizer 3 implements Runnable 4 { 5 /** 6 * This class will never be serialized, but we provide a 7 * serialVersionUID to suppress a javac warning. 8 */ 9 private static final long serialVersionUID = 6138294804551838833L; 10 11 /** Thread this worker is running in. Null if factory fails. */ 12 final Thread thread; 13 /** Initial task to run. Possibly null. */ 14 Runnable firstTask; 15 /** Per-thread task counter */ 16 volatile long completedTasks; 17 18 /** 19 * Creates with given first task and thread from ThreadFactory. 20 * @param firstTask the first task (null if none) 21 */ 22 Worker(Runnable firstTask) { 23 setState(-1); // inhibit interrupts until runWorker 24 this.firstTask = firstTask; 25 this.thread = getThreadFactory().newThread(this); 26 } 27 28 /** Delegates main run loop to outer runWorker */ 29 public void run() { 30 runWorker(this); 31 } 32 33 // Lock methods 34 // 35 // The value 0 represents the unlocked state. 36 // The value 1 represents the locked state. 37 38 protected boolean isHeldExclusively() { 39 return getState() != 0; 40 } 41 42 protected boolean tryAcquire(int unused) { 43 if (compareAndSetState(0, 1)) { 44 setExclusiveOwnerThread(Thread.currentThread()); 45 return true; 46 } 47 return false; 48 } 49 50 protected boolean tryRelease(int unused) { 51 setExclusiveOwnerThread(null); 52 setState(0); 53 return true; 54 } 55 56 public void lock() { acquire(1); } 57 public boolean tryLock() { return tryAcquire(1); } 58 public void unlock() { release(1); } 59 public boolean isLocked() { return isHeldExclusively(); } 60 61 void interruptIfStarted() { 62 Thread t; 63 if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { 64 try { 65 t.interrupt(); 66 } catch (SecurityException ignore) { 67 } 68 } 69 } 70 }
4.6 runWorker方法
Worker类中的run方法调用了runWorker方法来执行任务,执行过程如下:
- 线程启动之后,通过unlock方法释放锁,设置AQS的state为0,表示运行可中断;
- Worker执行firstTask或从workQueue中获取任务:
- 进行加锁操作,保证thread不被其他线程中断(除非线程池被中断)
- 检查线程池状态,倘若线程池处于中断状态,当前线程将中断。
- 执行beforeExecute
- 执行任务的run方法
- 执行afterExecute方法
- 解锁操作
1 final void runWorker(Worker w) { 2 Thread wt = Thread.currentThread(); 3 // 获取第一个任务 4 Runnable task = w.firstTask; 5 w.firstTask = null; 6 // 允许中断 7 w.unlock(); // allow interrupts 8 boolean completedAbruptly = true; 9 try { 10 // 如果task为空,则通过getTask来获取任务 11 while (task != null || (task = getTask()) != null) { 12 w.lock(); 13 // If pool is stopping, ensure thread is interrupted; 14 // if not, ensure thread is not interrupted. This 15 // requires a recheck in second case to deal with 16 // shutdownNow race while clearing interrupt 17 if ((runStateAtLeast(ctl.get(), STOP) || 18 (Thread.interrupted() && 19 runStateAtLeast(ctl.get(), STOP))) && 20 !wt.isInterrupted()) 21 wt.interrupt(); 22 try { 23 beforeExecute(wt, task); 24 Throwable thrown = null; 25 try { 26 task.run(); 27 } catch (RuntimeException x) { 28 thrown = x; throw x; 29 } catch (Error x) { 30 thrown = x; throw x; 31 } catch (Throwable x) { 32 thrown = x; throw new Error(x); 33 } finally { 34 afterExecute(task, thrown); 35 } 36 } finally { 37 task = null; 38 w.completedTasks++; 39 w.unlock(); 40 } 41 } 42 completedAbruptly = false; 43 } finally { 44 processWorkerExit(w, completedAbruptly); 45 } 46 }
4.7 getTask方法
getTask方法用来从阻塞队列中取等待的任务
1 private Runnable getTask() { 2 boolean timedOut = false; // Did the last poll() time out? 3 4 for (;;) { 5 int c = ctl.get(); 6 int rs = runStateOf(c); 7 8 // Check if queue empty only if necessary. 9 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { 10 decrementWorkerCount(); 11 return null; 12 } 13 14 int wc = workerCountOf(c); 15 16 // Are workers subject to culling? 17 // timed变量用于判断是否需要进行超时控制。 18 // allowCoreThreadTimeOut默认是false,也就是核心线程不允许进行超时; 19 // wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量; 20 // 对于超过核心线程数量的这些线程,需要进行超时控制 21 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; 22 23 if ((wc > maximumPoolSize || (timed && timedOut)) 24 && (wc > 1 || workQueue.isEmpty())) { 25 if (compareAndDecrementWorkerCount(c)) 26 return null; 27 continue; 28 } 29 30 try { 31 /* 32 * 根据timed来判断,如果为true,则通过阻塞队列的poll方法进行超时控制,如果在keepAliveTime时间内没有获取到任务,则返回null; 33 * 否则通过take方法,如果这时队列为空,则take方法会阻塞直到队列不为空。 34 * 35 */ 36 Runnable r = timed ? 37 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : 38 workQueue.take(); 39 if (r != null) 40 return r; 41 timedOut = true; 42 } catch (InterruptedException retry) { 43 timedOut = false; 44 } 45 } 46 }
5 任务的提交
- submit任务,等待线程池execute
- 执行FutureTask类的get方法时,会把主线程封装成WaitNode节点并保存在waiters链表中, 并阻塞等待运行结果;
- FutureTask任务执行完成后,通过UNSAFE设置waiters相应的waitNode为null,并通过LockSupport类unpark方法唤醒主线程。
1 public class Test{ 2 3 public static void main(String[] args) { 4 5 ExecutorService es = Executors.newCachedThreadPool(); 6 Future<String> future = es.submit(new Callable<String>() { 7 @Override 8 public String call() throws Exception { 9 try { 10 TimeUnit.SECONDS.sleep(2); 11 } catch (InterruptedException e) { 12 e.printStackTrace(); 13 } 14 return "future result"; 15 } 16 }); 17 try { 18 String result = future.get(); 19 System.out.println(result); 20 } catch (Exception e) { 21 e.printStackTrace(); 22 } 23 } 24 }
在实际业务场景中,Future和Callable基本是成对出现的,Callable负责产生结果,Future负责获取结果。
- Callable接口类似于Runnable,只是Runnable没有返回值。
- Callable任务除了返回正常结果之外,如果发生异常,该异常也会被返回,即Future可以拿到异步执行任务各种结果;
- Future.get方法会导致主线程阻塞,直到Callable任务执行完成;
5.1 submit方法
AbstractExecutorService.submit()实现了ExecutorService.submit(),可以获得执行完的返回值。而ThreadPoolExecutor是AbstractExecutorService的子类,所以submit方法也是ThreadPoolExecutor的方法。
1 public Future<?> submit(Runnable task) { 2 if (task == null) throw new NullPointerException(); 3 RunnableFuture<Void> ftask = newTaskFor(task, null); 4 execute(ftask); 5 return ftask; 6 } 7 public <T> Future<T> submit(Runnable task, T result) { 8 if (task == null) throw new NullPointerException(); 9 RunnableFuture<T> ftask = newTaskFor(task, result); 10 execute(ftask); 11 return ftask; 12 } 13 public <T> Future<T> submit(Callable<T> task) { 14 if (task == null) throw new NullPointerException(); 15 RunnableFuture<T> ftask = newTaskFor(task); 16 execute(ftask); 17 return ftask; 18 }
通过submit方法提交的Callable或者Runnable任务会被封装成了一个FutureTask对象。通过Executor.execute方法提交FutureTask到线程池中等待被执行,最终执行的是FutureTask的run方法。
5.2 FutureTask对象
类图
内部状态
/** *... * Possible state transitions: * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED */ private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6;
内部状态的修改通过sun.misc.Unsafe修改。
get方法
1 public V get() throws InterruptedException, ExecutionException { 2 int s = state; 3 if (s <= COMPLETING) 4 s = awaitDone(false, 0L); 5 return report(s); 6 }
内部通过awaitDone方法对主线程进行阻塞,具体实现如下:
1 /** 2 * Awaits completion or aborts on interrupt or timeout. 3 * 4 * @param timed true if use timed waits 5 * @param nanos time to wait, if timed 6 * @return state upon completion 7 */ 8 private int awaitDone(boolean timed, long nanos) 9 throws InterruptedException { 10 final long deadline = timed ? System.nanoTime() + nanos : 0L; 11 WaitNode q = null; 12 boolean queued = false; 13 for (;;) { 14 if (Thread.interrupted()) { 15 removeWaiter(q); 16 throw new InterruptedException(); 17 } 18 19 int s = state; 20 if (s > COMPLETING) { 21 if (q != null) 22 q.thread = null; 23 return s; 24 } 25 else if (s == COMPLETING) // cannot time out yet 26 Thread.yield(); 27 else if (q == null) 28 q = new WaitNode(); 29 else if (!queued) 30 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, 31 q.next = waiters, q); 32 else if (timed) { 33 nanos = deadline - System.nanoTime(); 34 if (nanos <= 0L) { 35 removeWaiter(q); 36 return state; 37 } 38 LockSupport.parkNanos(this, nanos); 39 } 40 else 41 LockSupport.park(this); 42 } 43 }
- 如果主线程被中断,则抛出中断异常;
- 判断FutureTask当前的state,如果大于COMPLETING,说明任务已经执行完成,则直接返回;
- 如果当前state等于COMPLETING,说明任务已经执行完,这时主线程只需通过yield方法让出cpu资源,等待state变成NORMAL;
- 通过WaitNode类封装当前线程,并通过UNSAFE添加到waiters链表;
- 最终通过LockSupport的park或parkNanos挂起线程。
run方法
1 public void run() { 2 if (state != NEW || 3 !UNSAFE.compareAndSwapObject(this, runnerOffset, 4 null, Thread.currentThread())) 5 return; 6 try { 7 Callable<V> c = callable; 8 if (c != null && state == NEW) { 9 V result; 10 boolean ran; 11 try { 12 result = c.call(); 13 ran = true; 14 } catch (Throwable ex) { 15 result = null; 16 ran = false; 17 setException(ex); 18 } 19 if (ran) 20 set(result); 21 } 22 } finally { 23 // runner must be non-null until state is settled to 24 // prevent concurrent calls to run() 25 runner = null; 26 // state must be re-read after nulling runner to prevent 27 // leaked interrupts 28 int s = state; 29 if (s >= INTERRUPTING) 30 handlePossibleCancellationInterrupt(s); 31 } 32 }
FutureTask.run方法是在线程池中被执行的,而非主线程
- 通过执行Callable任务的call方法;
- 如果call执行成功,则通过set方法保存结果;
- 如果call执行有异常,则通过setException保存异常。
6 Executors类
Exectors工厂类提供了线程池的初始化接口,主要有如下几种:
newFixedThreadPool
1 public static ExecutorService newFixedThreadPool(int nThreads) { 2 return new ThreadPoolExecutor(nThreads, nThreads, 3 0L, TimeUnit.MILLISECONDS, 4 new LinkedBlockingQueue<Runnable>()); 5 }
创建一个固定大小、任务队列容量无界(Integer.MAX_VALUE)的线程池,其中corePoolSize =maximumPoolSize=nThreads,阻塞队列为LinkedBlockingQuene。
注意点:
- 线程池的线程数量达corePoolSize后,即使线程池没有可执行任务时,也不会释放线程;
- 线程池里的线程数量不超过
corePoolSize
,这导致了maximumPoolSize
和keepAliveTime
将会是个无用参数 ; - 由于使用了无界队列, 所以FixedThreadPool永远不会拒绝, 即饱和策略失效。
newSingleThreadExecutor
1 public static ExecutorService newSingleThreadExecutor() { 2 return new FinalizableDelegatedExecutorService 3 (new ThreadPoolExecutor(1, 1, 4 0L, TimeUnit.MILLISECONDS, 5 new LinkedBlockingQueue<Runnable>())); 6 }
只有一个线程来执行无界任务队列的单一线程池。如果该线程异常结束,会重新创建一个新的线程继续执行任务,唯一的线程可以保证所提交任务的顺序执行。由于使用了无界队列, 所以SingleThreadPool永远不会拒绝,即饱和策略失效。与newFixedThreadPool(1)的区别在于单一线程池的大小不能再改变。
newCachedThreadPool
1 public static ExecutorService newCachedThreadPool() { 2 return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 3 60L, TimeUnit.SECONDS, 4 new SynchronousQueue<Runnable>()); 5 }
创建一个大小无界的缓冲线程池。任务队列是一个同步队列。缓冲线程池适用于执行耗时较小的异步任务。池的核心线程数=0 最大线程数=Integer.MAX_VLUE。与前两种稍微不同的是:
- 任务加入到池中,如果池中有空闲线程,则用空闲线程执行,如无则创建新线程执行。
- 池中的线程空闲超过60秒,将被销毁释放。
- 池中的线程数随任务的多少变化。
newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }
能定时执行任务的线程,池的核心线程数由参数指定。和前面3个线程池基于ThreadPoolExecutor类实现不同的是,它基于ScheduledThreadPoolExecutor实现。
7 线程池的监控
可以使用ThreadPoolExecutor以下方法:
- getTaskCount:线程池已经执行的和未执行的任务总数;
- getCompletedTaskCount:线程池已完成的任务数量,该值小于等于taskCount;
- getLargestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过,也就是达到了maximumPoolSize;
- getPoolSize:线程池当前的线程数量;
- getActiveCount:当前线程池中正在执行任务的线程数量。