Callable
通过Runable和Thread, 无法获取子线程的运行结果。 Java5 引入了java.util.concurrent, 可以获取到子线程的运行结果。
Future接口可以理解成一个任务, Future.get()方法可以获取任务的运行结果
虽然submit(task)是异步的,但是get()依旧是堵塞的,调用了get方法才执行call()方法。所以callable虽然为线程提供了返回值。但是依然是堵塞式调用
public class TestThreadCallback { public static void main(String[] args) throws InterruptedException, ExecutionException { Callable task = new Task(); ExecutorService executorService = Executors.newFixedThreadPool(1); Future submit = executorService.submit(task);//异步 Object o = submit.get();//堵塞 System.out.println(o); executorService.shutdown(); } //可以加泛型Callable<V> public static class Task implements Callable { @Override public Object call() throws Exception { return "xx"; } } }
Future
场景
上学的时候,老师需要收我们的作业进行批改(我们的作业忘记写了),我们叫一个空白的本子上去。然后老师开始批改作业。我在疯狂的补作业。等到老师批改到我的,我说我交错了,把我刚刚补的作业交给老师,此时就顺利度过难关。
第一次交的空白的本子就是FutureData,第二次交上去的本子是RealData
Future
future的核心思想是异步调用。这是区别callable的地方。
客户端调用服务端Future,future无法立刻返回数据。就返回一个空数据给客户端(FutureData)。当客户端需要查询空数据的时候,Future给客户端返回(RealData),如果服务端还没有准备好数据,就会报错(也即是我们上面场景中,我们的作业没有补全);
ListenableFuture
由于jdk1.5 Callable的问题,google开发了一款ListenableFuture
- 配置(pom)
引入 guava 依赖(com.google.guava)
- 使用
class ListenableFutureTest{ public static void main(String[] args) { //线程池加上一个装饰器 ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2)); ListenableFuture<Object> future = service.submit(new Callable<Object>() { @Override public Object call() throws Exception { System.out.println("1"); return "1"; } }); //future.addListener();//不方便使用 Futures.addCallback(future, new FutureCallback<Object>() { @Override public void onSuccess(@Nullable Object ob) { System.out.println("onSuccess:"+ob); } @Override public void onFailure(Throwable throwable) { System.out.println("onFailure"); } },service); service.shutdown(); } }
CompletableFuture
jdk1.8之后,jdk模仿guava写了一个CompletableFuture
- 使用1(推荐)
- CompletableFuture+线程池 基本使用
import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Supplier; class J8ComFuture3 { public static void main(String[] args) { ExecutorService executor = Executors.newFixedThreadPool(10); // get msg from tc System.out.println("Got reqeust from TC"); // prepare RM System.out.println("prepare RM msg"); // trigger cupd CompletableFuture<String> cupdResult = CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { // TODO Auto-generated method stub try { System.out.println("sleep b4"); TimeUnit.SECONDS.sleep(1); System.out.println("sleep after"); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } return "msg from CUPD"; } }, executor); // return cupd cupdResult.thenAccept(new Consumer<String>() { //钩子函数,返回值 // callback method public void accept(String arg0) { System.out.println("return msg to TC=" + arg0); } }); // return RM System.out.println("return RM msg to customer"); } }
- 使用2
- 需求:主线程将三个任务交给三个线程做,如果一个线程调用失败,通知其他线程停止执行任务。合适cpu密集型操作
- 方法:使用标志位
/** * 只适合cpu密集型 */ class ComFutureTest{ static enum Result{ CANCELLED,SUCCESS,FAIL }; static List<Mytask> tasks = new ArrayList<>(); public static void main(String[] args) throws IOException { Mytask task1 = new Mytask("task1", 10); Mytask task2 = new Mytask("task2", 5); //表示task3执行一秒后报错,我们需要将task1和task2给关闭 Mytask task3 = new Mytask("task3", 1); // Mytask task1 = new Mytask("task1", 10, Result.SUCCESS); // Mytask task2 = new Mytask("task2", 10, Result.SUCCESS); // Mytask task3 = new Mytask("task3", 1, Result.FAIL); tasks.add(task1); tasks.add(task2); tasks.add(task3); //将三个任务交给 CompletableFuture 异步执行 List<CompletableFuture> completableFutures = new ArrayList<>(); for (Mytask mytask:tasks){ CompletableFuture completableFuture = CompletableFuture.supplyAsync(new Supplier() { @Override public Result get() { return mytask.runTask(); } }).thenAccept(new Consumer() { @Override public void accept(Object ret) { //如果某一个task失败,就通知其他task停止任务 if (Result.FAIL == ret) { callback((Result) ret, mytask); } System.out.println(mytask.name + "结束任务,执行状态=" + ret); } }); completableFutures.add(completableFuture); } CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])); voidCompletableFuture.join(); Boolean fail = false; List<Mytask> sucess = new ArrayList<>(); //所有的任务执行完毕 for (Mytask mytask: tasks){ //如果有一个task执行失败 if (mytask.ret == Result.SUCCESS){ sucess.add(mytask); }else if (mytask.ret == Result.FAIL){ fail = true; } } for (Mytask sucessTask:sucess){ System.out.println(sucessTask.name+"回滚"); } System.out.println("主线程结束"); } //如果一个task失败了,通知其他的task调用各自的cancel,取消任务。 public static void callback(Result ret,Mytask task){ if (Result.FAIL == ret){ for (Mytask _task:tasks){ if (_task!=task){ //cancel方法进行同步,所以callback不需要同步了 _task.cancel(); } } } } private static class Mytask{ private String name; private int timeInSeconds; private Result ret; //当前自己任务是否正在取消 public volatile Boolean cancelled; //当前自己任务是否已经取消 public volatile Boolean cancelling; public Mytask(String name,int timeInSeconds){ this.name = name; this.timeInSeconds = timeInSeconds; this.cancelled = false; this.cancelling = false; } public Result runTask(){ int interval = 2; int total = 0; try{ for (;;){ //cpu密集型执行任务 Thread.sleep(interval*1000); total += interval; if (total>=timeInSeconds) break; //cpu密集型可以用这种方式结束任务【因为不会堵塞,可以很快就能读取cancelled,判断需不需要继续执行任务】 if (cancelled){ ret = Result.CANCELLED; return Result.CANCELLED; } } }catch (Exception e){ e.printStackTrace(); } if ("task3".equals(name)){ System.out.println(name+"执行任务失败"); ret = Result.FAIL; return ret; }else { System.out.println(name+"执行任务成功"); ret = Result.SUCCESS; return ret; } } public void cancel(){ if (!cancelled){ synchronized (this){ if (cancelled) return; //正在取消任务 cancelling = true; //正在取消 System.out.println(name+"正在取消任务"); try { TimeUnit.SECONDS.sleep(1); } catch (Exception e) { e.printStackTrace(); } System.out.println(name+"已经取消任务"); cancelled = true; } } } } }
- 使用3
- 需求:主线程将三个任务交给三个线程做,如果一个线程调用失败,通知其他线程停止执行任务。合适IO密集型操作
- 方法:使用interrupt