zoukankan      html  css  js  c++  java
  • JAVA8之CompletableFuture(组合式异步编程)与Future

    Future

      Java 1.5开始,提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果。要使用Future,通只需要将耗时操作封装在一个Callable对象中,再将它提交给ExecutorService。

      ExecutorService(线程池)体系结构:

    一、线程池: 提供一个线程队列,队列中保存着所有等待状态的线程。避免了创建与销毁的额外开销,提高了响应的速度。
    
    二、线程池的体系结构:
    java.util.concurrent.Executor 负责线程的使用和调度的根接口
    		|--ExecutorService 子接口: 线程池的主要接口
    				|--ThreadPoolExecutor 线程池的实现类
    				|--ScheduledExceutorService 子接口: 负责线程的调度
    					|--ScheduledThreadPoolExecutor : 继承ThreadPoolExecutor,实现了ScheduledExecutorService
    			
    三、工具类 : Executors
    ExecutorService newFixedThreadPool() : 创建固定大小的线程池
    ExecutorService newCachedThreadPool() : 缓存线程池,线程池的数量不固定,可以根据需求自动的更改数量。
    ExecutorService newSingleThreadExecutor() : 创建单个线程池。 线程池中只有一个线程
    ScheduledExecutorService newScheduledThreadPool() : 创建固定大小的线程,可以延迟或定时的执行任务

      示例代码

    package completableFuture;
    
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    
    public class FutureTest {
        public static void main(String[] args) {
            ExecutorService es = Executors.newFixedThreadPool(1);
            Future<Integer> f = es.submit(() -> {
                Thread.sleep(10000);
                // 结果
                return 100;
            });
            // do something
            Integer result = 0;
            try {
                result = f.get(1l, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (ExecutionException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (TimeoutException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            System.out.println(result);
        }
    }
    View Code

      Future接口的局限性:Future很难直接表述多个Future 结果之间的依赖性

    CompletableFuture

      创建CompletableFuture

      a.实例化生成 CompletableFuture

      此时这个future和Callable没有任何联系,没有线程池也不是异步工作。如果现在客户端代码调用ask().get()它将永远阻塞。通过complete方法触发完成

      示例代码如下:

    package completableFuture;
    
    import java.util.concurrent.CompletableFuture;
    
    public class CompletableFutureTest {
        public static void main(String[] args) {
            Long start = System.currentTimeMillis();
            CompletableFuture<Double> future = new CompletableFuture<Double>();
            new Thread(() -> {
                try {
                    Thread.sleep(1000l);
                    double price = 10d;
                    future.complete(price);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }).start();
            Double result = future.join();
            Long end = System.currentTimeMillis();
            System.out.println((end - start) + " ms:" + result);
        }
    }
    View Code

      b.使用工厂方法创建 CompletableFuture

    public static CompletableFuture<Void>   runAsync(Runnable runnable)
    public static CompletableFuture<Void>   runAsync(Runnable runnable, Executor executor)
    public static <U> CompletableFuture<U>  supplyAsync(Supplier<U> supplier)
    public static <U> CompletableFuture<U>  supplyAsync(Supplier<U> supplier, Executor executor)
      supplyAsyncrunAsync不同在与前者异步返回一个结果,后者是void. Executor表示是用自己创建的线程池,否则采用默认的ForkJoinPool.commonPool()作为它的线程池.线程数是Runtime.getRuntime().availableProcessors(). 其中Supplier是一个函数式接口(函数描述符为 () -> T).
      结果执行完成时的处理
    public CompletableFuture<T>     whenComplete( <? super T,? super Throwable> action)
    public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
    public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
    public CompletableFuture<T>     exceptionally(Function<Throwable,? extends T> fn)

      BiConsumer的函数描述符为(T,U) -> void ,无返回值.方法以Async结尾,意味着Action使用相同的线程执行(如果使用相同的线程池,也可能会被同一个线程选中执行).

      
      多个异步任务进行流水线操作
      a.结果转换(thenApply):输入是上一个阶段计算后的结果,返回值是经过转化后结果 .Function函数描述符: T -> R
    public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
    public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
    public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor);
      b.消费结果(thenAccept):只是针对结果进行消费,入参是Consumer,没有返回值. Consumer函数描述符: T -> void
    public CompletionStage<Void> thenAccept(Consumer<? super T> action);
    public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
    public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);

      c.结果合并(thenCombine):需要上一阶段的返回值,并且other代表的CompletionStage也要返回值之后,把这两个返回值,进行转换后返回指定类型的值

    public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
    public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
    public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);

     代码示例如下:

    package completableFuture;
    
    import java.util.concurrent.CompletableFuture;
    
    public class Test {
    	public static void main(String[] args) {
    		testThenApply();
    		testThenCompose();
    		testThenCombine();
    		testThenAcceptBoth();
    	}
    	
    	private static void testThenApply() {
    		// 任务 A 执行完执行 B,B 需要 A 的结果,同时任务 B 有返回值
    		CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> {
    			System.out.println(resultA + " resultB");
    			return resultA + " resultB";
    		});
    	}
    
    	private static void testThenCompose() {
    		//
    		CompletableFuture.supplyAsync(() -> "resultA").thenCompose(resultA -> {
    			return CompletableFuture.supplyAsync(() -> resultA + "resultB");
    		});
    	}
    	
    	private static void testThenCombine() {
    		// thenCombine 会把 两个 CompletionStage 的任务都执行完成后,把两个任务的结果一块交给 thenCombine 来处理
    		CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "A");
    		CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "B");
    		CompletableFuture<String> result = future1.thenCombine(future2, (f1, f2) -> f1 + f2);
    		System.out.println(result.join());
    	}
    
    	private static void testThenAcceptBoth() {
    		// 接受任务的处理结果,并消费处理,有返回结果
    		CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "A");
    		CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "B");
    		future1.thenAcceptBoth(future2, (f1, f2) -> {
    			System.out.println("accept:" + f1 + f2);
    		});
    	}
    }
    View Code

     

      

  • 相关阅读:
    OpenVINO Model Server的服务化部署——step3(django服务构建)
    (5)名称空间 namespace 和 using 声明
    (4)#include 指令
    (3)注释
    (2)简单的程序
    (1)Hello World
    javaScript 错误学习 -- throw、try 、catch和 finally
    js 如何在数字前面自动补零,生成序列号、单据号
    vs2015项目运行出现“无法启动IIS Express Web服务器”,如何解决
    Sql Server 2008 如何将数据表导出Excel文件?
  • 原文地址:https://www.cnblogs.com/ryjJava/p/12333426.html
Copyright © 2011-2022 走看看