一 线程池的概念
线程池的基本思想还是一种对象池的思想,开辟一块内存空间,里面存放了众多(未死亡)的线程,池中线程执行调度由池管理器来处理。当有线程任务时,
从池中取一个,执行完成后线程对象归池,这样可以避免反复创建线程对象所带来的性能开销,节省了系统的资源。
通俗点说就是当有工作来,就会向线程池拿一个线程,当工作完成后,并不是直接关闭线程,而是将这个线程归还给线程池供其他任务使用。
二 创建线程池
可通过 ThreadPoolExecutor 创建线程池
我们看一下该类的构造器
public ThreadPoolExecutor(int paramInt1, int paramInt2, long paramLong, TimeUnit paramTimeUnit, BlockingQueue<Runnable> paramBlockingQueue, ThreadFactory paramThreadFactory, RejectedExecutionHandler paramRejectedExecutionHandler) { this.ctl = new AtomicInteger(ctlOf(-536870912, 0)); this.mainLock = new ReentrantLock(); this.workers = new HashSet(); this.termination = this.mainLock.newCondition(); if ((paramInt1 < 0) || (paramInt2 <= 0) || (paramInt2 < paramInt1) || (paramLong < 0L)) throw new IllegalArgumentException(); if ((paramBlockingQueue == null) || (paramThreadFactory == null) || (paramRejectedExecutionHandler == null)) throw new NullPointerException(); this.corePoolSize = paramInt1; this.maximumPoolSize = paramInt2; this.workQueue = paramBlockingQueue; this.keepAliveTime = paramTimeUnit.toNanos(paramLong); this.threadFactory = paramThreadFactory; this.handler = paramRejectedExecutionHandler; }
对构造器内各参数进行解释
corePoolSize : 线程池的核心池大小,在创建线程池之后,线程池默认没有任何线程(数量为0)。
当任务过来时就会去创建线程,直到到达corePoolSize的大小,才会往阻塞队列里插入
当然如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程
maximumPoolSize:线程池允许的最大线程数,他表示最大能创建多少个线程。maximumPoolSize肯定是大于等于corePoolSize
这个值一般在任务队列满了以后,才会用到(任务队列满了后,新任务会创建新线程,直到所有线程数等于 maximumPoolSize
keepAliveTime :表示线程没有任务时最多保持多久然后停止。默认情况下,只有线程池中线程数大于corePoolSize 时,
keepAliveTime 才会起作用。
换句话说,当线程池中的线程数大于corePoolSize,并且一个线程空闲时间达到了keepAliveTime,
那么就是shutdown(把线程kill掉,移除线程池)
Unit : keepAliveTime 的单位。
workQueue :一个阻塞队列,用来存储等待执行的任务,当线程池中的线程数超过它的corePoolSize的时候,
线程会进入阻塞队列进行阻塞等待。
通过workQueue,线程池实现了阻塞功能
threadFactory :线程工厂,用来创建线程(也可以自定义线程工厂,指定线程名称格式等)
handler :表示当拒绝处理任务时的策略。
任务缓存队列
在前面我们多次提到了任务缓存队列,即workQueue,它用来存放等待执行的任务。
workQueue的类型为BlockingQueue<Runnable>,通常可以取下面三种类型:
1)有界任务队列ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小;
2)无界任务队列LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;
3)直接提交队列synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。
拒绝策略(handler 的值)
AbortPolicy:丢弃任务并抛出RejectedExecutionException,默认策略
CallerRunsPolicy:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。显然这样做不会真的丢弃任务,但是,任务提交线程的性能极有可能会急剧下降。
DiscardOldestPolicy:丢弃队列中最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。
DiscardPolicy:丢弃任务,不做任何处理。
线程池的任务处理策略:
如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务;
如果当前线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,
若添加成功,则该任务会等待空闲线程将其取出去执行;
若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务;
如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理;
如果线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;
如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。
线程池的关闭
ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:
shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务
流程图
当任务提交,当前线程池中线程数量与corePoolSize 比较
三 4种常见的线程池(都有弊端,不推荐)
//固定大小线程池
Executors.newFixedThreadPool(20); //创建固定大小线程池 public static ExecutorService newFixedThreadPool(int var0) { return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); }
特点:
固定大小的线程池,可以指定线程池的大小,该线程池corePoolSize和maximumPoolSize相等,阻塞队列使用的是LinkedBlockingQueue,大小为整数最大值。
该线程池中的线程数量始终不变,当有新任务提交时,线程池中有空闲线程则会立即执行,如果没有,则会暂存到阻塞队列。对于固定大小的线程池,不存在线程数量的变化。
同时使用无界的LinkedBlockingQueue来存放执行的任务。当任务提交十分频繁的时候,LinkedBlockingQueue会越来越大
------------------------------------- end -------------------------------------------------------------------------------------
//单例线程池
Executors.newSingleThreadExecutor(); //创建单例线程池 public static ExecutorService newSingleThreadExecutor() { return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue())); }
特点:
单个线程线程池,只有一个线程的线程池,阻塞队列使用的是LinkedBlockingQueue,若有多余的任务提交到线程池中,
则会被暂存到阻塞队列,待空闲时再去执行。按照先入先出的顺序执行任务。
------------------------------------- end ----------------------------------------------------------
//缓存线程池
Executors.newCachedThreadPool(); //创建缓存线程池 public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue()); }
特点:
如果线程池里线程数量超过处理需要,可灵活回收空闲线程,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程。
如果线程数不够处理新任务,则自动创建新线程。线程池中缓存的线程默认存活60秒。线程的核心池corePoolSize大小默认为0,
核心池最大为Integer.MAX_VALUE,阻塞队列使用的是SynchronousQueue。是一个直接提交的阻塞队列,
他总会迫使线程池增加新的线程去执行新的任务。在没有任务执行时,当线程的空闲时间超过keepAliveTime(60秒),则工作线程将会终止被回收,当提交新任务时,如果没有空闲线程,
则创建新线程执行任务,会导致一定的系统开销。
如果同时又大量任务被提交,而且任务执行的时间不是特别快,那么线程池便会新增出等量的线程池处理任务,这很可能会很快耗尽系统的资源。
------------------------------------- end ----------------------------------------------------------------------------------------
//定时线程池
Executors.newScheduledThreadPool(5); //创建定时线程池 public static ScheduledExecutorService newScheduledThreadPool(int var0) { return new ScheduledThreadPoolExecutor(var0); }
特点:
定时线程池,该线程池可用于周期性地去执行任务,通常用于周期性的同步数据。
scheduleAtFixedRate:是以固定的频率去执行任务,周期是指每次执行任务成功执行之间的间隔。
scheduleWithFixedDelay:是以固定的延时去执行任务,延时是指上一次执行成功之后和下一次开始执行的之前的时间。
------------------------------------- end -----------------------------------------------------------------
结论:以上几种线程池都有缺陷
1) FixedThreadPool(固定长度)和 SingleThreadPool(单例)
允许的请求队列长度为Integer.MAX_VALUE, 可能会堆积大量的请求而导致OOM(内存溢出 OutOfMemery)
2) CachedThreadPool(缓存线程池) 和 newScheduledThreadPool(定时线程池)
允许创建的线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM(内存溢出 OutOfMemery)
所以《 阿里巴巴开发手册》强制不允许使用Executors 创建线程池,而是用ThreadPoolExecutor创建
正确的线程池创建方法应该是:
自己指定 各参数的值,特别是 maximumPoolSize 的大小和 blockingQueue的长度,防止造成OOM,示例如下
//线程池的核心池大小 int corePoolSize = 10; //线程池允许的最大线程数 int maximumPoolSize = 20; //表示线程没有任务时最多保持多久然后停止 long keepAliveTime = 60L; //keepAliveTime 的单位 TimeUnit timeUnit = TimeUnit.SECONDS; //任务队列 BlockingQueue<Runnable> blockingQueue = new LinkedBlockingDeque<>(50); ExecutorService excutorService = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit,blockingQueue);
线程池的调用方式
//线程池内加入第一个任务 excutorService.execute(new ThreadPoolTest()) //线程池内加入第二个任务 excutorService.execute(new ThreadPoolTest())、 //线程池内加入第三个任务 excutorService.execute(new ThreadPoolTest()) /** * 线程池测试类 * @author hup * @since 2019-11-05 22:25 */ public class ThreadPoolTest implements Runnable { /** * 实现run 方法 */ @Override public void run() { try { Thread.sleep(500); System.out.println(Thread.currentThread().getName() + "线程被调用了"); } catch (Exception e) { e.printStackTrace(); } } }
四 线程任务提交给线程池的方法,submit()和execute()的区别
1 入参类型不同
excute() 入参Runnable
submit() 入参可以为Callable,也可以为Runnable。
2 是否有返回值
submit() 方法,可以提供Future < T > 类型的返回值。
executor() 方法,无返回值。
execute()方法源码
public void execute(Runnable command) { if (command == null) throw new NullPointerException();//抛掉异常 int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
submit有Future返回值 :
/** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } /** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } /** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
3 是否抛出异常
excute() 方法会抛出异常。
sumbit() 方法不会抛出异常。除非你调用Future.get()。
------------------------------------------------ end--------------------------------------------------