关于线程池的源码分析,在这里也没认真说明,后面单独起一片文章进行研究。
1.大纲
线程池介绍
创建与停止线程池
常见的线程池特点与用法
任务太多,怎么拒绝
钩子方法
实现原理,源码分析
使用线程池的主要点
一:介绍
1.重要性
使用中重要
面试中重要
2.池
线程可以复用
可以控制资源的总量
3.不使用线程池些的程序
这里有两个程序,只粘贴进行循环对每个任务进行创建线程,并执行
package com.jun.juc.threadpool; /** * for循环执行每一个任务的线程 * 可以正常的执行,但是有些问题 * 开销大,反复的操作系统进行创建与销毁 */ public class ForLoop { public static void main(String[] args) { for (int i = 0; i< 10000; i++){ Thread thread = new Thread(new Task()); thread.start(); } } static class Task implements Runnable{ @Override public void run() { System.out.println("执行了任务"); } } }
4.为什么使用线程池
反复的创建,开销大
让一部分的线程保持工作,反复的执行任务
过多的线程会占用太多的内存
使用少量的线程
5.线程池的好处
加快响应速度
更好的利用CPU,与内存。选择合适的线程数
统一管理
6.使用场景
服务器接收大量的请求
多个线程的创建
二:创建与停止线程池
1.线程池的构造函数的参数
corePoolSize:核心线程数,int
maxPoolSize:最大的线程数,int
keepAliveTime:存活时间,long
workQueue:任务存储队列,BlockingQueue
threadFactory:工厂类,ThreadFactory
Handler:拒绝策略,RejectedExecutionHandler
2.corePoolSize
线程池进行初始化的时候,线程池里没有任何的线程,线程池会等待有任务到来的时候,再进行创建新线程执行任务
3.macPoolSize
线程池有可能子啊核心线程数的基础上,额外增加一些线程,但是这些新增加的线程数有一定的上限,这个就是最大量
如果超过了corePoolSize的时候,先将任务放到队列中。
队列中满了,才会去看
4.添加线程的规则
如果线程小于corePoolSize的时候,即使线程有处于空闲状态,也会继续创建新的线程运行新的任务
如果等于大于corePoolSize,但是小于maxPoolSize,放入队列
如果队列已满,并且线程小于maxPoolSize,创建新的线程
5.keepAliveTime
主要看是控制的是谁。
如果线程池的当前的线程数多于了corePoolSize,那么多于的线程空闲时间超过keepAliveTime,将会被终止
减少资源消耗
6.ThreadFactory
新的线程默认使用Exectors.defaultThreadFactory(),创建的线程都在一个线程组,拥有相同的NORM_PRIORITY优先级,并且都不是守护线程
7.workQueue
工作队列
最常见的队列类型:
直接交换:SynchronousQueue,内部没有容量
无界队列:LinkedBlockingQueue
有界队列:ArrayBlockingQueue
三:常见的线程池
1.FixedThreadPool创建
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
可以看见,corePoolSize与maxPoolSize是相等的
然后使用的是无界队列。
由于传递进去的任务是没有容量上限的,可能占用大量的内存,出现OOM
2.演示溢出
package com.jun.juc.threadpool; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * FixedThreadPool的使用场景 */ public class FixedThreadPoolTest { public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(5); for (int i=0; i<Integer.MAX_VALUE;i++){ executorService.execute(new Task()); } } static class Task implements Runnable{ @Override public void run() { try { // 让这个任务执行的很慢,表示队列中会一直增加 Thread.sleep(500000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("执行了任务"); } } }
效果:
D:jdk1.8.0_144injava -agentlib:jdwp=transport=dt_socket,address=127.0.0.1:63206,suspend=y,server=n -Xmx8m -Xms8m -Dfile.encoding=UTF-8 -classpath "D:jdk1.8.0_144jrelibcharsets.jar;D:jdk1.8.0_144j Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:416) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371) at com.jun.juc.threadpool.FixedThreadPoolTest.main(FixedThreadPoolTest.java:13)
3.SingleThreadExector的使用
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
说明:
相比于FixedThreadPool,只是corePoolSize与macPoolSize都是1,其他不变
4.也会出现OOM
package com.jun.juc.threadpool; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class SingleThreadExectos { public static void main(String[] args) { ExecutorService executorService = Executors.newSingleThreadExecutor(); for (int i=0; i<Integer.MAX_VALUE;i++){ executorService.execute(new FixedThreadPoolTest.Task()); } } static class Task implements Runnable{ @Override public void run() { try { // 让这个任务执行的很慢,表示队列中会一直增加 Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("执行了任务"); } } }
5.CachedThreadPool
可缓存线程池,无界的线程池,可以自动回收多于线程的功能
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
其中,最大的线程数没有限制,也是一个大的弊端。如果任务量过大,一样会出现的是OOM
6.测试
package com.jun.juc.threadpool; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CachedThreadPoolTest { public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); for (int i=0; i< 1000; i++){ executorService.execute(new Task()); } } static class Task implements Runnable{ @Override public void run() { try { // 让这个任务执行的很慢,表示队列中会一直增加 Thread.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread()+"执行了任务"); } } }
效果:
Thread[pool-1-thread-495,5,main]执行了任务 Thread[pool-1-thread-491,5,main]执行了任务 Thread[pool-1-thread-511,5,main]执行了任务 Thread[pool-1-thread-508,5,main]执行了任务 Thread[pool-1-thread-507,5,main]执行了任务 Thread[pool-1-thread-499,5,main]执行了任务 Thread[pool-1-thread-519,5,main]执行了任务 Thread[pool-1-thread-522,5,main]执行了任务 Thread[pool-1-thread-518,5,main]执行了任务 Thread[pool-1-thread-510,5,main]执行了任务
创建了很多的线程
7.ScheduledThreadPool
支持定时与周期性的执行的线程池
核心线程是传递过去的,但是最大的核心线程数是INTEGER.MAX_VALUE
8.延迟一定时间之后运行
定时
package com.jun.juc.threadpool; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class ScheledThreadPoolTest { public static void main(String[] args) { ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10); scheduledExecutorService.schedule(new Task(),6, TimeUnit.SECONDS); } static class Task implements Runnable{ @Override public void run() { try { // 让这个任务执行的很慢,表示队列中会一直增加 Thread.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread()+"执行了任务"); } } }
9.周期性的运行
周期
package com.jun.juc.threadpool; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class ScheledThreadPoolTest { public static void main(String[] args) { ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10); scheduledExecutorService.schedule(new Task(),6, TimeUnit.SECONDS); // 周期的运行 scheduledExecutorService.scheduleAtFixedRate(new Task(), 1, 3, TimeUnit.SECONDS); } static class Task implements Runnable{ @Override public void run() { try { // 让这个任务执行的很慢,表示队列中会一直增加 Thread.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread()+"执行了任务"); } } }
10.总结
主要注意的是CacheDThreadPool与ScheduleThreadPool的对比。
CachedThreadPool为啥使用SynchronousQueue,因为有任务不需要进行存储,直接交给线程执行就行了。
ScheduledThreadPool使用的是延迟队列DelayedWorkQueue
四:线程池中的线程数量设定
1.计算密集型的
为cpu核心数的1~2倍
2.耗时IO型的
最佳线程数一般大于cpu很多倍。
以jvm线程监控显示繁忙情况为依据,参考brain goetz推荐的计算方法
3.计算方法
cpu核心数 * (1+平均等待时间/平均工作时间)
五:停止线程池
1.shutdown
要线程中,会队列中的线程任务都执行完成后,再进行停止
对拒绝新的任务
2.测试
package com.jun.juc.threadpool; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * shutdown关闭 */ public class ShutDown { public static void main(String[] args) throws Exception{ ExecutorService executorService = Executors.newSingleThreadExecutor(); for (int i=0; i< 1000; i++){ executorService.execute(new ShutDownTask()); } Thread.sleep(2000); executorService.shutdown(); executorService.execute(new ShutDownTask()); } static class ShutDownTask implements Runnable { @Override public void run(){ try { Thread.sleep(50); System.out.println(Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } } } }
效果:
可以发现,在执行一段时间后,就可以发现,真的不再进行接收任务了
pool-1-thread-1 pool-1-thread-1 Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.jun.juc.threadpool.ShutDown$ShutDownTask@ed17bee rejected from java.util.concurrent.ThreadPoolExecutor@2a33fae0[Shutting down, pool size = 1, active threads = 1, queued tasks = 960, completed tasks = 39] at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379) at java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668) at com.jun.juc.threadpool.ShutDown.main(ShutDown.java:17) pool-1-thread-1 pool-1-thread-1
3.isShutdown
可以知道线程被停止过了
package com.jun.juc.threadpool; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * shutdown关闭 */ public class ShutDown { public static void main(String[] args) throws Exception{ ExecutorService executorService = Executors.newSingleThreadExecutor(); for (int i=0; i< 100; i++){ executorService.execute(new ShutDownTask()); } Thread.sleep(2000); System.out.println(executorService.isShutdown()); executorService.shutdown(); System.out.println(executorService.isShutdown()); executorService.execute(new ShutDownTask()); } static class ShutDownTask implements Runnable { @Override public void run(){ try { Thread.sleep(50); System.out.println(Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } } } }
效果:
先false,然后true
4.isTerminated
返回是否真正的结束
package com.jun.juc.threadpool; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * shutdown关闭 */ public class ShutDown { public static void main(String[] args) throws Exception{ ExecutorService executorService = Executors.newSingleThreadExecutor(); for (int i=0; i< 10; i++){ executorService.execute(new ShutDownTask()); } System.out.println(executorService.isShutdown()); executorService.shutdown(); System.out.println(executorService.isShutdown()); Thread.sleep(2000); System.out.println(executorService.isTerminated()); } static class ShutDownTask implements Runnable { @Override public void run(){ try { Thread.sleep(50); System.out.println(Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } } } }
效果:
D:jdk1.8.0_144injava -agentlib:jdwp=transport=dt_socket,address=127.0.0.1:62572,suspend=y,server=n -Dfile.encoding=UTF-8 -classpath "D:jdk1.8.0_144jrelibcharsets.jar;D:jdk1.8.0_144jrelibdeploy.jarConnected to the target VM, addre true pool-1-thread-1 pool-1-thread-1 pool-1-thread-1 pool-1-thread-1 pool-1-thread-1 pool-1-thread-1 pool-1-thread-1 pool-1-thread-1 pool-1-thread-1 pool-1-thread-1 Disconnected from the target VM, address: '127.0.0.1:62572', transport: 'socket' true Process finished with exit code 0
5.awaitTermination
所有的任务都执行完毕,等待的时间到了,等待过程中被打断都会返回,否则阻塞。
说明:8秒内,如果关闭了线程,并且都执行完成返回true,否则是false
package com.jun.juc.threadpool; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * shutdown关闭 */ public class ShutDown { public static void main(String[] args) throws Exception{ ExecutorService executorService = Executors.newSingleThreadExecutor(); for (int i=0; i< 10; i++){ executorService.execute(new ShutDownTask()); } executorService.shutdown(); boolean b = executorService.awaitTermination(8L, TimeUnit.SECONDS); System.out.println("b="+b); } static class ShutDownTask implements Runnable { @Override public void run(){ try { Thread.sleep(50); System.out.println(Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } } } }
6.shutdownNow
立刻关闭线程
存在返回未执行的任务。
package com.jun.juc.threadpool;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* shutdown关闭
*/
public class ShutDown {
public static void main(String[] args) throws Exception{
ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i=0; i< 10; i++){
executorService.execute(new ShutDownTask());
}
// shutdownNow
List<Runnable> runnables = executorService.shutdownNow();
ExecutorService executorService2 = Executors.newSingleThreadExecutor();
runnables.forEach(item -> executorService2.execute(item));
}
static class ShutDownTask implements Runnable {
@Override
public void run(){
try {
Thread.sleep(50);
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
System.out.println("线程终端了");
}
}
}
}
六:拒绝策略
1.拒绝时机
Executor关闭时
最大线程和队列已满
2.拒绝策略
AbsortPolicy:直接抛出异常
DiscradPolicy:默默丢弃
DiscardOldestPolicy:丢弃最老的任务
CallerRunsPolicy:谁提交的任务,则有谁进行运行,这样可以降低提交速度
七:钩子方法
1.说明
在任务的前后
日志,统计
2.暂停线程池
package com.jun.juc.threadpool; import java.util.concurrent.*; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class PauseableThreadPool extends ThreadPoolExecutor { /** * 并发加锁 */ private final ReentrantLock lock = new ReentrantLock(); private Condition unpaused = lock.newCondition(); /** * 是否暂停 */ private boolean isPaused; public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); } public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); } public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); } /** * 执行之前,暂停 * * @param t * @param r */ @Override protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); lock.lock(); try { while (isPaused) { unpaused.await(); } } catch (InterruptedException e) { e.printStackTrace(); }finally { lock.unlock(); } } /** * 暂停 */ private void pause() { lock.lock(); try { isPaused = true; } finally { lock.unlock(); } } /** * 恢复 */ public void resume(){ lock.lock(); try { isPaused = false; unpaused.signalAll(); }finally { lock.unlock(); } } public static void main(String[] args) throws InterruptedException { PauseableThreadPool pauseableThreadPool = new PauseableThreadPool(10, 20, 10L, TimeUnit.SECONDS, new LinkedBlockingDeque<>()); Runnable task = new Runnable(){ @Override public void run() { System.out.println("开始执行了"); try { try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } }catch (Exception e){ e.printStackTrace(); } } }; for(int i=0; i<1000; i++){ pauseableThreadPool.execute(task); } Thread.sleep(1000); pauseableThreadPool.pause(); System.out.println("线程池被暂停了"); Thread.sleep(1000); pauseableThreadPool.resume(); System.out.println("线程池被再次执行了"); } }
效果:
开始执行了
开始执行了
线程池被暂停了
线程池被再次执行了
开始执行了
开始执行了
八:源码
1.组成部分
线程池管理器
工作线程
任务队列
任务接口
2.Exector家族
3.Exector
顶层接口,只有一个方法
* @since 1.5 * @author Doug Lea */ public interface Executor { /** * Executes the given command at some time in the future. The command * may execute in a new thread, in a pooled thread, or in the calling * thread, at the discretion of the {@code Executor} implementation. * * @param command the runnable task * @throws RejectedExecutionException if this task cannot be * accepted for execution * @throws NullPointerException if command is null */ void execute(Runnable command); }
4.ExecuorService
继承了Excetor,然后增加了几个新的方法
初步的有了管理线程池的方法
5.Excetors
这是一个工具类
进入可以发现是使用ThreadPoolExector进行创建的线程
6.线程池实现任务复用的原理
execute:
添加到worker
進行运行:
九:线程池状态
1.线程池状态
Running:接收新任务并处理排队任务
SHUTDOWN:不接受新任务,但是处理排队任务
stop:不接受新任务,也不处理排队任务,并中断正在进行的任务
tidying:所有的任务都已经终结,workerCount为零时,线程就会转为这个状态,并且运行terminate()方法
TERMIMATED:运行完成
2.状态值