zoukankan      html  css  js  c++  java
  • CompletableFuture用法介绍

    一、CompletableFuture用法入门介绍

    入门介绍的一个例子:

     1 package com.cy.java8;
     2 
     3 import java.util.Random;
     4 import java.util.concurrent.CompletableFuture;
     5 
     6 public class CompletableFutureInAction {
     7     private final static Random RANDOM = new Random(System.currentTimeMillis());
     8 
     9     public static void main(String[] args){
    10         CompletableFuture<Double> completableFuture = new CompletableFuture<>();
    11         new Thread(() -> {
    12             double value = get();
    13             completableFuture.complete(value);
    14         }).start();
    15 
    16         System.out.println("do other things...");
    17 
    18         completableFuture.whenComplete((t, e) -> {
    19             System.out.println("complete. value = "+ t);
    20             if(e != null){
    21                 e.printStackTrace();
    22             }
    23         });
    24     }
    25 
    26     private static double get(){
    27         try {
    28             Thread.sleep(RANDOM.nextInt(3000));
    29         } catch (InterruptedException e) {
    30             e.printStackTrace();
    31         }
    32         return RANDOM.nextDouble();
    33     }
    34 }

    console打印:

    do other things...
    complete. value = 0.8244376567363494
    

     

    二、CompletableFuture.supplyAsync

    CompletableFuture很少有直接new出来的方式去用的,一般都是通过提供的静态方法来使用。

    1.使用CompletableFuture.supplyAsync来构造CompletableFuture:

     1 package com.cy.java8;
     2 
     3 import java.util.concurrent.*;
     4 
     5 import static com.cy.java8.CompletableFutureInAction.get;
     6 
     7 public class CompletableFutureInAction2 {
     8 
     9     public static void main(String[] args) {
    10         /**
    11          * 可以发现value=..没有被打印,为什么呢?
    12          * 因为此方法构造出来的线程是demon的,守护进程,main执行结束之后就消失了,所以
    13          * 根本没来得及执行whenComplete中的语句
    14          */
    15         CompletableFuture.supplyAsync(() -> get())
    16                 .whenComplete((v, e) -> {
    17                     System.out.println("value = " + v);
    18                     if (e != null) {
    19                         e.printStackTrace();
    20                     }
    21                 });
    22 
    23         System.out.println("do other things...");
    24     }
    25 
    26 
    27 }

    2.要将上面whenComplete中的语句执行,进行改造:

     1 package com.cy.java8;
     2 
     3 import java.util.concurrent.*;
     4 import java.util.concurrent.atomic.AtomicBoolean;
     5 import static com.cy.java8.CompletableFutureInAction.get;
     6 
     7 public class CompletableFutureInAction2 {
     8 
     9     public static void main(String[] args) throws InterruptedException {
    10         AtomicBoolean finished = new AtomicBoolean(false);
    11 
    12         CompletableFuture.supplyAsync(() -> get())
    13                 .whenComplete((v, e) -> {
    14                     System.out.println("value = " + v);
    15                     if (e != null) {
    16                         e.printStackTrace();
    17                     }
    18                     finished.set(true);
    19                 });
    20 
    21         System.out.println("do other things...");
    22 
    23         while(!finished.get()){
    24             Thread.sleep(1);
    25         }
    26     }
    27 
    28 
    29 }

    改写之后, main线程发现如果finished没有变为true就会一直等1ms,直到whenComplete执行将finished变为true。

    3.上面的改写很low,其实只要将守护线程变为前台进程,main结束后不会消失就行了。

     1 package com.cy.java8;
     2 
     3 import java.util.concurrent.*;
     4 import static com.cy.java8.CompletableFutureInAction.get;
     5 
     6 public class CompletableFutureInAction2 {
     7 
     8     public static void main(String[] args){
     9         ExecutorService executorService = Executors.newFixedThreadPool(2, r -> {
    10             Thread t = new Thread(r);
    11             t.setDaemon(false);     //非守护线程
    12             return t;
    13         });
    14 
    15         CompletableFuture.supplyAsync(() -> get(), executorService)
    16                 .whenComplete((v, e) -> {
    17                     System.out.println("value = " + v);
    18                     if (e != null) {
    19                         e.printStackTrace();
    20                     }
    21                 });
    22 
    23         System.out.println("do other things...");
    24 
    25         //main执行结束之后,executorService线程不会结束,需要手动shutdown
    26         executorService.shutdown();
    27     }
    28 
    29 
    30 }

    三、thenApply:               

     1 package com.cy.java8;
     2 
     3 import java.util.concurrent.CompletableFuture;
     4 import java.util.concurrent.ExecutorService;
     5 import java.util.concurrent.Executors;
     6 
     7 public class CompletableFutureInAction3 {
     8 
     9     public static void main(String[] args) {
    10         ExecutorService executor = Executors.newFixedThreadPool(2, r -> {
    11             Thread t = new Thread(r);
    12             t.setDaemon(false);
    13             return t;
    14         });
    15 
    16         /**
    17          * 将执行完的结果再*100
    18          */
    19         CompletableFuture.supplyAsync(CompletableFutureInAction::get, executor)
    20                             .thenApply(v -> multiply(v))
    21                             .whenComplete((v, e) -> System.out.println(v));
    22     }
    23 
    24     private static double multiply(double value){
    25         try {
    26             Thread.sleep(1000);
    27         } catch (InterruptedException e) {
    28             e.printStackTrace();
    29         }
    30         return value * 100;
    31     }
    32 
    33 }

    console打印:

    43.15351824222534
    

    四、CompletableFuture.join()  

     1 package com.cy.java8;
     2 
     3 import java.util.Arrays;
     4 import java.util.List;
     5 import java.util.concurrent.CompletableFuture;
     6 import java.util.concurrent.ExecutorService;
     7 import java.util.concurrent.Executors;
     8 import java.util.stream.Collectors;
     9 
    10 public class CompletableFutureInAction3 {
    11 
    12     public static void main(String[] args) {
    13         ExecutorService executor = Executors.newFixedThreadPool(2, r -> {
    14             Thread t = new Thread(r);
    15             t.setDaemon(false);
    16             return t;
    17         });
    18 
    19         /**
    20          * 需求:将一组商品列表里面的每个商品对应的价格查询出来,并将这个价格*100.
    21          * 5个商品同时并发去做这件事
    22          *
    23          * CompletableFuture.join():等到所有的结果都执行结束,会返回CompletableFuture自己本身
    24          * 执行完的结果,等于get()返回的结果。
    25          */
    26         List<Integer> productionIDs = Arrays.asList(1, 2, 3, 4, 5);     //待查的一组商品列表的ID
    27         List<Double> priceList = productionIDs.stream().map(id -> CompletableFuture.supplyAsync(() -> queryProduction(id), executor))
    28                                                         .map(future -> future.thenApply(price -> multiply(price)))
    29                                                         .map(multiplyFuture -> multiplyFuture.join())
    30                                                         .collect(Collectors.toList());
    31         System.out.println(priceList);
    32 
    33         /**
    34          * 按照以前,要5个分别for循环去查询
    35          * 或者分多个线程去查询,再将每个线程查询的结果汇总,等到全部线程都执行完了,结果也就出来了
    36          */
    37     }
    38 
    39     private static double multiply(double value) {
    40         try {
    41             Thread.sleep(1000);
    42         } catch (InterruptedException e) {
    43             e.printStackTrace();
    44         }
    45         return value * 100;
    46     }
    47 
    48     /**
    49      * 模拟 根据商品id查询对应的价格
    50      * @param id
    51      * @return
    52      */
    53     private static double queryProduction(int id){
    54         return CompletableFutureInAction.get();
    55     }
    56 }

    console打印:

    [90.93730009374265, 23.65282935900653, 17.415066430776815, 16.605197824452343, 60.143109082288206]

     说明:这里多个任务同时执行,最终把结果汇总到一起 ,这种都是并行去执行的,编写代码也比较简洁,不需要考虑多线程之间的一些交互、锁、多线程之间的通信、控制,都不需要去关心。

    五、CompletableFuture的常用API介绍 

    supplyAsync
    thenApply
    whenComplete
    handle
    thenRun
    thenAccept
    thenCompose
    thenCombine
    thenAcceptBoth

    runAfterBoth
    applyToEither
    acceptEither
    runAfterEither
    anyOf
    allOf

    1)supplyAsync、thenApply、whenComplete前面的代码已经介绍了。

     1 package com.cy.java8;
     2 
     3 import java.util.concurrent.CompletableFuture;
     4 
     5 public class CompletableFutureInAction4 {
     6     public static void main(String[] args) throws InterruptedException {
     7         CompletableFuture.supplyAsync(() -> 1)
     8                 .thenApply(v -> Integer.sum(v,10))
     9                 .whenComplete((v, e) -> System.out.println(v));
    10 
    11         Thread.sleep(1000);
    12     }
    13 }

    2)whenCompleteAsync:    whenComplete是同步的方式,如果对于结果的处理是比较占时间的,不想通过这种同步的方式去做,可以用whenCompleteAsync进行异步操作。

    3)handle:和thenApply差不多,只是多了一个对于异常的考虑。

     1 package com.cy.java8;
     2 
     3 import java.util.concurrent.CompletableFuture;
     4 
     5 public class CompletableFutureInAction4 {
     6     public static void main(String[] args) throws InterruptedException {
     7         CompletableFuture.supplyAsync(() -> 1)
     8                 .handle((v, e) -> Integer.sum(v, 10))
     9                 .whenComplete((v, e) -> System.out.println(v));
    10 
    11         Thread.sleep(1000);
    12     }
    13 }

    4)thenRun:如果想在completableFuture整个执行结束之后,还想进行一个操作,可以thenRun(Runnable r)

     1 package com.cy.java8;
     2 
     3 import java.util.concurrent.CompletableFuture;
     4 
     5 public class CompletableFutureInAction4 {
     6     public static void main(String[] args) throws InterruptedException {
     7         CompletableFuture.supplyAsync(() -> 1)
     8                 .handle((v, e) -> Integer.sum(v, 10))
     9                 .whenComplete((v, e) -> System.out.println(v))
    10                 .thenRun(()-> System.out.println("thenRunning..."));
    11 
    12         Thread.sleep(1000);
    13     }
    14 }
    11
    thenRunning...
    

    5)thenAccept:   thenAccept(Consumer c)里面传的是consumer,对执行结果进行消费,不会对执行结果进行任何操作。  

     1 package com.cy.java8;
     2 
     3 import java.util.concurrent.CompletableFuture;
     4 
     5 public class CompletableFutureInAction4 {
     6     public static void main(String[] args) throws InterruptedException {
     7         CompletableFuture.supplyAsync(() -> 1)
     8                         .thenApply(v -> Integer.sum(v, 10))
     9                         .thenAccept(System.out::println);
    10 
    11         Thread.sleep(1000);
    12     }
    13 }
    11
    

    6)thenCompose: 对执行结果再交给另外一个CompletableFuture,它再去对这个执行结果进行另外的计算。compose:组合,组合设计模式。

     1 package com.cy.java8;
     2 
     3 import java.util.concurrent.CompletableFuture;
     4 
     5 public class CompletableFutureInAction4 {
     6     public static void main(String[] args) throws InterruptedException {
     7         CompletableFuture.supplyAsync(() -> 1)
     8                         .thenCompose(v -> CompletableFuture.supplyAsync(() -> v * 10))
     9                         .thenAccept(System.out::println);
    10         
    11         Thread.sleep(1000);
    12     }
    13 }
    10
    

    7)thenCombine: thenCombine(CompletableFuture extends CompletionStage, BiFuntion)

              CompletableFuture的计算结果v1作为BiFunction的第1个入参,thenCombine中的第一个参数CompletableFuture的计算结果v2作为BiFunction的第2个入参,biFunction进行操作然后返回结果。

     1 package com.cy.java8;
     2 
     3 import java.util.concurrent.CompletableFuture;
     4 
     5 public class CompletableFutureInAction4 {
     6     public static void main(String[] args) throws InterruptedException {
     7         CompletableFuture.supplyAsync(() -> 1)
     8                         .thenCombine(CompletableFuture.supplyAsync(() -> 2.0), (v1, v2) -> v1 + v2)
     9                         .thenAccept(System.out::println);
    10         
    11         Thread.sleep(1000);
    12     }
    13 }
    3.0
    

    8)thenAcceptBoth: 和thenCombine差不多,只不过它的第二个参数是BiConsumer 

     1 package com.cy.java8;
     2 
     3 import java.util.concurrent.CompletableFuture;
     4 
     5 public class CompletableFutureInAction4 {
     6     public static void main(String[] args) throws InterruptedException {
     7         CompletableFuture.supplyAsync(() -> 1)
     8                         .thenAcceptBoth(CompletableFuture.supplyAsync(() -> 2.0), (v1, v2) -> {
     9                             System.out.println("value=" + (v1 + v2));
    10                         });
    11 
    12         Thread.sleep(1000);
    13     }
    14 }
    value=3.0
    

    9)runAfterBoth:两个CompletableFuture都执行结束之后,run

     1 package com.cy.java8;
     2 
     3 import java.util.concurrent.CompletableFuture;
     4 
     5 public class CompletableFutureInAction5 {
     6     public static void main(String[] args) throws InterruptedException {
     7 
     8         CompletableFuture.supplyAsync(() -> {
     9             System.out.println(Thread.currentThread().getName() + " is running");
    10             return 1;
    11         }).runAfterBoth(CompletableFuture.supplyAsync(() -> {
    12             System.out.println(Thread.currentThread().getName() + " is running too");
    13             return 2;
    14         }), () -> System.out.println("both done"));
    15 
    16         Thread.sleep(1000);
    17     }
    18 }
    ForkJoinPool.commonPool-worker-9 is running
    ForkJoinPool.commonPool-worker-9 is running too
    both done
    

      

    10)applyToEither

    applyToEither:两个CompletableFuture只要有1个执行完了,就将这个CompletableFuture交给Function。谁先执行完就将谁交给Function去执行  

     1 package com.cy.java8;
     2 
     3 import java.util.concurrent.CompletableFuture;
     4 
     5 public class CompletableFutureInAction5 {
     6     public static void main(String[] args) throws InterruptedException {
     7 
     8         CompletableFuture.supplyAsync(() -> {
     9             try {
    10                 Thread.sleep(900);
    11             } catch (InterruptedException e) {
    12                 e.printStackTrace();
    13             }
    14             System.out.println("I am future 1");
    15             return 1;
    16         }).applyToEither(CompletableFuture.supplyAsync(() -> {
    17             try {
    18                 Thread.sleep(50);
    19             } catch (InterruptedException e) {
    20                 e.printStackTrace();
    21             }
    22             System.out.println("I am future 2");
    23             return 2;
    24         }), v -> {
    25             System.out.println("value = " + v);
    26             return v * 10;
    27         }).thenAccept(System.out::println);
    28 
    29 
    30         Thread.currentThread().join();
    31     }
    32 }
    I am future 2
    value = 2
    20
    I am future 1
    

      

    11)acceptEither 

    acceptEither:acceptEither(CompletableFuture extends CompletionStage, Consumer), 两个CompletableFuture谁先执行完成,就将谁的结果交给consumer执行。

     1 package com.cy.java8;
     2 
     3 import java.util.concurrent.CompletableFuture;
     4 
     5 public class CompletableFutureInAction5 {
     6     public static void main(String[] args) throws InterruptedException {
     7 
     8         CompletableFuture.supplyAsync(() -> {
     9             try {
    10                 Thread.sleep(900);
    11             } catch (InterruptedException e) {
    12                 e.printStackTrace();
    13             }
    14             System.out.println("I am future 1");
    15             return 1;
    16         }).acceptEither(CompletableFuture.supplyAsync(() -> {
    17             try {
    18                 Thread.sleep(50);
    19             } catch (InterruptedException e) {
    20                 e.printStackTrace();
    21             }
    22             System.out.println("I am future 2");
    23             return 2;
    24         }), v -> System.out.println("value = " + v));
    25 
    26         Thread.currentThread().join();
    27     }
    28 }
    I am future 2
    value = 2
    I am future 1
    

      

    12)runAfterEither 

     runAfterEither: runAfterEither(CompletionStage, Runnable),只要有一个CompletableFuture执行完了,就执行run

     1 package com.cy.java8;
     2 
     3 import java.util.concurrent.CompletableFuture;
     4 
     5 public class CompletableFutureInAction5 {
     6     public static void main(String[] args) throws InterruptedException {
     7 
     8         CompletableFuture.supplyAsync(() -> {
     9             try {
    10                 Thread.sleep(900);
    11             } catch (InterruptedException e) {
    12                 e.printStackTrace();
    13             }
    14             System.out.println("I am future 1");
    15             return 1;
    16         }).runAfterEither(CompletableFuture.supplyAsync(() -> {
    17             try {
    18                 Thread.sleep(50);
    19             } catch (InterruptedException e) {
    20                 e.printStackTrace();
    21             }
    22             System.out.println("I am future 2");
    23             return 2;
    24         }), () -> System.out.println("done."));
    25 
    26         Thread.currentThread().join();
    27     }
    28 }
    I am future 2
    done.
    I am future 1
    

      

    13)allOf

    allOf(CompletableFuture<?>... cfs),返回值是CompletableFuture<Void>。要等所有的CompletableFuture都执行完成,才能执行下一步动作。

     1 package com.cy.java8;
     2 
     3 import java.util.Arrays;
     4 import java.util.List;
     5 import java.util.Random;
     6 import java.util.concurrent.CompletableFuture;
     7 import java.util.stream.Collectors;
     8 
     9 public class CompletableFutureInAction5 {
    10     private final static Random RANDOM = new Random(System.currentTimeMillis());
    11 
    12     public static void main(String[] args) throws InterruptedException {
    13         List<CompletableFuture<Double>> list = Arrays.asList(1, 2, 3, 4).stream()
    14                 .map(i -> CompletableFuture.supplyAsync(CompletableFutureInAction5::get))
    15                 .collect(Collectors.toList());
    16 
    17         //要等所有的CompletableFuture这些task执行完了,才会打印done.
    18         CompletableFuture.allOf(list.toArray(new CompletableFuture[list.size()]))
    19                 .thenRun(() -> System.out.println("done."));
    20 
    21         Thread.currentThread().join();
    22     }
    23 
    24     static double get() {
    25         try {
    26             Thread.sleep(RANDOM.nextInt(3000));
    27         } catch (InterruptedException e) {
    28             e.printStackTrace();
    29         }
    30         double result = RANDOM.nextDouble();
    31         System.out.println(result);
    32         return result;
    33     }
    34 }
    0.6446554001163166
    0.24435437709196395
    0.18251850071600362
    0.5261702037394511
    done.
    

      

    14)anyOf

    和allOf相反,只要有一个CompletableFuture执行完成,就会执行下一步动作

     1 package com.cy.java8;
     2 
     3 import java.util.Arrays;
     4 import java.util.List;
     5 import java.util.Random;
     6 import java.util.concurrent.CompletableFuture;
     7 import java.util.stream.Collectors;
     8 
     9 public class CompletableFutureInAction5 {
    10     private final static Random RANDOM = new Random(System.currentTimeMillis());
    11 
    12     public static void main(String[] args) throws InterruptedException {
    13         List<CompletableFuture<Double>> list = Arrays.asList(1, 2, 3, 4).stream()
    14                 .map(i -> CompletableFuture.supplyAsync(CompletableFutureInAction5::get))
    15                 .collect(Collectors.toList());
    16 
    17         //只要有一个CompletableFuture执行完了,就会打印done.
    18         CompletableFuture.anyOf(list.toArray(new CompletableFuture[list.size()]))
    19                 .thenRun(() -> System.out.println("done."));
    20 
    21         Thread.currentThread().join();
    22     }
    23 
    24     static double get() {
    25         try {
    26             Thread.sleep(RANDOM.nextInt(3000));
    27         } catch (InterruptedException e) {
    28             e.printStackTrace();
    29         }
    30         double result = RANDOM.nextDouble();
    31         System.out.println(result);
    32         return result;
    33     }
    34 }
    0.1334361442807943
    done.
    0.6715112881360222
    0.12945359790698785
    0.1307762755130788
    

      

    ----

  • 相关阅读:
    STL中set求交集、并集、差集的方法
    Vijos 1308 埃及分数(迭代加深搜索)
    POJ 1161 Walls(Floyd , 建图)
    UVa 1601 万圣节后的早晨
    dp之完全背包
    dp之取数字问题
    dp之最长公共子序列
    枚举排列
    poj 3187 暴力枚举
    poj 2431 优先队列,贪心
  • 原文地址:https://www.cnblogs.com/tenWood/p/11614336.html
Copyright © 2011-2022 走看看