Callable
通过Runable和Thread, 无法获取子线程的运行结果。 Java5 引入了java.util.concurrent, 可以获取到子线程的运行结果。
Future接口可以理解成一个任务, Future.get()方法可以获取任务的运行结果
虽然submit(task)是异步的,但是get()依旧是堵塞的,调用了get方法才执行call()方法。所以callable虽然为线程提供了返回值。但是依然是堵塞式调用
public class TestThreadCallback {
public static void main(String[] args) throws InterruptedException, ExecutionException {
Callable task = new Task();
ExecutorService executorService = Executors.newFixedThreadPool(1);
Future submit = executorService.submit(task);//异步
Object o = submit.get();//堵塞
System.out.println(o);
executorService.shutdown();
}
//可以加泛型Callable<V>
public static class Task implements Callable {
@Override
public Object call() throws Exception {
return "xx";
}
}
}
Future
场景
上学的时候,老师需要收我们的作业进行批改(我们的作业忘记写了),我们叫一个空白的本子上去。然后老师开始批改作业。我在疯狂的补作业。等到老师批改到我的,我说我交错了,把我刚刚补的作业交给老师,此时就顺利度过难关。
第一次交的空白的本子就是FutureData,第二次交上去的本子是RealData
Future
future的核心思想是异步调用。这是区别callable的地方。
客户端调用服务端Future,future无法立刻返回数据。就返回一个空数据给客户端(FutureData)。当客户端需要查询空数据的时候,Future给客户端返回(RealData),如果服务端还没有准备好数据,就会报错(也即是我们上面场景中,我们的作业没有补全);
ListenableFuture
由于jdk1.5 Callable的问题,google开发了一款ListenableFuture
- 配置(pom)
引入 guava 依赖(com.google.guava)
- 使用
class ListenableFutureTest{
public static void main(String[] args) {
//线程池加上一个装饰器
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
ListenableFuture<Object> future = service.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
System.out.println("1");
return "1";
}
});
//future.addListener();//不方便使用
Futures.addCallback(future, new FutureCallback<Object>() {
@Override
public void onSuccess(@Nullable Object ob) {
System.out.println("onSuccess:"+ob);
}
@Override
public void onFailure(Throwable throwable) {
System.out.println("onFailure");
}
},service);
service.shutdown();
}
}
CompletableFuture
jdk1.8之后,jdk模仿guava写了一个CompletableFuture
- 使用1(推荐)
- CompletableFuture+线程池 基本使用
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
class J8ComFuture3 {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(10);
// get msg from tc
System.out.println("Got reqeust from TC");
// prepare RM
System.out.println("prepare RM msg");
// trigger cupd
CompletableFuture<String> cupdResult = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
// TODO Auto-generated method stub
try {
System.out.println("sleep b4");
TimeUnit.SECONDS.sleep(1);
System.out.println("sleep after");
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return "msg from CUPD";
}
}, executor);
// return cupd
cupdResult.thenAccept(new Consumer<String>() { //钩子函数,返回值
// callback method
public void accept(String arg0) {
System.out.println("return msg to TC=" + arg0);
}
});
// return RM
System.out.println("return RM msg to customer");
}
}
- 使用2
- 需求:主线程将三个任务交给三个线程做,如果一个线程调用失败,通知其他线程停止执行任务。合适cpu密集型操作
- 方法:使用标志位
/**
* 只适合cpu密集型
*/
class ComFutureTest{
static enum Result{
CANCELLED,SUCCESS,FAIL
};
static List<Mytask> tasks = new ArrayList<>();
public static void main(String[] args) throws IOException {
Mytask task1 = new Mytask("task1", 10);
Mytask task2 = new Mytask("task2", 5);
//表示task3执行一秒后报错,我们需要将task1和task2给关闭
Mytask task3 = new Mytask("task3", 1);
// Mytask task1 = new Mytask("task1", 10, Result.SUCCESS);
// Mytask task2 = new Mytask("task2", 10, Result.SUCCESS);
// Mytask task3 = new Mytask("task3", 1, Result.FAIL);
tasks.add(task1);
tasks.add(task2);
tasks.add(task3);
//将三个任务交给 CompletableFuture 异步执行
List<CompletableFuture> completableFutures = new ArrayList<>();
for (Mytask mytask:tasks){
CompletableFuture completableFuture = CompletableFuture.supplyAsync(new Supplier() {
@Override
public Result get() {
return mytask.runTask();
}
}).thenAccept(new Consumer() {
@Override
public void accept(Object ret) {
//如果某一个task失败,就通知其他task停止任务
if (Result.FAIL == ret) {
callback((Result) ret, mytask);
}
System.out.println(mytask.name + "结束任务,执行状态=" + ret);
}
});
completableFutures.add(completableFuture);
}
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0]));
voidCompletableFuture.join();
Boolean fail = false;
List<Mytask> sucess = new ArrayList<>();
//所有的任务执行完毕
for (Mytask mytask: tasks){
//如果有一个task执行失败
if (mytask.ret == Result.SUCCESS){
sucess.add(mytask);
}else if (mytask.ret == Result.FAIL){
fail = true;
}
}
for (Mytask sucessTask:sucess){
System.out.println(sucessTask.name+"回滚");
}
System.out.println("主线程结束");
}
//如果一个task失败了,通知其他的task调用各自的cancel,取消任务。
public static void callback(Result ret,Mytask task){
if (Result.FAIL == ret){
for (Mytask _task:tasks){
if (_task!=task){
//cancel方法进行同步,所以callback不需要同步了
_task.cancel();
}
}
}
}
private static class Mytask{
private String name;
private int timeInSeconds;
private Result ret;
//当前自己任务是否正在取消
public volatile Boolean cancelled;
//当前自己任务是否已经取消
public volatile Boolean cancelling;
public Mytask(String name,int timeInSeconds){
this.name = name;
this.timeInSeconds = timeInSeconds;
this.cancelled = false;
this.cancelling = false;
}
public Result runTask(){
int interval = 2;
int total = 0;
try{
for (;;){
//cpu密集型执行任务
Thread.sleep(interval*1000);
total += interval;
if (total>=timeInSeconds) break;
//cpu密集型可以用这种方式结束任务【因为不会堵塞,可以很快就能读取cancelled,判断需不需要继续执行任务】
if (cancelled){
ret = Result.CANCELLED;
return Result.CANCELLED;
}
}
}catch (Exception e){
e.printStackTrace();
}
if ("task3".equals(name)){
System.out.println(name+"执行任务失败");
ret = Result.FAIL;
return ret;
}else {
System.out.println(name+"执行任务成功");
ret = Result.SUCCESS;
return ret;
}
}
public void cancel(){
if (!cancelled){
synchronized (this){
if (cancelled) return;
//正在取消任务
cancelling = true;
//正在取消
System.out.println(name+"正在取消任务");
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(name+"已经取消任务");
cancelled = true;
}
}
}
}
}
- 使用3
- 需求:主线程将三个任务交给三个线程做,如果一个线程调用失败,通知其他线程停止执行任务。合适IO密集型操作
- 方法:使用interrupt