线程池处理任务原理
线程池实现参数解析:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
- corePoolSize
- 核心线程数
- maximumPoolSize
- 最大线程数 ,它的值是int java.lang.Integer.MAX_VALUE = 2147483647 [0x7fffffff]
- keepAliveTime
- 线程存活时间
- unit
- 存活时间的时间单位
- workQueue
- 用于装载工作任务的队列
- threadFactory
- 生成现成的线程工厂
- handler
- 饱和策略 RejectedExecutionHandler
BlockingQueue< Runnable>:
- ArrayBlockingQueue:基于数组的有界阻塞队列
- LinkedBlockingQueue: 基于链表的阻塞队列
- SynchronousQueue: 一个没有数据缓冲的BlockingQueue,必须take元素之后才可以put。
- PriorityBlockingQueue: 优先级队列
RejectedExecutionHandler:
ThreadPoolExecutor中有四个默认实现的策略类
- CallerRunsPolicy: 启用调用者线程执行,除非线程池shut down了,任务就被丢弃了
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
- AbortPolicy:直接抛出异常RejectedExecutionException
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
- DiscardPolicy: 默默的丢弃任务
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
- DiscardOldestPolicy:丢弃队列里最久未处理的请求(the oldest unhandled request),然后执行当前任务
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
小结:java线程池实现是解决资源有限的条件下,尽可能少的因为线程的创建销毁等时间等待和不必要的资源浪费。尽量使用创建好的线程去执行任务,那么来看,他的本质组成就肯定少不了这几个东西:一堆创建好的线程,或者预创建线程数目下可创建的线程,执行的任务,当任务多,但是线程数目有限的时候,对于任务的装载(队列)!以及,当队列满的时候,再有新来的任务的饱和处理策略,比如:是要直接丢弃,还是扔掉队列里面的最远或者最近的任务,执行它还是保存到硬盘或者数据库等等!由此可见,一些线程+队列+饱和处理策略=线程池的实现。
jdk提供的几种线程池
Executors类提供了下述几种方法生成相应的线程池对象:
- newFixedThreadPool,有这个方法我们一步一步看出来它是核心线程数=最大线程数,工作队列是LinkedBlockingQueue,饱和策略是默认的实现AbortPolicy ,该策略的处理是总是抛出错误。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
- newWorkStealingPool jdk1.8提供 暂略
- newSingleThreadExecutor
单线程,工作队列是LinkedBlockingQueue,饱和策略是默认的实现AbortPolicy
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
- newCachedThreadPool
该线程池的特点是初始没有核心线程,执行任务的时候再生成,并且空闲线程的存活时间是1分钟,使用的工作队列是SynchronousQueue这个没有数据缓冲的阻塞队列,饱和策略依然是默认的实现。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
- newSingleThreadScheduledExecutor
它调用的是ScheduledThreadPoolExecutor(int corePoolSize)方法,只不过参数是1
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
- newScheduledThreadPool
DelayedWorkQueue: http://ju.outofmemory.cn/entry/99456
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
- shutdown 不接受新的任务但是依然会处理队列中存储的任务
- stop 不接受新任务 也不会处理队列中的剩余任务