1、初始化线程的四种方式
继承Thread
实现Runnable接口
实现Callable接口 + FutrueTask(可以拿到结果处理异常)
线程池
方式1和方式2:主进程无法获取线程的运算结果
方式3:主进程可以获取线程的运算结果,但是不利于控制服务器中的线程资源,会导致服务器资源耗尽
方式4:可以通过如下两种方式初始化线程池
Executors.newFixedThreadPool(3)
new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,TimeUnit,unit,
workQueue,ThreadFactory,handler)
创建线程的四种方式的demo
public class ThreadTest { //保证当前系统中线程池只有一两个,每个异步任务直接提交给线程池 public static ExecutorService executorService = Executors.newFixedThreadPool(10); public static void main(String[] args) throws ExecutionException, InterruptedException { /* 1)继承Thread 2)实现Runnable接口 3)实现Callable接口 + FutrueTask(可以拿到结果处理异常) 4)线程池 我们以后在业务代码中,1-3 启动线程的方式都不用。将是所有的多线程异步任务都交给线程池来处理 区别: 1.2 不能得到返回值 1.2.3 都不能控制资源 (每次都需要new一个线程) 4可以控制资源,使系统性能稳定 (可以重复利用线程池中的线程,当一千万个异步任务进来,会等待线程池中的线程空闲,才会执行新的 异步任务) */ System.out.println("main start"); /*Thread01 thread01 = new Thread01(); thread01.start(); Thread thread = new Thread(new Runable01()); thread.start(); FutureTask<Integer> futureTask = new FutureTask<>(new Callable01()); new Thread(futureTask).start(); //main线程等待callable线程执行完成,获取返回结果 Integer res = futureTask.get(); System.out.println("获取到线程返回的结果:"+ res); */ //使用线程池 executorService.execute(new Runable01()); System.out.println("main end"); } public static class Thread01 extends Thread{ @Override public void run() { System.out.println("当前线程:"+ Thread.currentThread().getId()); int i = 10/2; System.out.println("运行结果:"+ i); } } public static class Runable01 implements Runnable{ @Override public void run() { System.out.println("当前线程:"+ Thread.currentThread().getId()); int i = 10/2; System.out.println("运行结果:"+ i); } } public static class Callable01 implements Callable<Integer>{ @Override public Integer call() throws Exception { System.out.println("当前线程:"+ Thread.currentThread().getId()); int i = 10/2; System.out.println("运行结果:"+ i); return i; } } }
2、线程池的使用
1)线程池的7大参数
corePoolSize:【5】核心线程数;线程池创建好以后就准备就绪的线程,等待接收异步任务去执行
相当于new了5个线程 Thread thread = new Thread()
核心线程数是一直存在的线程,除非设置了 allowCoreThreadTimeOut,核心线程超时被回收
maximumPoolSize: 最大线程数,用来控制资源并发
keepAliveTime:存活时间,如果当前线程数量大于核心数量,将会释放多余空闲的线程
( 线程空闲时间大于keepAliveTime,且数量为maximumPoolSize - corePoolSize)
unit:时间单位
BlockingQueue: 如果任务有很多,就会将目前多的任务(任务数 - maximumPoolSize)放在队列中
只要有线程空闲,就会去队列中取出新的任务继续执行
ThreadFactory: 线程的创建工厂
RejectedExecutionHandler:如果队列满了,按照我们指定的拒绝策略拒绝执行任务
2)线程池的工作顺序
1)线程池创建,准备好core数量的核心线程,准备接收任务 2)新的任务进来,用core准备好的空闲线程执行 ①、core满了,就将再进来的任务放到阻塞队列中。空闲的core会自己从阻塞队列中获取任务执行 ②、阻塞队列满了,就直接开新线程执行,最大只能开到max指定的数量 ③、max都执行好了。max-core数量的空闲线程会在keepAliveTime指定的时间后,自动销毁 最终保持到core的大小 ④、如果线程数到了max的数量,还有新任务进来,就会使用reject指定的拒绝策略进行处理 3)所有线程的创建都是由指定的factory创建的 面试题: 一个线程池 core:7 max:20 queue: 50 , 100并发进来是怎么进行分配的? 7个会立即执行 50个会进入队列 会开辟13个线程进行执行 剩下的30使用拒绝策略来执行
3)几种线程池
new ThreadPoolExecutor(5,200,10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100000), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); // core = 0, 所有的线程都可以被回收 Executors.newCachedThreadPool(); // core = max 且一直存在,不可被回收 Executors.newFixedThreadPool(10); // core = 1 单线程线程池,且一直存在,不可被回收 Executors.newSingleThreadExecutor();
4)开发中为什么要使用线程池
①、降低资源的损耗
通过重复利用已经创建好的线程降低线程的创建和销毁带来的损耗
②、提高响应速度
因为线程池中的线程数没有超过线程池的最大上限时,有的线程处于等待分配任务的状态
当任务来时无需创建新的线程即可执行
3、CompletableFuture 异步编排
1)创建异步对象
runXxxx 都是没有返回结果的
supplyXxx 都是可以获取返回结果的可以传入自定义的线程池,否则就用默认的线程池
2)计算完成时的回调方法
whenComplete 可以处理正常和异常的计算结果,exceptionally 处理异常情况。
whenComplete 和 whenCompleteAsync 的区别:
whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。
whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池的一个线程来进行执行。
方法不以 Async 结尾,意味着 Action 使用相同的线程执行,而 Async 可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)
System.out.println("main start.."); CompletableFuture<Integer> futrue = CompletableFuture.supplyAsync(() -> { System.out.println("当前线程:" + Thread.currentThread().getId()); int i = 10 / 0; System.out.println("运行结果:" + i); return i; }, executor).whenComplete((res,exception)->{ //whenComplete 虽然能得到异常信息,但是无法修改返回结果 System.out.println("异步任务执行完成,结果是:"+ res + ";异常是:"+exception); }).exceptionally(t -> { //exceptionally 可以感知异常,同时返回默认值 return 10; // }); Integer res = futrue.get(); System.out.println("main end.."+res);
3)handle方法
和 complete 一样,可对结果做最后的处理(可处理异常),可改变返回值。
System.out.println("main start.."); CompletableFuture<Integer> futrue = CompletableFuture.supplyAsync(() -> { System.out.println("当前线程:" + Thread.currentThread().getId()); int i = 10 / 4; System.out.println("运行结果:" + i); return i; }, executor).handle((res,exception) -> { if(res!=null){ return res * 2; } if(exception!=null){ return 0; } return 0; }); Integer res = futrue.get(); System.out.println("main end.."+res);
4)线程串行化方法
thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值。
thenAccept 方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。
thenRun 方法:只要上面的任务执行完成,就开始执行 thenRun,
区别:
thenApply都会接受第一个任务的结果,并且会返回第二个任务的结果
thenAccept 都会接受第一个任务的结果,但是不会返回第二个任务的结果
thenRun 只要第一个任务执行完成,就开始执行 thenRun,并且不会返回第二个任务的结果
5)两任务组合 (都要完成)
thenCombine:组合两个 future,获取两个 future 的返回结果,并返回当前任务的返回值
thenAcceptBoth:组合两个 future,获取两个 future 任务的返回结果,然后处理任务,没有返回值。
runAfterBoth:组合两个 future,不需要获取 future 的结果,只需两个 future 处理完任务后,处理该任务
System.out.println("main start.."); CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> { System.out.println("任务1进程:" + Thread.currentThread().getId()); int i = 10 / 4; System.out.println("任务1结束:" + i); return i; }, executor); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { System.out.println("任务2进程:" + Thread.currentThread().getId()); System.out.println("任务2结束"); return "hello"; }, executor); /*future1.runAfterBothAsync(future2, ()->{ System.out.println("任务3开始"); },executor); */ //f1,f2是future1、future2的返回结果 /* future1.thenAcceptBothAsync(future2, (f1,f2)-> { System.out.println("任务3开始: future1 + future2 = "+ f1+f2); },executor);*/ CompletableFuture<String> future3 = future1.thenCombineAsync(future2, (f1, f2) -> { System.out.println("任务3开始: future1 + future2 = "); return f1 + f2; }, executor); System.out.println("main end.."+future3.get());
6)两任务组合 (一个完成)
当两个任务中,任意一个 future 任务完成的时候,执行任务。
applyToEither:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。
System.out.println("main start.."); CompletableFuture<Object> future1 = CompletableFuture.supplyAsync(() -> { System.out.println("任务1进程:" + Thread.currentThread().getId()); int i = 10 / 4; System.out.println("任务1结束:" + i); return i; }, executor); CompletableFuture<Object> future2 = CompletableFuture.supplyAsync(() -> { System.out.println("任务2进程:" + Thread.currentThread().getId()); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("任务2结束"); return "hello"; }, executor); CompletableFuture<String> future = future1.applyToEitherAsync(future2, (res) -> { System.out.println("任务1,任务2只有一个完成"); System.out.println("任务1|任务2 结果:" + res); return res + "hello"; }, executor); System.out.println("main end:" + future.get());
acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。
System.out.println("main start.."); CompletableFuture<Object> future1 = CompletableFuture.supplyAsync(() -> { System.out.println("任务1进程:" + Thread.currentThread().getId()); int i = 10 / 4; System.out.println("任务1结束:" + i); return i; }, executor); CompletableFuture<Object> future2 = CompletableFuture.supplyAsync(() -> { System.out.println("任务2进程:" + Thread.currentThread().getId()); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("任务2结束"); return "hello"; }, executor); future1.acceptEitherAsync(future2, (res)->{ System.out.println("任务1,任务2只有一个完成"); System.out.println("任务1|任务2 结果:"+res); },executor);
runAfterEither:两个任务有一个执行完成,不需要获取 future 的结果,处理任务,也没有返回值
System.out.println("main start.."); CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> { System.out.println("任务1进程:" + Thread.currentThread().getId()); int i = 10 / 4; System.out.println("任务1结束:" + i); return i; }, executor); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { System.out.println("任务2进程:" + Thread.currentThread().getId()); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("任务2结束"); return "hello"; }, executor); //两任务组合(一个完成) future1.runAfterEitherAsync(future2, ()->{ System.out.println("任务1,任务2只有一个完成"); },executor);
7)多任务组合
allOf: 等待所有任务完成
anyOf:只要有一个任务完成