zoukankan      html  css  js  c++  java
  • java----Future

    Callable

      通过Runable和Thread, 无法获取子线程的运行结果。 Java5 引入了java.util.concurrent, 可以获取到子线程的运行结果。 

      Future接口可以理解成一个任务, Future.get()方法可以获取任务的运行结果

      虽然submit(task)是异步的,但是get()依旧是堵塞的,调用了get方法才执行call()方法。所以callable虽然为线程提供了返回值。但是依然是堵塞式调用

    public class TestThreadCallback {
        public static void main(String[] args) throws InterruptedException, ExecutionException {
            Callable task = new Task();
            ExecutorService executorService = Executors.newFixedThreadPool(1);
            Future submit = executorService.submit(task);//异步
            Object o = submit.get();//堵塞
            System.out.println(o);
            executorService.shutdown();
        }
    
        //可以加泛型Callable<V>
        public static class Task implements Callable {
            @Override
            public Object call() throws Exception {
                return "xx";
            }
        }
    }
    

     

    Future

    场景

      上学的时候,老师需要收我们的作业进行批改(我们的作业忘记写了),我们叫一个空白的本子上去。然后老师开始批改作业。我在疯狂的补作业。等到老师批改到我的,我说我交错了,把我刚刚补的作业交给老师,此时就顺利度过难关。

      第一次交的空白的本子就是FutureData,第二次交上去的本子是RealData

    Future

      future的核心思想是异步调用。这是区别callable的地方。

      客户端调用服务端Future,future无法立刻返回数据。就返回一个空数据给客户端(FutureData)。当客户端需要查询空数据的时候,Future给客户端返回(RealData),如果服务端还没有准备好数据,就会报错(也即是我们上面场景中,我们的作业没有补全);

    ListenableFuture

      由于jdk1.5 Callable的问题,google开发了一款ListenableFuture

    • 配置(pom)
    引入 guava 依赖(com.google.guava)
    • 使用
    class ListenableFutureTest{
        public static void main(String[] args) {
            //线程池加上一个装饰器
            ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
            ListenableFuture<Object> future = service.submit(new Callable<Object>() {
                @Override
                public Object call() throws Exception {
                    System.out.println("1");
                    return "1";
                }
            });
    
            //future.addListener();//不方便使用
            Futures.addCallback(future, new FutureCallback<Object>() {
                @Override
                public void onSuccess(@Nullable Object ob) {
                    System.out.println("onSuccess:"+ob);
                }
    
                @Override
                public void onFailure(Throwable throwable) {
                    System.out.println("onFailure");
                }
            },service);
            service.shutdown();
        }
    }
    

      

    CompletableFuture

      jdk1.8之后,jdk模仿guava写了一个CompletableFuture

    • 使用1(推荐)
    • CompletableFuture+线程池 基本使用
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import java.util.function.Consumer;
    import java.util.function.Supplier;
    
    class J8ComFuture3 {
        public static void main(String[] args) {
            ExecutorService executor = Executors.newFixedThreadPool(10);
            // get msg from tc
            System.out.println("Got reqeust from TC");
            // prepare RM
            System.out.println("prepare RM msg");
            // trigger cupd
            CompletableFuture<String> cupdResult = CompletableFuture.supplyAsync(new Supplier<String>() {
                @Override
                public String get() {
                    // TODO Auto-generated method stub
                    try {
                        System.out.println("sleep b4");
                        TimeUnit.SECONDS.sleep(1);
                        System.out.println("sleep after");
                    } catch (Exception e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    return "msg from CUPD";
                }
            }, executor);
            // return cupd
            cupdResult.thenAccept(new Consumer<String>() { //钩子函数,返回值
                // callback method
                public void accept(String arg0) {
                    System.out.println("return msg to TC=" + arg0);
                }
            });
            // return RM
            System.out.println("return RM msg to customer");
        }
    }
    
    • 使用2
    • 需求:主线程将三个任务交给三个线程做,如果一个线程调用失败,通知其他线程停止执行任务。合适cpu密集型操作
    • 方法:使用标志位
    /**
     * 只适合cpu密集型
     */
    class ComFutureTest{
        static  enum  Result{
            CANCELLED,SUCCESS,FAIL
        };
        static List<Mytask> tasks = new ArrayList<>();
        public static void main(String[] args) throws IOException {
            Mytask task1 = new Mytask("task1", 10);
            Mytask task2 = new Mytask("task2", 5);
            //表示task3执行一秒后报错,我们需要将task1和task2给关闭
            Mytask task3 = new Mytask("task3", 1);
    
    //        Mytask task1 = new Mytask("task1", 10, Result.SUCCESS);
    //        Mytask task2 = new Mytask("task2", 10, Result.SUCCESS);
    //        Mytask task3 = new Mytask("task3", 1, Result.FAIL);
            tasks.add(task1);
            tasks.add(task2);
            tasks.add(task3);
    
            //将三个任务交给 CompletableFuture 异步执行
            List<CompletableFuture> completableFutures = new ArrayList<>();
            for (Mytask mytask:tasks){
                CompletableFuture completableFuture = CompletableFuture.supplyAsync(new Supplier() {
                    @Override
                    public Result get() {
                        return mytask.runTask();
                    }
                }).thenAccept(new Consumer() {
                    @Override
                    public void accept(Object ret) {
                        //如果某一个task失败,就通知其他task停止任务
                        if (Result.FAIL == ret) {
                            callback((Result) ret, mytask);
                        }
                        System.out.println(mytask.name + "结束任务,执行状态=" + ret);
                    }
                });
                completableFutures.add(completableFuture);
            }
            CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0]));
            voidCompletableFuture.join();
    
            Boolean fail = false;
            List<Mytask> sucess = new ArrayList<>();
            //所有的任务执行完毕
            for (Mytask mytask: tasks){
                //如果有一个task执行失败
                if (mytask.ret == Result.SUCCESS){
                    sucess.add(mytask);
                }else if (mytask.ret == Result.FAIL){
                    fail = true;
                }
            }
            for (Mytask sucessTask:sucess){
                System.out.println(sucessTask.name+"回滚");
            }
            System.out.println("主线程结束");
        }
        //如果一个task失败了,通知其他的task调用各自的cancel,取消任务。
        public static void callback(Result ret,Mytask task){
            if (Result.FAIL == ret){
                for (Mytask _task:tasks){
                    if (_task!=task){
                        //cancel方法进行同步,所以callback不需要同步了
                        _task.cancel();
                    }
                }
            }
        }
        private static class Mytask{
            private String name;
            private int timeInSeconds;
            private Result ret;
            //当前自己任务是否正在取消
            public volatile Boolean cancelled;
            //当前自己任务是否已经取消
            public volatile Boolean cancelling;
    
            public Mytask(String name,int timeInSeconds){
                this.name = name;
                this.timeInSeconds = timeInSeconds;
                this.cancelled = false;
                this.cancelling = false;
            }
    
            public Result runTask(){
                int interval = 2;
                int total = 0;
                try{
                    for (;;){
                        //cpu密集型执行任务
                        Thread.sleep(interval*1000);
                        total += interval;
                        if (total>=timeInSeconds) break;
                        //cpu密集型可以用这种方式结束任务【因为不会堵塞,可以很快就能读取cancelled,判断需不需要继续执行任务】
                        if (cancelled){
                            ret = Result.CANCELLED;
                            return Result.CANCELLED;
                        }
                    }
                }catch (Exception e){
                    e.printStackTrace();
                }
                if ("task3".equals(name)){
                    System.out.println(name+"执行任务失败");
                    ret = Result.FAIL;
                    return ret;
                }else {
                    System.out.println(name+"执行任务成功");
                    ret = Result.SUCCESS;
                    return ret;
                }
            }
            public void cancel(){
                if (!cancelled){
                    synchronized (this){
                        if (cancelled) return;
                        //正在取消任务
                        cancelling = true;
                        //正在取消
                        System.out.println(name+"正在取消任务");
                        try {
                            TimeUnit.SECONDS.sleep(1);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        System.out.println(name+"已经取消任务");
                        cancelled = true;
                    }
                }
            }
        }
    }
    • 使用3
    • 需求:主线程将三个任务交给三个线程做,如果一个线程调用失败,通知其他线程停止执行任务。合适IO密集型操作
    • 方法:使用interrupt
  • 相关阅读:
    Java基础知识点总结(四)
    Java基础知识点总结(三)
    用easyui实现查询条件的后端传递并自动刷新表格的两种方法
    在JDBC中实现SQL语句的模糊查询
    向DataGrid数据表格增加查询搜索框
    Java基础知识点总结(二)
    easyui中formatter的用法
    腾讯云+阿里云 搭建hadoop + hbase
    -- 记录 -- 问题记录
    -- 1 -- springboot
  • 原文地址:https://www.cnblogs.com/yanxiaoge/p/14371382.html
Copyright © 2011-2022 走看看