自定义线程池
BlockingQueue用于存放任务。
线程池中的所有线程一直都是运行状态的,线程的空闲只是代表此刻它没有在执行任务而已;运行中的线程,一旦没有执行任务时,就自己从队列中取任务来执行。
class ThreadExcutor{
//创建
private volatile boolean RUNNING = true;
//所有任务都放队列中,让工作线程来消费
private static BlockingQueue<Runnable> queue = null;
private final HashSet<Worker> workers = new HashSet<Worker>();
private final List<Thread> threadList = new ArrayList<Thread>();
//工作线程数
int poolSize = 0;
//核心线程数(创建了多少个工作线程)
int coreSize = 0;
boolean shutdown = false;
public ThreadExcutor(int poolSize){
this.poolSize = poolSize;
queue = new LinkedBlockingQueue<Runnable>(poolSize);
}
public void exec(Runnable runnable) {
if (runnable == null) throw new NullPointerException();
if(coreSize < poolSize){
addThread(runnable);
}else{
//System.out.println("offer" + runnable.toString() + " " + queue.size());
try {
queue.put(runnable);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void addThread(Runnable runnable){
coreSize ++;
Worker worker = new Worker(runnable);
workers.add(worker);
Thread t = new Thread(worker);
threadList.add(t);
try {
t.start();
}catch (Exception e){
e.printStackTrace();
}
}
public void shutdown() {
RUNNING = false;
if(!workers.isEmpty()){
for (Worker worker : workers){
worker.interruptIfIdle();
}
}
shutdown = true;
Thread.currentThread().interrupt();
}
//这里留个位置放内部类Worker
}
/**
* 工作线程
*/
class Worker implements Runnable{
public Worker(Runnable runnable){
queue.offer(runnable);
}
@Override
public void run() {
while (true && RUNNING){
if(shutdown == true){
Thread.interrupted();
}
Runnable task = null;
try {
task = getTask();
task.run();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public Runnable getTask() throws InterruptedException {
return queue.take();
}
public void interruptIfIdle() {
for (Thread thread :threadList) {
System.out.println(thread.getName() + " interrupt");
thread.interrupt();
}
}
}
线程池的使用
ExecutorService pool = Executors.newCachedThreadPool();
// 查询客户资产
Runnable tr1 = new Mythread1(map);
// 发送消息通知
Runnable tr2 = new Mythread2(map);
pool.execute(tr1);
pool.execute(tr2);
try
{
pool.shutdown();
}
catch (Exception e)
{
logger.error("关闭出現异常", e);
pool.shutdownNow();
}
shutDown()和shutdownNow()的区别?
shutDown()
当线程池调用该方法时,线程池的状态则立刻变成SHUTDOWN状态。此时,则不能再往线程池中添加任何任务,否则将会抛出RejectedExecutionException异常。但是,此时线程池不会立刻退出,直到添加到线程池中的任务都已经处理完成,才会退出。
shutdownNow()
执行该方法,线程池的状态立刻变成STOP状态,并试图停止所有正在执行的线程,不再处理还在池队列中等待的任务。即,正在执行的任务则被停止,没被执行任务的则返回。
它试图终止线程的方法是通过调用Thread.interrupt()方法来实现的,但是大家知道,这种方法的作用有限,如果线程中没有sleep 、wait、Condition、定时锁等应用, interrupt()方法是无法中断当前的线程的。所以,ShutdownNow()并不代表线程池就一定立即就能退出,它可能必须要等待所有正在执行的任务都执行完成了才能退出。
几种常见线程池
newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
//单个线程的线程池,即线程池中每次只有一个线程工作,单线程串行执行任务
//线程池中只有一个线程进行任务执行,其他的都放入阻塞队列
//外面包装的FinalizableDelegatedExecutorService类实现了finalize方法,在JVM垃圾回收的时候会关闭线程池
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
newFixedThreadExecutor(n)
public static ExecutorService newFixedThreadPool(int nThreads) {
//固定数量的线程池,每提交一个任务就是一个线程,直到达到线程池的最大数量,然后后面进入等待队列直到前面的任务完成才继续执行
//corePoolSize跟maximumPoolSize值一样,同时传入一个无界阻塞队列
//根据上面分析的woker回收逻辑,该线程池的线程会维持在指定线程数,不会进行回收
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
newCacheThreadExecutor(推荐使用)
public static ExecutorService newCachedThreadPool() {
//可缓存线程池,当线程池大小超过了处理任务所需的线程,那么就会回收部分空闲(一般是60秒无执行)的线程,当有任务来时,又智能的添加新线程来执行
//这个线程池corePoolSize为0,maximumPoolSize为Integer.MAX_VALUE,意思也就是说来一个任务就创建一个woker,回收时间是60s
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
线程池用于解决什么问题
多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能。
如果任务是IO密集型,一般线程数需要设置2倍CPU数以上,以此来尽量利用CPU资源。
如果任务是CPU密集型,一般线程数量只需要设置CPU数加1即可,更多的线程数也只能增加上下文切换,不能增加CPU利用率。