1.Future、Callable、FutureTask
线程池提供的submit方法
<T> Future<T> submit(Callable<T> task); //Callable call方法具有返回值,Future对象可以通过调用其get()方法来获 取任务的执行结果。
<T> Future<T> submit(Runnable task, T result); //需要你注意的是Runnable接口的实现类Task声明了一个有 参构造函数 Task(Result r) ,创建Task对象的时候传入了result对象,这样就能在类Task的run()方法 中对result进行各种操作了。result相当于主线程和子线程之间的桥梁,通过它主子线程可以共享数据。
Future<?> submit(Runnable task); //Runnable run方法没有返回值 返回的Future仅用于判断执行是否完成
1.1Callable
在执行多个任务的时候,使用Java标准库提供的线程池是非常方便的。我们提交的任务只需要实现Runnable接口,就可以让线程池去执行。但是使用Runable接口无法获取任务执行的结果。
于是Java提供了Callable接口,相比于Runable增加了返回值,并且Callable接口是一个泛型接口,可以返回指定类型的结果。
public interface Callable<V> {
V call() throws Exception;
}
将Callable 最为线程池执行任务的参数 通过返回值Future就可以获得call方法的实现中返回的结果值:
public static void main(String[] args) throws Exception{
//初始化线程池,实际开发中不建议这么做
ExecutorService threadPool = Executors.newFixedThreadPool(2);
Future reuslt = threadPool.submit(new Callable<Integer>() {
//匿名内部类实现Callable接口
@Override
public Integer call() throws Exception {
System.out.println("callable task run");
return 10;
}
});
//或者执行结果 会阻塞
Integer num = (Integer) reuslt.get();
System.out.println(num);
threadPool.shutdown();
}
//执行结果
//callable task run
//10
1.2Future
Future作为线程池的执行的返回值,提供了以下的方法
boolean cancel(boolean mayInterruptIfRunning); //取消任务的方法
boolean isCancelled(); //判断任务是否已取消
boolean isDone(); //判断任务是否已结束
V get() throws InterruptedException, ExecutionException; //任务执行结果 当前线程阻塞
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; //任务执行结果 超时机制 当前线程阻塞
1.3FutureTask
FutureTask实现了Runnable和Future接口,由于实现了Runnable 接口,所以可以将FutureTask对象作为任务提交给ThreadPoolExecutor去执行,也可以直接被Thread执行;
又因为实现了Future接口,所以也能用来获得任务的执行结果。
//接口关系
public class FutureTask<V> implements RunnableFuture<V>
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();//接口的多继承
}
//构造方法
FutureTask(Callable callable);
FutureTask(Runnable runnable, V result);
//线程池执行FutureTask 并获取结果
public static void main(String[] args) throws Exception{
// 创建
FutureTask futureTask = new FutureTask<>(()-> 1+2);
// 创建线程池
ExecutorService es = Executors.newCachedThreadPool();
// 提交
Future future = es.submit(futureTask);
// 获取计算结果
Integer result = (Integer) futureTask.get();
System.out.println(result);
es.shutdown();
}
//直接由线程执行并且获取结果
public static void main(String[] args) throws Exception{
// 创建
FutureTask futureTask = new FutureTask<>(()-> 1+2);
Thread th1 = new Thread(futureTask);
th1.start();
Integer result =(Integer) futureTask.get();
System.out.println(result);
}
2.CountDownLatch
CountDownLatch是Java 1.5提供的线程控制工具类,主要用来解决一个线程等待 多个线程的场景,可以类比旅游团团长要等待所有的游客到齐才能去下一个景点.
//构造方法 指定计数器
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
//计数器 -1
countDownLatch.countDown();//完成后计数器-1
///等待计数器为0
countDownLatch.await();
countDown()方法一般用于在线程池的任务中,注意进行异常捕获,不然在await()时可能会造成一直阻塞。
ExecutorService executorService =Executors.newFixedThreadPool(3);
CountDownLatch cdl = new CountDownLatch(3);
executorService.execute(new Runnable() {
@Override
public void run() {
try {
function1();
} catch (Exception e) {
//异常处理
e.printStackTrace();
}
finally {
cdl.countDown();
}
}
});
3.CyclicBarrier
CyclicBarrier 也是Java 1.5中提供的线程同步控制类,CyclicBarrier字面意思是“可重复使用的栅栏”,使用CyclicBarrier可以实现一组线程相互的等待。
在CyclicBarrier类的内部有一个计数器,每个线程在到达屏障点的时候都会调用await方法将自己阻塞,此时计数器会减1,当计数器减为0的时候所有因调用await方法而被阻塞的线程将被唤醒。这就是实现一组线程相互等待的原理。
//构造器 指定在所有线程都完成,即计数器为0时执行的操作 barrierAction
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
//构造器
public CyclicBarrier(int parties) {
this(parties, null);
public int await() throws InterruptedException, BrokenBarrierException
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException
- 线程调用 await() 表示自己已经到达栅栏,当前线程阻塞,等待其他线程到达栅栏
- BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程 await() 时被中断或者超时
- InterruptedException,其他线程中断了这个线程就会抛出该异常,其他的线程就会抛出BrokenBarrierException
public static void main(String[] args) throws Exception{
CyclicBarrier cyclicBarrier = new CyclicBarrier(5,()-> System.out.println("所有运动员准备就绪"));
ExecutorService executors = Executors.newFixedThreadPool(10);
for(int i=0;i<10;i++){
executors.submit(()->{
try {
Thread.sleep(100);
System.out.println("我准备好了");
cyclicBarrier.await();//到达栅栏 阻塞 等待其他线程到达
} catch (InterruptedException e) {
e.printStackTrace();
}catch (BrokenBarrierException e){
e.printStackTrace();
}
System.out.println("起飞!");
});
}
executors.shutdown();
}
demo代码中栅栏的计数器为5,循环线程数为10,当5个一组进行完之后,计数器会重置开启下一轮。
4.CompletableFuture
异步化是利用多线程进行程序性能优化的基础
4.1 Java中实现异步的方法:
- 1.异步调用 调用方创建一个子线程,在子线程中执行方法调用
- 2.异步方法 方法实现时,创建新线程执行主要的逻辑,主线程直接返回结果
异步转同步的方法:主线程阻塞直到子线程执行完成。
Java在1.8版本提供了CompletableFuture来支持异步编程,实现非常复杂,但是功能也很强大。
CompletableFuture 优点
- 无需手工维护线程,没有繁琐的手工维护线程的工作,给任务分配线程的工作也不需要我们关注;
- 语义更清晰,例如 f3 = f1.thenCombine(f2, ()->{}) 能够清晰地表述“任务3要等待任务1和任 务2都完成后才能开始”;
- 代码更简练并且专注于业务逻辑,几乎所有代码都是业务逻辑相关的。
/**
* 实现Future 接口 可以获得执行结果
* CompletionStage 异步流程中的串行,汇聚
*/
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
//判断线程池的并行级别是否大于1 并行级别决定了同一时刻最多有多少个线程在执行,如不传如并行级别参数,将默认为当前系统的CPU个数。
private static final boolean useCommonPool =
(ForkJoinPool.getCommonPoolParallelism() > 1);
//默认的线程池声明
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
//创建CompletableFuture的静态方法
//使用默认的线程池
static > CompletableFuture runAsync(Runnable runnable)
static CompletableFuture supplyAsync(Supplier supplier)
//可以指定线程池
static CompletableFuture runAsync(Runnable runnable, Executor executor)
static CompletableFuture supplyAsync(Supplier supplier, Executor executor)
//Runnable 接口的run()方法没有返回值,
//而Supplier接口的get()方法是有 返回值的 是函数式编程的接口
//创建完CompletableFuture对象之后,会自动地异步执行runnable.run()方法或者supplier.get()方法
//如果所有的CompletableFuture 都使用同一个线程池,当进行IO这种耗时的操作时,就会造成长时间的等待,造成线程饥饿,
根据不同的业务类型创建不同的线程池,以避免互相干扰。
}
4.2 异步流程控制相关方法
4.2.1 串行
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
//thenApply既能接收参数也支持返回值,所以thenApply系列方法返回的是 ? extends U ,U为CompletableFuture
//thenAccept、 虽然支持参数,但却不支持回值
//thenRun、 参数是Runnable,所以action既不能接收参数也不支持返回值
//thenCompose 个系列的方法会新创建出一个子流程,最终结果和thenApply系列是相同的。
CompletionStage thenApply(fn);
CompletionStage thenApplyAsync(fn);
CompletionStage thenAccept(consumer);
CompletionStage thenAcceptAsync(consumer);
CompletionStage thenRun(action);
CompletionStage thenRunAsync(action);
CompletionStage thenCompose(fn);
CompletionStage thenComposeAsync(fn);
//Async代表的是异步执行fn、consumer或者action
//首先通过supplyAsync()启动一个异步 流程,之后是两个串行操作,整体看起来还是挺简单的。不过,虽然这是一个异步流程,但任务①②③却是 串行执行的,②依赖①的执行结果,③依赖②的执行结果。
CompletableFuture f0 = CompletableFuture.supplyAsync( () -> "Hello World") //1
.thenApply(s -> s + " QQ") //2
.thenApply(String::toUpperCase);//3
System.out.println(f0.join());
//输出结果 HELLO WORLD QQ
4.2.2.汇聚
AND 是thenCombine、thenAcceptBoth和runAfterBoth 系列方法,参数分别是 接口的区别也是源自fn、consumer、action这三个核心参数不同
CompletionStage thenCombine(other, fn);
CompletionStage thenCombineAsync(other, fn);
CompletionStage thenAcceptBoth(other, consumer);
CompletionStage thenAcceptBothAsync(other, consumer);
CompletionStage runAfterBoth(other, action);
CompletionStage runAfterBothAsync(other, action);
OR 主要是applyToEither、acceptEither和runAfterEither系列的 接口,这些接口的区别也是源自fn、consumer、action这三个核心参数不同。
CompletionStage applyToEither(other, fn);
CompletionStage applyToEitherAsync(other, fn);
CompletionStage acceptEither(other, consumer);
CompletionStage acceptEitherAsync(other, consumer);
CompletionStage runAfterEither(other, action);
CompletionStage runAfterEitherAsync(other, action);
//两者任一完成即可
CompletableFuture f1 = CompletableFuture.supplyAsync(()->{
int t = getRandom(5, 10);
sleep(t, TimeUnit.SECONDS);
return String.valueOf(t);
});
CompletableFuture f2 = CompletableFuture.supplyAsync(()->{
int t = getRandom(5, 10);
sleep(t, TimeUnit.SECONDS);
return String.valueOf(t);
});
CompletableFuture f3 = f1.applyToEither(f2,s -> s);
System.out.println(f3.join());
4.2.3、异常处理
fn、consumer、action它们的核心方法都不允许抛出可检查异常,但是却无法限制它 们抛出运行时异常。
CompletionStage exceptionally(fn);
CompletionStage whenComplete(consumer);
CompletionStage whenCompleteAsync(consumer);
CompletionStage handle(fn);
CompletionStage handleAsync(fn);
exceptionally()的使用非常类似于 try{}catch{}中的catch{},
但是由于支持链式编程方式,所以相对更简单。既然有try{}catch{},那就一定还 有try{}finally{},
whenComplete()和handle()系列方法就类似于try{}finally{}中的finally{},
无论是否发生异 常都会执行whenComplete()中的回调函数consumer和handle()中的回调函数fn
whenComplete()和 handle()的区别在于whenComplete()不支持返回结果,而handle()是支持返回结果的。
exceptionally捕获异常
CompletableFuture f0 = CompletableFuture .supplyAsync(()->7/0))
.thenApply(r->r*10)
.exceptionally(e->0);
System.out.println(f0.join());
除此之外还有CompletionService提供批量的异步操作,ForkJoinPool、ForkJoinTask提供分治任务。
PS:函数式编程
函数式接口(Functional Interface)
就是一个有且仅有一个抽象方法,但是可以有多个非抽象方法的接口。
函数式接口可以被隐式转换为 lambda 表达式。
Lambda 表达式和方法引用(实际上也可认为是Lambda表达式)上。
如定义了一个函数式接口如下:
@FunctionalInterface GreetingService
{
void sayMessage(String message);}
那么就可以使用Lambda表达式来表示该接口的一个实现(注:JAVA 8 之前一般是用匿名类实现的):
GreetingService greetService1 = message -> System.out.println("Hello " + message);
Function<T,R> 接口
FunctionalInterface
public interface Function<T, R> {
// 输入参数T,经过该函数处理得到R。也就是将参数应用于这个函数。
R apply(T t);
// 此函数有两个参数先将传进来的这个参数传入到before这个函数里面进行处理一下,然后before这个函数的返回结果再作为一个参数传递给外面的这个函数。
default <V> Function<V, R> compose(Function<? super V, ? extends T> before) {
Objects.requireNonNull(before);
return (V v) -> apply(before.apply(v));
}
// 此函数有两个参数先将传进来的这个参数传入到apply这个函数里面进行处理一下,然后这个函数的返回结果再作为一个参数传递给外面after的这个函数。
default <V> Function<T, V> andThen(Function<? super R, ? extends V> after) {
Objects.requireNonNull(after);
return (T t) -> after.apply(apply(t));
}
static <T> Function<T, T> identity() {
return t -> t;
}
}
Function 就是一个函数,T代表输入参数,R代表返回的结果。其作用类似于数学中函数的定义y=f(x),(x,y)跟<T,R>的作用几乎一致。
所以Function中没有具体的操作,具体的操作需要我们去为它指定,因此apply具体返回的结果取决于传入的lambda表达式。
public void test(){
Function<Integer,Integer> test=i->i+1;
test.apply(5);
}
/** print:6*/
Java.util.function 包中的类
https://www.runoob.com/java/java8-functional-interfaces.html