参考内容:
<码出高效>
https://github.com/CyC2018/CS-Notes/blob/master/notes/Java 并发.md#新建new
需求场景
当我们使用多线程进行操作时, 需要一个工具对同一种类或者不同种类的批量线程进行统一的管理 ,同时 也可以减少每次都新建 销毁线程的系统资源消耗问题.就出现了线程池的概念
线程池的优点
- 可以对线程进行统一的管理,提高资源利用率从
- 可以对线程进行隔离,比如耗时长的线程放到一个池子中,耗时短的线程放到一个池子中.方便控制
- 实现自己的 线程缓存队列 和拒绝机制
- 执行定时任务
ThreadPoolFactory解析
主要记录了相关参数,这里没有对源码进行剖析
1. int corePoolSize 核心线程数,所有线程执行结束 核心线程也不会销毁
2. int MaxnumPoolSize 线程池存储的最大线程数,当大于核心线程数时,判断keepAloveTime进行销毁,当超过此数值,则需要 线程缓存队列进行缓存
3. long keepAliveTime 由于判断时间
4. TimeUnit unit 判断时间
5. BlockingQueue<runnable> workQueue 线程缓存队列
6. ThreadFactory threadFactory 生成线程的工厂
7. RejectExcutionHandler handler 执行拒绝策略的对象
当线程放入线程池后,会先判断是否小于核心线程数小于的话直接创建线程,如果超过核心线程数,会有限放入队列,队列也满了的话,才会判断最大线程数目,看是新建线程还是执行拒绝机制.
Excutor自带的五种线程池工具
newCachedThreadPool() 可弹性伸缩的线程池
newFixThreadPool() 指定大小的线程池 核心线程数 等于 最大线程数,永远保持一定数量
newSingleThreadPool() 单线程池,核心和最大线程数为1,每次只能执行一个线程
newsheduledThreadPool() 也是可弹性伸缩的线程,但是最大线程数为INT最大值,即不回收线程
newWorkStealingPool()
实现自定义线程池
在实际使用中最好使用自己定义的线程池,保证可控状态
package concurrency;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/*
* 自定义线程工厂
* */
class UserThreadFactory implements ThreadFactory {
private final String threadName;
private final AtomicInteger nextId = new AtomicInteger(1);
//1. 通过构造函数传入名称
public UserThreadFactory(String threadName) {
this.threadName = "userFactory-" + threadName + "-worker-";
}
@Override
public Thread newThread(Runnable r) {
//2. 为传入的线程指定名字
String name = threadName + nextId.getAndIncrement();
Thread thread = new Thread(r, name);
System.out.println(thread.getName());
return thread;
}
}
/*
* 创建自己的拒绝机制处理器
* */
class UserRejectHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("reject user thread : " + executor.toString());
}
}
/*
*
* 模拟创建一个线程任务
* */
class ThreadTask implements Runnable {
private final AtomicLong id = new AtomicLong(0L);
@Override
public void run() {
// try {
// Thread.sleep(1100);
// }catch (InterruptedException e){
// e.printStackTrace();
// }
System.out.println("running : " + id.getAndIncrement());
}
}
public class NO2ThreadPool {
public static void main(String[] args) {
//自己线程工厂
ThreadFactory userThreadfactory = new UserThreadFactory("user1");
ThreadFactory userThreadfactory2 = new UserThreadFactory("user2");
//自己的拒绝机制处理器
RejectedExecutionHandler userHandler = new UserRejectHandler();
//指定自己的缓存队列
BlockingDeque blockingDeque = new LinkedBlockingDeque(10);
// 2 6 4 10 6 14
/* public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler*/
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2,
4,
60,
TimeUnit.SECONDS,
blockingDeque,
userThreadfactory,
userHandler
);
ThreadPoolExecutor threadPoolExecutor2 = new ThreadPoolExecutor(2,
4,
60,
TimeUnit.SECONDS,
blockingDeque,
userThreadfactory2,
userHandler
);
for (int i = 0; i < 20; i++) {
threadPoolExecutor.execute(new ThreadTask());
// threadPoolExecutor2.execute(new ThreadTask());
}
}
/*
* userFactory-user1-worker-1
userFactory-user1-worker-2
userFactory-user1-worker-3
userFactory-user1-worker-4
创建了四个线程数 小于 maxnumthread值
20个线程 拒绝了 6个 , 20 - 最大线程数 - 缓存队列数
reject user thread : java.util.concurrent.ThreadPoolExecutor@4d7e1886[Running, pool size = 4, active threads = 4, queued tasks = 10, completed tasks = 0]
reject user thread : java.util.concurrent.ThreadPoolExecutor@4d7e1886[Running, pool size = 4, active threads = 4, queued tasks = 10, completed tasks = 0]
reject user thread : java.util.concurrent.ThreadPoolExecutor@4d7e1886[Running, pool size = 4, active threads = 4, queued tasks = 10, completed tasks = 0]
reject user thread : java.util.concurrent.ThreadPoolExecutor@4d7e1886[Running, pool size = 4, active threads = 4, queued tasks = 10, completed tasks = 0]
reject user thread : java.util.concurrent.ThreadPoolExecutor@4d7e1886[Running, pool size = 4, active threads = 4, queued tasks = 10, completed tasks = 0]
reject user thread : java.util.concurrent.ThreadPoolExecutor@4d7e1886[Running, pool size = 4, active threads = 4, queued tasks = 10, completed tasks = 0]
//线程执行了16个
running : 0
running : 0
running : 0
running : 0
running : 0
running : 0
running : 0
running : 0
running : 0
running : 0
running : 0
running : 0
running : 0
running : 0*/
}
submit execute
ExecutorService支持两种方法添加执行现场 submit execute, 区别是submit在执行实现callable的接口时,会获取返回结果.
package concurrency;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
public class NO21ThreadPool {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
service.submit(new Runnable() {
@Override
public void run() {
System.out.println("runable1_" + Thread.currentThread().getName());
}
});
service.execute(new Runnable() {
@Override
public void run() {
System.out.println("runable2_" + Thread.currentThread().getName());
}
});
}
ExecutorService service1 = Executors.newCachedThreadPool();
List list = new ArrayList();
for (int i = 0; i < 5; i++) {
Future future = service1.submit(new Callable() {
@Override
public Object call() throws Exception {
return 1;
}
});
try {
list.add(future.get());
}catch (Exception e){
e.printStackTrace();
}
}
for (Object n : list){
System.out.println(n.toString());
}
}
/*
* runable2_pool-1-thread-2
runable1_pool-1-thread-3
runable2_pool-1-thread-4
runable2_pool-1-thread-6
runable1_pool-1-thread-7
runable1_pool-1-thread-5
runable2_pool-1-thread-8
runable1_pool-1-thread-9
runable2_pool-1-thread-10
1
1
1
1
1*/
}