Java线程池概述
线程池技术在并发时经常会使用到,java中的线程池的使用是通过调用ThreadPoolExecutor来实现的。
ThreadPoolExecutor提供了四个构造函数,最后都会归结于下面这个构造方法:
/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @param threadFactory the factory to use when the executor * creates a new thread * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if one of the following holds:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
这些参数的意义如下:
corePoolSize:该线程池中核心线程数最大值
maximumPoolSize: 该线程池中线程总数最大值
keepAliveTime:该线程池中非核心线程闲置超时时长
unit:keepAliveTime的单位
workQueue:阻塞队列BlockingQueue,维护着等待执行的Runnable对象
threadFactory:创建线程的接口,需要实现他的Thread newThread(Runnable r)方法。
RejectedExecutionHandler:饱和策略,最大线程和工作队列容量且已经饱和时execute方法都将调用RejectedExecutionHandler
ThreadPoolExecutor工作流程
大致过程陈述为:
1. 向线程池中添加任务,当任务数量少于corePoolSize时,会自动创建thead来处理这些任务;
2. 当添加任务数大于corePoolSize且少于maximmPoolSize时,不在创建线程,而是将这些任务放到阻塞队列中,等待被执行;
3. 接上面2的条件,且当阻塞队列满了之后,继续创建thread,从而加速处理阻塞队列;
4. 当添加任务大于maximmPoolSize时,根据饱和策略决定是否容许继续向线程池中添加任务,默认的饱和策略是AbortPolicy(直接丢弃)。
线程池中使用的阻塞队列
ArrayBlockingQueue:基于数组结构的有界阻塞队列,构造函数一定要传大小,FIFO(先进先出);
LinkedBlockingQueue:无界,默认大小65536(Integer.MAX_VALUE),当大量请求任务时,容易造成内存耗尽。
SynchronousQueue:同步队列,是一个特殊的BlockingQueue,它没有容量(这是因为在SynchronousQueue中,插入将等待另一个线程的删除操作,反之亦然)。具体可以参考:《Java SynchronousQueue Examples(译)》
PriorityBlockingQueue: 优先队列,无界。DelayedWorkQueue:这个队列接收到任务时,首先先入队,只有达到了指定的延时时间,才会执行任务
阻塞队列常见的方法如下表所示:
常见四种线程池
newCachedThreadPool
newFixedThreadPool
newSingleThreadExecutor
newScheduledThreadPool
它们通过Executors以静态方法的方式直接调用,实质上是它们最终调用的是ThreadPoolExecutor的构造方法,也就是本文最前面那段代码。
注:KeepAliveTime=0的话,表示不等待
摘自阿里巴巴开发手册:
【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样 的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明:Executors 返回的线程池对象的弊端如下:
1)FixedThreadPool 和 SingleThreadPool: 允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
2)CachedThreadPool 和 ScheduledThreadPool: 允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。
应用样例,更多请参看我的github
package multiThread; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class ThreadPoolExecutorTest { public static void main(String[] args) { ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 1, 2, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.DiscardPolicy()); System.out.println("getQueue:" + threadPool.getQueue().size()); System.out.println("remainingCapacity:" + threadPool.getQueue().remainingCapacity()); threadPool.execute(() -> { try { int count = 0; Thread.currentThread().setName("aa"); while (count <= 10) { System.out.println(Thread.currentThread().getName() + "getQueue:" + threadPool.getQueue().size()); System.out.println(Thread.currentThread().getName() + System.currentTimeMillis()); Thread.sleep(1000); count++; } } catch (Exception e) { e.printStackTrace(); } }); threadPool.execute(() -> { try { int count = 0; Thread.currentThread().setName("bbb"); while (count <= 100) { System.out.println(Thread.currentThread().getName() + "getQueue:" + threadPool.getQueue().size()); System.out.println(Thread.currentThread().getName() + System.currentTimeMillis()); Thread.sleep(1000); count++; } } catch (Exception e) { e.printStackTrace(); } }); } }