zoukankan      html  css  js  c++  java
  • 廖雪峰Java11多线程编程-3高级concurrent包-8CompletableFuture

    1. 使用Future不能自动获得异步执行结果

        Future<String> future = executor.submit(task);
        String result = future.get();
        while(!future.isDone()){
            Thread.sleep(1);
        }
        String result = future.get()
    

    使用Future获得异步执行结果:

    • 但是当我们使用get()获得异步执行结果的时候,这个方法可能会阻塞。
    • 还可以通过while循环反复调用isDone()来判断异步结果是否已经完成。

    所以使用Future获得异步执行的结果有2个方法:

    • 1.调用阻塞方法get()
    • 2.轮询isDone()

    这2种方法都不是很好,我们期望在异步任务完成的时候自动返回结果,所以JDK提供了CompletableFuture接口。

    2. CompletableFutrue接口

    当我们使用CompletableFuture接口时:

    • 任务执行结束的时候,它会自动调用我们设置好的回调函数thenAccept;
    • 当任务发生异常的时候,它也可以自动调用我们设置好的另一个回调函数exceptionally。

    例如当异步结果正常执行完毕的时候,我们用thenAccept传入一个回调对象,就可以获得正常运行的异步结果;我们用exceptionally传入一个回调对象,可以获得运行时发生异常的情况。

        CompletableFuture<String> cf = getCompletableFutureFromSomewhere();
        //thenAccept传入一个回调对象,就可以获得正常运行的异步结果
        cf.thenAccept(new Consumer<String>() { 
            public void accept(String result) {
                System.out.println("正常运行获得异步结果:"+ result);
            }
        });
        //用exceptionally传入一个回调对象,可以获得运行时发生异常的情况
        cf.exceptionally(new Function<Throwable,  String>() { 
            public String apply(Throwable t){
                System.out.println("运行发生异常:" + t.getMessage());
                return null;    
            }   
        });
    

    使用Java8新增的函数式语法可以进一步简化代码

        CompletableFuture<String> cf = getCompletableFutureFromSomewhere();
        cf.thenAccept( (result) -> {
            System.out.println("正常运行获得异步结果"+result);
        });
        cf.exceptionally( (t) -> {
            System.out.println("运行时发生异常:" + t.getMessage());
            return null;
        });
    

    CompletableFuture的优点:

    • 异步任务结束时,会自动回调某个对象的方法
    • 异步任务出错时,会自动回调某个对象的方法
    • 主线程设置好回调后,不再关心异步任务的执行

    CompletableFuture的基本用法:

        CompletableFuture<String> cf = CompletableFuture.supplyAsync("异步执行实例");
        cf.thenAccept("获得结果后的操作");
        cf.exceptionally("发生异常时的操作");
    

    3.示例

    工具类:根据传入的地址下载结果
    如测试地址:http://hq.sinajs.cn/list=sh000001,下载的内容应该是,如果存在乱码,应该是编码问题。

    var hq_str_sh000001="上证指数,2992.4919,2987.9287,2991.3288,3009.4575,2980.4774,0,0,350523658,395955641514,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,2020-02-27,15:02:03,00,";
    

    DownloadUtil.java

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.URL;
    import java.net.URLConnection;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    
    public class DownloadUtil implements Callable<String> {
        String url;
        public DownloadUtil(String url){
            this.url = url;
        }
        public String call() throws Exception {
            URLConnection conn = new URL(this.url).openConnection();
            conn.connect();
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream()))){
    
                String s = null;
                StringBuilder sb = new StringBuilder();
                while ((s = reader.readLine()) != null) {
                    sb.append(s).append("
    ");
                }
                return sb.toString();
            }
        }
    
        static String download(String url) throws Exception{
            ExecutorService executor = Executors.newCachedThreadPool();
            DownloadUtil task= new DownloadUtil(url);
            Future<String> future = executor.submit(task);
            String html = future.get();
            executor.shutdown();
            return html;
        }
    }
    

    3.1 单个CompletableFuture

    编写一个StockSupplier继承至Supplier接口,返回一个float类型。在get()方法中,定义了这个异步任务具体的操作,传入一个URL,获得一个股票的价格,然后返回这个价格。
    在main方法中,首先通过CompletableFuture.supplyAsync传入任务,就创建一个CompletableFuture的实例;
    然后对这个实例调用thenAccept在异步结果正常的情况下,打印出股票的价格,用exceptionally在异步任务出错的时候,打印一个error。
    最后,注意主线程不要立即结束。因为CompletableFuture默认使用的是Executor,它会在主线程结束的时候关闭。所以我们用join等待异步线程结束后,再关闭主线程。
    CompletableFutureSample.java

    import java.util.concurrent.CompletableFuture;
    import java.util.function.Consumer;
    import java.util.function.Function;
    import java.util.function.Supplier;
    
    class StockSupplier implements Supplier<Float>{
        public Float get(){
            String url = "http://hq.sinajs.cn/list=sh000001";
            System.out.println("GET: "+url);
            try{
                //以' , '分割,取第4个值
                String result = DownloadUtil.download(url);
                String[] ss = result.split(",");
                return Float.parseFloat(ss[3]);
            }catch (Exception e){
                throw new RuntimeException();
            }
        }
    }
    public class CompletableFutureSample {
        public static void main(String[] args) throws Exception{
            CompletableFuture<Float> getStockFuture = CompletableFuture.supplyAsync(new StockSupplier());
            getStockFuture.thenAccept(new Consumer<Float>() {
                @Override
                public void accept(Float price) {
                    System.out.println("Current price:" + price);
                }
            });
            getStockFuture.exceptionally(new Function<Throwable, Float>() {
                @Override
                public Float apply(Throwable t) {
                    System.out.println("Error: " + t.getMessage());
                    return Float.NaN;
                }
            });
            getStockFuture.join();
        }
    }
    

    把URL修改下,替换为http://hq.sinajssss.cn/list=sh000001,会走异常处理逻辑

    3.2多个CompletableFuture可以串行执行

    当我们的第一个CompletableFuture cf1执行结束的时候,我们可以用thenApplyAsync然后开始执行第2个CompletableFuture。如果cf2执行完毕的时候,我们可以用thenApplyAsync来执行第3个CompletableFuture,我们对最后一个CompletableFuture cf3调用thenAccept来获取最终的执行结果。

        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync("异步执行实例1");
        CompletableFuture<LocalDate> cf2 = cf1.thenAcceptAsync("异步执行实例2");
        CompletableFuture<Float> cf3 = cf2.thenApplyAsync("异步执行实例3");
        cf3.thenAccept("实例3获得结果后的操作");
    

    例如我们可以先通过一个异步任务查询证券代码,然后我们根据查询到的证券代码,再启动一个异步任务来查询证券的价格。最后我们显示证券的价格。这样我们就可以把两个CompletableFuture串形执行。

    import java.awt.geom.FlatteningPathIterator;
    import java.util.concurrent.CompletableFuture;
    import java.util.function.Consumer;
    import java.util.function.Function;
    import java.util.function.Supplier;
    
    class Price{
        final String code;
        final float price;
        public Price(String code,float price){
            this.code = code;
            this.price = price;
        }
    }
    /*
    * 首先定义一个StockLookupSupplier,它的作用是通过证券名称查询证券代码。
    * 我们传入一个证券的名称,然后返回它的证券代码。
    */
    class StockLookupSupplier implements Supplier<String>{
        String name;
        public StockLookupSupplier(String name){
            this.name = name;
        }
        public String get(){
            System.out.println("lookup:"+name);
            try{
                String url = "http://suggest3.sinajs.cn/suggest/type=11,12&key=0&name="+name;
                String result = DownloadUtil.download(url);
                String[] ss = result.split(",");
                return ss[3];
            }catch (Exception e){
                throw new RuntimeException(e);
            }
        }
    }
    public class CompletableFutureSequenceSample {
        public static void main(String[] args) {
            String name = "上证指数";
            //在main方法中,我们先创建一个CompletableFuture,传入上证指数这个名称,获得证券代码
            CompletableFuture<String> getStockCodeFuture = CompletableFuture.supplyAsync(new StockLookupSupplier(name));
            /*        
            * 我们对getStockCodeFuture调用了theAcceptAsync,然后又传入了一个新的对象new Function<String, Price>,这个对象的作用就是当我们获得证券代码后,我们再获取证券的价格
            * 所以我们可以看到,thenAcceptAsync它实际上是把上一个CompletableFuture在运行结束以后自动转化成下一个CompletableFuture。
            */
            CompletableFuture<Price> getStockPriceFuture = getStockCodeFuture.thenApplyAsync(new Function<String, Price>() {
                @Override
                public Price apply(String code) {
                    System.out.println("get Code:"+code);
                    try{
                        String url = "http://hq.sinajs.cn/list="+code;
                        String result = DownloadUtil.download(url);
                        String[] ss = result.split(",");
                        return new Price(code, Float.parseFloat(ss[3]));
                    }catch (Exception e){
                        throw new RuntimeException(e);
                    }
                }
            });
            // 最后我们对第2个CompletableFuture调用了一个thenAccept来打印最终的结果
            getStockPriceFuture.thenAccept(new Consumer<Price>() {
                @Override
                public void accept(Price price) {
                    System.out.println(price.code+":"+price.price);
                }
            });
            getStockPriceFuture.join();
        }
    }
    

    3.3 多个CompletableFuture可以并行执行

    例如我们可以从新浪查询证券价格,也可以从网易查询证券价格。这2个异步操作可以并行执行,并且我们设置anyOf,任意一个异步任务首先返回的时候,我们就可以直接获得最终的结果。

    import java.util.concurrent.CompletableFuture;
    import java.util.function.Consumer;
    import java.util.function.Supplier;
    
    class StockPrice{
        final String from;
        final float price;
        StockPrice(float price,String from){
            this.from = from;
            this.price = price;
        }
        public String toString(){
            return "Price:"+price+" from "+from;
        }
    }
    
    class StockFromSina implements Supplier<StockPrice>{
        public StockPrice get(){
            String url = "http://hq.sinajs.cn/list=sh000001";
            System.out.println("GET:"+url);
            try{
                String result = DownloadUtil.download(url);
                String[] ss = result.split(",");
                return new StockPrice(Float.parseFloat(ss[3]),"sina");
            }catch (Exception e){
                throw new RuntimeException(e);
            }
        }
    }
    class StockFromNetease implements Supplier<StockPrice>{
        public StockPrice get(){
            String url = "http://api.money.126.net/data/feed/0000001,money.api?callback=_ntes_quote_callback";
            System.out.println("GET:"+url);
            try{
                String result = DownloadUtil.download(url);
                int priceIndex = result.indexOf(""price"");
                int start = result.indexOf(":",priceIndex) + 1;
                int end = result.indexOf(",",priceIndex);
                System.out.println(Float.parseFloat(result.substring(start,end)));
                return new StockPrice(Float.parseFloat(result.substring(start,end)),"netease");
            }catch (Exception e){
                throw new RuntimeException(e);
            }
        }
    }
    
    public class CompletableFutureAnyOfSample {
        public static void main(String[] args) throws Exception{
            CompletableFuture<StockPrice> getSockFromSina = CompletableFuture.supplyAsync(new StockFromSina());
            CompletableFuture<StockPrice> getStockFromNetease = CompletableFuture.supplyAsync(new StockFromNetease());
            CompletableFuture<Object> getStock = CompletableFuture.anyOf(getSockFromSina,getStockFromNetease);
            getStock.thenAccept(new Consumer<Object>() {
                public void accept(Object result){
                    System.out.println("Result:"+result);
                }
            });
            getStock.join();
        }
    }
    

    在这个例子中,我们定义了两个Supplier对象,第1个是从新浪获取证券的价格,第2个是从网易获取证券价格。然后我们在main方法中,首先创建了2个CompletableFuture对象,它们分别从新浪和网易获取同一个证券的价格。紧接着,我们用CompletableFuture.anyOf()把两个CompletableFuture变成1个新的CompletableFuture对象。这个新的CompletableFuture对象会在前2个对象任意一个首先完成的时候,就调用thenAccept,这样我们打印的结果就是先返回的价格。


    把anyOf改为allOf:这样它会在2个CompletableFuture都返回以后,才会执行新的CompletableFuture。当使用allOf的时候,新的CompletableFuture的范型参数只能是Void

            CompletableFuture<Void> getStock = CompletableFuture.allOf(getSockFromSina,getStockFromNetease);
    

    3.4 CompletableFuture的命名规则:

    • xxx():继续在已有的线程中执行
    • xxxAsync():用Executor的新线程执行

    4. 总结:

    CompletableFuture对象可以指定异步处理流程:

    • thenAccept()处理正常结果
    • exceptional()处理异常结果
    • thenApplyAsync()用于串行化另一个CompletableFuture
    • anyOf/allOf用于并行化两个CompletableFuture
  • 相关阅读:
    django之admin管理工具
    django之中间件
    cookie和session
    day052-53 django框架
    day050 前端Jquery库的使用
    sprint
    Scrum 项目1.0
    【团队项目】3.0
    [读书笔记]
    【团队项目】2.0
  • 原文地址:https://www.cnblogs.com/csj2018/p/11031781.html
Copyright © 2011-2022 走看看