Executor
Callable Runnable + ret
异步
future
futuretask Future + Runnable
CompletableFuture(任务管理类)
提供链式处理方式
allOf
一堆任务的管理
anyOf
submit
task
ThreadPoolExecutor
ForkJoinPool
线程池维护两个集合
线程
任务
各种参数
Executors线程池的工厂
SingleThreadExecutor
线程池
线程池分类#
CachedThreadPool
- 创建一个可缓存线程池,如果线程池长度超过处理需要
- 可灵活回收空闲线程,(回收默认时间为1min)
- 若无可回收,则新建线程
- 线程池为无限大(Integer.MAX_VALUE)
- 当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程
FixedThreadPool
- 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待
- 定长线程池的大小最好根据系统资源进行设置
SingleThreadScheduledExecutor
- 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行
ScheduledThreadPool#
- 给定延时之后的运行任务或定期处理任务
ForkJoinPool,jdk1.7以后的新线程池,分治算法,计算向数据移动
将大任务切分成小任务执行,然后汇总
用很少的线程执行很多的任务,TPE做不到先执行子任务
CPU密集型
ForkJoinTask
ForkJoinTask的子类RecursiveAction
RecursiveTask
parallelStream也是用的ForkJoinPool
fork,一个进程拆分成两个进程
大任务拆分成小任务,然后汇总#
WorkStealingPool#
每一个线程都有自己单独的队列
执行完自己任务会从其他线程上面偷取任务执行
拒绝策略#
生命周期
- RUNNING:能接受新提交的任务,并且也能处理阻塞队列中的任务
- SHUTDOWN:关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务
- STOP:不能接受新任务,也不能处理队列中的任务,会中断正在处理任务的线程
- TIDYING:如果所有任务都终止了,workerCount(有效线程数)为0,线程池进入该状态后调用terminated()方法进入Terminated状态
- TERMINATED:在terminated方法执行完成后进入该状态,默认terminated方法中什么也没有做
ThreadPoolExecutor 参数#
public TreadPoolExecutor( int corePoolSize, // 核心线程池数量 int maximumPoolSize, // 最大线程池数量 long keepAliveTime, // 临时线程的生命周期 TimeUnit unit, // 临时线程的生命周期单位 BlockQueue<Runnable> workQueue, // 阻塞队列 ThreadFactory threadFactory, // 线程工厂 RejectedExecutionHandler handler){ // 饱和策略(拒绝策略) }
- ArrayBlockingQueue,基于数组,定长数组,缓存队列的数据对象,
- LinkedBlockingQueue,基于链表,同ArrayListBlockingQueue类似
- DelayQueue,只有当指定的延迟时间到了,才能够从队列中获取该元素
- PriorityBlockingQueue,基于优先级的阻塞队列,不会阻塞数据生产者
- SynchronousQueue,一种无缓冲的等待队列,类似于无中介的直接交易
arrayblockingqueue和linkedblockqueue的区别
- 队列中的锁不同
- 队列大小初始化方式不同
拒绝策略#
- ThreadPollExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常
- ThreadPollExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常
- ThreadPoolExecutor.DiscardOldestPolicy:丢弃对立最前面的任务,然后重新尝试执行任务
- ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
Execute方法执行逻辑
#
- 如果当前运行的线程少于corePoolSize,则会创建新得线程来执行新的任务
- 如果运行的线程个数等于或者大于corePoolSize,则会将提交的任务存放到阻塞队列workQueue中
- 如果当前workQueue队列已满的话,则会创建新的线程来执行任务
- 如果线程个数已经超过了maximumPoolSize,使用饱和策略RejectedExecutionHandler来进行处理
Excutor和Submit#
submit是基于方法Executor.execute(Runnable)的延申,通过创建并返回一个Future类对象,取消执行或等待完成
线程池的关闭#
关闭线程,可以通过shutDown和shutDownNow两个方法
遍历线程池中所有的线程,然后依次中断
shutdownNow首先将线程池的状态设置为STOP,然后尝试停止所有的正在执行和未执行任务的线程,并返回等待执行任务的列表
shutdown只是将线程池的状态设置未SHUTDOWN状态,然后中断所有没有正在执行任务的线程
Worker类
单独的线程类,执行自己的任务
execute
1 core queue noncore 核心线程
addWorker
count++
addworker start