一、什么是线程池: java.util.concurrent.Executors提供了一个 java.util.concurrent.Executor接口的实现用于创建线程池
多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。
假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。
如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能。
一个线程池包括以下四个基本组成部分:
1、线程池管理器(ThreadPool):用于创建并管理线程池,包括 创建线程池,销毁线程池,添加新任务;
2、工作线程(PoolWorker):线程池中线程,在没有任务时处于等待状态,可以循环的执行任务;
3、任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等;
4、任务队列(taskQueue):用于存放没有处理的任务。提供一种缓冲机制。
线程池技术正是关注如何缩短或调整T1,T3时间的技术,从而提高服务器程序性能的。
它把T1,T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有T1,T3的开销了。
二.常见的几种线程池
1. newSingleThreadExecutor
创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
2.newFixedThreadPool
创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
3. newCachedThreadPool
创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,
那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。
4.newScheduledThreadPool
创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。
三. 线程池的作用:
线程池的作用就是限制系统中执行线程的数量。
根据系统的环境情况,可以自动或手动设置线程的数量,达到运行的最佳效果;少了浪费系统资源,多了造成系统拥挤而效率不高。用线程池控制线程数量,其他线程排队等候。一个任务执行完毕,再从队列中取最前面的任务开始执行。若队列中没有的等待进程,线程池的这一资源处于等待状态。当一个新任务需要运行时,如果线程池中有等待的线程,则可以直接运行,否则进入等待队列。
四 .为什么要用线程池:
1.减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务。
2.可以根据系统的承受能力,调整线程池中工作线线程的数目,防止因为消耗过多的内存,而把服务器累趴(每个线程需要大约1MB内存,线程开的越多,消耗的内存也就越大,最后死机)。
五.关于线程池之ThreadPoolExecutor使用
四个构造方法
public class ThreadPoolExecutor extends AbstractExecutorService { ..... public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler); ... }
最后,对参数最多的那个 对其参数进行解释
1 public ThreadPoolExecutor(int corePoolSize, // 1 2 int maximumPoolSize, // 2 3 long keepAliveTime, // 3 4 TimeUnit unit, // 4 5 BlockingQueue<Runnable> workQueue, // 5 6 ThreadFactory threadFactory, // 6 7 RejectedExecutionHandler handler ) { //7 8 if (corePoolSize < 0 || 9 maximumPoolSize <= 0 || 10 maximumPoolSize < corePoolSize || 11 keepAliveTime < 0) 12 throw new IllegalArgumentException(); 13 if (workQueue == null || threadFactory == null || handler == null) 14 throw new NullPointerException(); 15 this.corePoolSize = corePoolSize; 16 this.maximumPoolSize = maximumPoolSize; 17 this.workQueue = workQueue; 18 this.keepAliveTime = unit.toNanos(keepAliveTime); 19 this.threadFactory = threadFactory; 20 this.handler = handler; 21 }
序号 | 名称 | 类型 | 含义 |
---|---|---|---|
1 | corePoolSize | int | 核心线程池大小 |
2 | maximumPoolSize | int | 最大线程池大小 |
3 | keepAliveTime | long | 线程最大空闲时间 |
4 | unit | TimeUnit | 时间单位 |
5 | workQueue | BlockingQueue<Runnable> | 线程等待队列 |
6 | threadFactory | ThreadFactory | 线程创建工厂 |
7 | handler | RejectedExecutionHandler | 拒绝策略 |
二、自定义线程池
以下是自定义线程池,使用了有界队列,自定义ThreadFactory和拒绝策略的demo:
1 public class ThreadTest { 2 3 public static void main(String[] args) throws InterruptedException, IOException { 4 int corePoolSize = 2; 5 int maximumPoolSize = 4; 6 long keepAliveTime = 10; 7 TimeUnit unit = TimeUnit.SECONDS; 8 BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2); 9 ThreadFactory threadFactory = new NameTreadFactory(); 10 RejectedExecutionHandler handler = new MyIgnorePolicy(); 11 ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, 12 workQueue, threadFactory, handler); 13 executor.prestartAllCoreThreads(); // 预启动所有核心线程 14 15 for (int i = 1; i <= 10; i++) { 16 MyTask task = new MyTask(String.valueOf(i)); 17 executor.execute(task); 18 } 19 20 System.in.read(); //阻塞主线程 21 } 22 23 static class NameTreadFactory implements ThreadFactory { 24 25 private final AtomicInteger mThreadNum = new AtomicInteger(1); 26 27 @Override 28 public Thread newThread(Runnable r) { 29 Thread t = new Thread(r, "my-thread-" + mThreadNum.getAndIncrement()); 30 System.out.println(t.getName() + " has been created"); 31 return t; 32 } 33 } 34 35 public static class MyIgnorePolicy implements RejectedExecutionHandler { 36 37 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 38 doLog(r, e); 39 } 40 41 private void doLog(Runnable r, ThreadPoolExecutor e) { 42 // 可做日志记录等 43 System.err.println( r.toString() + " rejected"); 44 // System.out.println("completedTaskCount: " + e.getCompletedTaskCount()); 45 } 46 } 47 48 static class MyTask implements Runnable { 49 private String name; 50 51 public MyTask(String name) { 52 this.name = name; 53 } 54 55 @Override 56 public void run() { 57 try { 58 System.out.println(this.toString() + " is running!"); 59 Thread.sleep(3000); //让任务执行慢点 60 } catch (InterruptedException e) { 61 e.printStackTrace(); 62 } 63 } 64 65 public String getName() { 66 return name; 67 } 68 69 @Override 70 public String toString() { 71 return "MyTask [name=" + name + "]"; 72 } 73 } 74 }
今天先到这里,突然感觉很有危机感。