线程池简单基础介绍:
Executor:
Executor是Java工具类,执行提交给它的Runnable任务。该接口提供了一种基于任务运行机制的任务提交方法,包括线程使用详细信息,时序等等。Executor通常用于替代创建多线程。
提供一个execute(Runnable command)方法;我们一般用它的继承接口ExecutorService。里面就只有一个执行任务的接口,源码如下:
public interface Executor {
void execute(Runnable command);
}
ExecutorService:
它是线程池定义的一个接口,继承自Executor。有两个实现类,分别为ThreadPoolExecutor,ScheduledThreadPoolExecutor。除了继承自父类的 execute 执行方法 ,自己还定义了一系列方法,其中有两个重载提交任务的方法 submit 方法,参数可以是 Runnable 或者是 Callable类型的。
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
这里既然提到了Callable 那就先来看一下这了接口跟Runnable:
Callable:
public interface Callable<V> {
V call() throws Exception;
}
Runnable:
public interface Runnable {
public abstract void run();
}
相同点:
- 两者都是接口;
- 两者都可用来编写多线程程序;
- 两者都需要调用Thread.start()启动线程;
不同点:
- 两者最大的不同点是:实现Callable接口的任务线程能返回执行结果;而实现Runnable接口的任务线程不能返回结果;
- Callable接口的call()方法允许抛出异常;而Runnable接口的run()方法的异常只能在内部消化,不能继续上抛;
注意点:
Callable接口支持返回执行结果,此时需要调用FutureTask.get()方法实现,此方法会阻塞主线程直到获取‘将来’结果;当不调用此方法时,主线程不会阻塞!
Executors:
是java.util.concurrent包下的一个类,提供了若干个静态方法,用于生成不同类型的线程池的工具类,有点类似与Arrays。Executors一共可以创建下面这四类线程池:
newCacheThreadPool
创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
- newFixedThreadPool
创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
newScheduledThreadPool 创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。
newSingleThreadExecutor 创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。
它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
ThreadPool:
合理利用线程池能够带来三个好处。第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。第三:提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。但是要做到合理的利用线程池,必须对其原理了如指掌。
线程池的基本思想还是一种对象池的思想,开辟一块内存空间,里面存放了众多(未死亡)的线程,池中线程执行调度由池管理器来处理。当有线程任务时,从池中取一个,执行完成后线程对象归池,这样可以避免反复创建线程对象所带来的性能开销,节省了系统的资源。用线程池来管理的好处是,可以保证系统稳定运行,适用与有大量线程,高工作量的情景下使用,假如要展示1000张图片如果创建1000个线程去加载,系统肯定会死掉。用线程池就可以避免这个问题,可以用5个线程轮流执行,5个一组,执行完的线程不直接回收而是等待下次执行,这样对系统的开销就可以减小不少。
public class T05_ThreadPool {
public static void main(String[] args) {
//创建一个5个线程的线程池
ExecutorService service = Executors.newFixedThreadPool(5);
for(int i=0;i<6;i++) {//往池子里仍了6个任务
service.execute(()->{//睡500毫秒后打印线程名
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
});
}
//java.util.concurrent.ThreadPoolExecutor@119d7047[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
System.out.println(service);
service.shutdown();//关闭线程池 等待任务都执行完再关闭
System.out.println(service.isTerminated());//false 任务是否都执行完
System.out.println(service.isShutdown());//true 是不是关闭? 关闭了不代表任务执行完。
//java.util.concurrent.ThreadPoolExecutor@119d7047[Shutting down, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
System.out.println(service);
try {
TimeUnit.SECONDS.sleep(5);//睡5秒
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(service.isTerminated());//true
System.out.println(service.isShutdown());//true
//java.util.concurrent.ThreadPoolExecutor@119d7047[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6]
System.out.println(service);
}
}
Futrue:
public class T06_Future {
public static void main(String[] args) throws InterruptedException, ExecutionException {
//FutureTask 区分 RunnableTask 也是实现了Runnable接口的
FutureTask<Integer> task = new FutureTask<>(()-> {
try {// 该任务将来会有个返回值是Integer类型
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1000;
}); // new Callable()
new Thread(task).start();
System.out.println(task.get());//阻塞
//************************************
ExecutorService service = Executors.newFixedThreadPool(5);
Future<Integer> f = service.submit(()->{// callable
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
});
System.out.println(f.isDone());// 任务执行完没有啊?
System.out.println(f.get()); // 阻塞 1
System.out.println(f.isDone());
}
}
ParallerComputing(并行计算)
newFixedThreadPool:(固定线程的线程池)
测试例子:计算1-20W之间的质数数量,用一个线程的话计算时间会很长。我们可以用线程池来解决:
public class T07_ParallerComputing {
public static void main(String[] args) throws InterruptedException, ExecutionException {
long start = System.currentTimeMillis();
// 获取 1-200000的质数,只能被1跟自身整除
List<Integer> results = getPrime(1,200000);
long end = System.currentTimeMillis();
System.out.println(end - start);// 2000 左右
final int cpuCoreNum =4;
ExecutorService service = Executors.newFixedThreadPool(cpuCoreNum);
MyTask task1 = new MyTask(1, 80000);
MyTask task2 = new MyTask(80001, 130000);
MyTask task3 = new MyTask(130001, 170000);
MyTask task4 = new MyTask(170001, 200000);
start = System.currentTimeMillis();
Future<List<Integer>> submit1 = service.submit(task1);
Future<List<Integer>> submit2 = service.submit(task2);
Future<List<Integer>> submit3 = service.submit(task3);
Future<List<Integer>> submit4 = service.submit(task4);
submit1.get();
submit2.get();
submit3.get();
submit4.get();
end = System.currentTimeMillis();
System.out.println(end - start); // 800左右
}
static class MyTask implements Callable<List<Integer>> {
int startPos, endPos;
private MyTask(int startPos, int endPos) {
this.startPos = startPos;
this.endPos = endPos;
}
@Override
public List<Integer> call() throws Exception {
List<Integer> results = getPrime(startPos, endPos);
return results;
}
}
static boolean isPrime(int num) {// 是否是质数
for (int i = 2; i < num / 2; i++) {
if (num % i == 0)
return false;
}
return true;
}
static List<Integer> getPrime(int start, int end) {
List<Integer> results = new ArrayList<Integer>();
for (int i = start; i < end; i++) {
if (isPrime(i))
results.add(i);
}
return results;
}
}
两次执行结果大致为 2067 ,698 .这说明用线程池会大大提高计算效率。
newCachedThreadPool:
刚刚开始线程池里面没有线程,来一个任务启动一个线程,如果有空闲的线程就直接执行任务,没有空闲就另起一个线程,每个线程超过 60 秒的空闲时间,线程消失,可以自己指定生存时间。
public class T08_CachedPool {
public static void main(String[] args) throws InterruptedException {
ExecutorService service = Executors.newCachedThreadPool();
//java.util.concurrent.ThreadPoolExecutor@55f96302[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
System.out.println(service);
for(int i=0;i<2;i++) {
service.execute(()->{//睡500毫秒后打印线程名
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
});
}
//java.util.concurrent.ThreadPoolExecutor@55f96302[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
System.out.println(service);
TimeUnit.SECONDS.sleep(80);
//java.util.concurrent.ThreadPoolExecutor@55f96302[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 2]
System.out.println(service);
}
}
newSingleThreadExecutor:
线程池里面只有一个线程,代码如下:
public class T09_SingleThreadPool {
public static void main(String[] args) throws InterruptedException {
ExecutorService service = Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i++) {
final int j = i;
service.execute(() -> {
System.out.println(j + " " + Thread.currentThread().getName());
});
// 0 pool-1-thread-1
// 1 pool-1-thread-1
// 2 pool-1-thread-1
// 3 pool-1-thread-1
// 4 pool-1-thread-1
}
}
}
这个可以保证任务的先后执行顺序,打印出来的结果是按顺序的,且只有一个线程去执行。
newScheduledThreadPool:定时器线程池
以下小程序是启动后0秒开始执行,每隔500毫秒执行一次任务
public class T09_SingleThreadPool {
public static void main(String[] args) throws InterruptedException {
ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
service.scheduleAtFixedRate(()->{//以固定的频率来执行任务
try {
TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
}, 0, 500, TimeUnit.MILLISECONDS);// 起始延迟多久后执行 ,每隔500毫秒执行一次,时间单位
}
}
newWorkStealingPool:精灵线程(守护线程,后台线程)
任务窃取:线程池中每个线程都维护着自己的任务队列,当某一个线程队列执行空了,他会去另外的线程中去拿一个任务来执行,不用去分配:
public class T11_WorkStealingPool {
public static void main(String[] args) throws InterruptedException, IOException {
ExecutorService service = Executors.newWorkStealingPool();
//查看CPU是几核 我这里是8 默认启动8个线程java.util.concurrent.ForkJoinPool@55f96302[Running, parallelism = 8, size = 0, active = 0, running = 0, steals = 0, tasks = 0, submissions = 0]
System.out.println(Runtime.getRuntime().availableProcessors());
//java.util.concurrent.ForkJoinPool@55f96302[Running, parallelism = 8, size = 0, active = 0, running = 0, steals = 0, tasks = 0, submissions = 0]
System.out.println(service);
service.execute(new R(1000));
service.execute(new R(1000));
service.execute(new R(1000));
service.execute(new R(1000));//daemon 精灵线程Debug 可以查看
service.execute(new R(1000));
//由于产生的是精灵线程(守护线程,后台线程),主线程不阻塞的话看不到输出
System.in.read();
}
static class R implements Runnable{
int time;
public R(int time) {
this.time =time;
}
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(time +" "+ Thread.currentThread().getName());
}
}
}
newWorkStealingPool的实现是ForkJoinPool;
ForkJoinPool :
ForkJoinPool的优势在于,可以充分利用多cpu,多核cpu的优势,把一个任务拆分成多个“小任务”,把多个“小任务”放到多个处理器核心上并行执行;当多个“小任务”执行完成之后,再将这些执行结果合并起来即可。
下面例子是从 0 加到 100W .:
public class T12_ForkJoinPool {
static int[] nums= new int[1000000];
static final int MAX_NUM =50000;
static Random r=new Random();
static {
for(int i=0;i<nums.length;i++) {
nums[i] = r.nextInt(100);
}
System.out.println(Arrays.stream(nums).sum());
}
//无返回值
static class AddTask extends RecursiveAction{
int start, end;
private AddTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected void compute() {
if(end -start <MAX_NUM) {
long sum =0L;
for(int i=start;i<end ;i++) sum += nums[i];
System.out.println("from "+start +" to "+end+" = "+sum);
}else {
int middle =start +(end-start)/2;
AddTask task1 = new AddTask(start, middle);
AddTask task2 = new AddTask(middle, end);
task1.fork();
task2.fork();
}
}
}
//有返回值
static class AddTask2 extends RecursiveTask<Long>{
int start, end;
private AddTask2(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
if(end -start <MAX_NUM) {
long sum =0L;
for(int i=start;i<end ;i++) sum += nums[i];
return sum;
}
int middle =start +(end-start)/2;
AddTask2 task1 = new AddTask2(start, middle);
AddTask2 task2 = new AddTask2(middle, end);
task1.fork();//启动新线程
task2.fork();
return task1.join() + task2.join();
}
}
public static void main(String[] args) throws IOException {
ForkJoinPool fjp = new ForkJoinPool();
// AddTask task = new AddTask(0, nums.length);
// fjp.execute(task);
AddTask2 task2 = new AddTask2(0, nums.length);
fjp.execute(task2);
long result= task2.join();//阻塞的
System.out.println(result);
System.in.read();
}
}
ThreadPoolExecutor:
在上诉讲到的线程池中 :newFixedThreadPool ,newCachedThreadPool,newSingleThreadExecutor,newScheduledThreadPool 的底层实现中都是用到了 ThreadPoolExecutor 来创建线程池。而newWorkStealingPool 的底层用的是 ForkJoinPool ,ForkJoinPool是1.8以后才加入的。
简单看一下几种线程池构造函数的简单实现:
newFixedThreadPool :new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); 指定的线程数,最大线程数,多长时间空闲消失,时间单位,队列。 0L代表永远不会消失。
newCachedThreadPool:new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
newSingleThreadExecutor:new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())
newScheduledThreadPool: super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
newWorkStealingPool:new ForkJoinPool (Runtime.getRuntime().availableProcessors(),ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true)第一个是CPU核数