12.1 为什么要使用线程池
一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。】
使用线程池主要有以下三个原因:
- 创建/销毁线程需要消耗系统资源,线程池可以复用已创建的线程。
- 控制并发的数量。并发数量过多,可能会导致资源消耗过多,从而造成服务器崩溃。(主要原因)
- 可以对线程做统一管理。
12.2 线程池的原理
Java中的线程池顶层接口是Executor
接口,ThreadPoolExecutor
是这个接口的实现类。
我们先看看ThreadPoolExecutor
类。
public ThreadPoolExecutor(int corePoolSize,//核心线程,创建后就一直存在,空闲也不会被销毁 int maximumPoolSize,//线程数最大值 long keepAliveTime,//非核心线程存活时长,非核心线程空闲到一定程度就会被销毁。 TimeUnit unit,//keepAliveTime的单位 BlockingQueue<Runnable> workQueue,//线程池的等待执行的任务(Runnanble对象)队列 ThreadFactory threadFactory,//可选参数,线程工厂默认创建一个 RejectedExecutionHandler handler//可选参数,拒绝处理策略)
workQueue,工作队列,有几种常见的队列
1.ArrayBlockingQueue
数组阻塞队列,限定长度的队列(必须声明队列长度)。
2.LinkedBlockingQueue
链表阻塞队列,队列的默认长度默认值为Integer.MAX。
3.SynchronousQueue
同步队列。没有容量,是无缓冲等待队列,是一个不存储元素的阻塞队列,会直接将任务交给消费者,必须等队列中的添加元素被消费后才能继续添加新的元素。
4.DelayQueue
延迟队列。队列中的元素需要实现Delayed()方法,线程需要等待延迟时间结束后才能获取队列的任务执行。
RejectedExecutionHandler handler拒绝处理策略
线程数量大于最大线程数就会采用拒绝处理策略,四种拒绝处理的策略为 :
-
ThreadPoolExecutor.AbortPolicy:默认拒绝处理策略,丢弃任务并抛出RejectedExecutionException异常。
-
ThreadPoolExecutor.DiscardPolicy:丢弃新来的任务,但是不抛出异常。
-
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列头部(最旧的)的任务,然后重新尝试执行程序(如果再次失败,重复此过程)。
-
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务。
ThreadPoolExecutor的状态
线程池本身有一个调度线程,这个线程就是用于管理布控整个线程池里的各种任务和事务,例如创建线程、销毁线程、任务队列管理、线程队列管理等等。
故线程池也有自己的状态。ThreadPoolExecutor
类中定义了一个volatile int
变量runState来表示线程池的状态 ,分别为RUNNING、SHUTDOWN、STOP、TIDYING 、TERMINATED。
-
-
线程池创建后处于RUNNING状态。
-
调用shutdown()方法后处于SHUTDOWN状态,线程池不能接受新的任务,清除一些空闲worker,会等待阻塞队列的任务完成。
-
调用shutdownNow()方法后处于STOP状态,线程池不能接受新的任务,中断所有线程,阻塞队列中没有被执行的任务全部丢弃。此时,poolsize=0,阻塞队列的size也为0。
-
当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。接着会执行terminated()函数。
ThreadPoolExecutor中有一个控制状态的属性叫ctl,它是一个AtomicInteger类型的变量。
-
线程池处在TIDYING状态时,执行完terminated()方法之后,就会由 TIDYING -> TERMINATED, 线程池被设置为TERMINATED状态。
-
常见的4种线程池
1.FixedThreadPool 固定长度的线程池
可以用来控制并发数量,如果没有空闲的核心线程,任务会在队列中等待。核心线程数=最大线程数。采用LinkedBlockingQueue(队列为Integer.MAX,理论上无限大)。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
2.CachedThreadPool 缓存线程池
核心线程数为0,非核心线程数最大值为Integer.MAX,理论上不限。线程存活时间为60秒,使用SynchronousQueue。一个任务进来,队列将任务提交给线程执行,如果没有空闲的线程则创建新的线程执行,线程空闲存活时间为60秒。一定程度上复用了线程。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
3.SingleThreadPool 单线程线程池
核心线程数为1,非核心线程数为0,采用LinkedBlocakingQueue,与FixedThreadPool一致。
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
4.ScheduledThreadPool 周期线程池
定长线程池,支持周期和定时任务,采用DelayQueue,有核心线程数和非核心线程数,线程数最大值也为Integer.MAX
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory); }
5. WorkStealingPool
工作窃取线程池。采用Fork/Join线程池实现的,空闲的线程会去窃取其他线程的工作队列(于队尾)。
public static ExecutorService newWorkStealingPool(int parallelism) { return new ForkJoinPool (parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); }
线程复用
将线程封装成工作线程,采用while循环一直去Queue获取task,调用task的run方法执行,直到线程中断或者Queue中的task为0。
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) {//while循环 w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run();//执行task实例的run方法,此时只是调用实例的方法,不会触发创建线程。 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
来一个例子
public static void main(String[] args){
new MyThread().run();
System.out.println(Thread.currentThread().getName() +":main");
}
public static class MyThread extends Thread{
@Override
public void run(){
try {
Thread.currentThread().sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + ":MyThread");
}
}
输出如下,阻塞了,调用了当前的主线程来执行。
main:MyThread
main:main