zoukankan      html  css  js  c++  java
  • JAVA8之CompletableFuture(组合式异步编程)与Future

    Future

      Java 1.5开始,提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果。要使用Future,通只需要将耗时操作封装在一个Callable对象中,再将它提交给ExecutorService。

      ExecutorService(线程池)体系结构:

    一、线程池: 提供一个线程队列,队列中保存着所有等待状态的线程。避免了创建与销毁的额外开销,提高了响应的速度。
    
    二、线程池的体系结构:
    java.util.concurrent.Executor 负责线程的使用和调度的根接口
    		|--ExecutorService 子接口: 线程池的主要接口
    				|--ThreadPoolExecutor 线程池的实现类
    				|--ScheduledExceutorService 子接口: 负责线程的调度
    					|--ScheduledThreadPoolExecutor : 继承ThreadPoolExecutor,实现了ScheduledExecutorService
    			
    三、工具类 : Executors
    ExecutorService newFixedThreadPool() : 创建固定大小的线程池
    ExecutorService newCachedThreadPool() : 缓存线程池,线程池的数量不固定,可以根据需求自动的更改数量。
    ExecutorService newSingleThreadExecutor() : 创建单个线程池。 线程池中只有一个线程
    ScheduledExecutorService newScheduledThreadPool() : 创建固定大小的线程,可以延迟或定时的执行任务

      示例代码

    package completableFuture;
    
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    
    public class FutureTest {
        public static void main(String[] args) {
            ExecutorService es = Executors.newFixedThreadPool(1);
            Future<Integer> f = es.submit(() -> {
                Thread.sleep(10000);
                // 结果
                return 100;
            });
            // do something
            Integer result = 0;
            try {
                result = f.get(1l, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (ExecutionException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (TimeoutException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            System.out.println(result);
        }
    }
    View Code

      Future接口的局限性:Future很难直接表述多个Future 结果之间的依赖性

    CompletableFuture

      创建CompletableFuture

      a.实例化生成 CompletableFuture

      此时这个future和Callable没有任何联系,没有线程池也不是异步工作。如果现在客户端代码调用ask().get()它将永远阻塞。通过complete方法触发完成

      示例代码如下:

    package completableFuture;
    
    import java.util.concurrent.CompletableFuture;
    
    public class CompletableFutureTest {
        public static void main(String[] args) {
            Long start = System.currentTimeMillis();
            CompletableFuture<Double> future = new CompletableFuture<Double>();
            new Thread(() -> {
                try {
                    Thread.sleep(1000l);
                    double price = 10d;
                    future.complete(price);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }).start();
            Double result = future.join();
            Long end = System.currentTimeMillis();
            System.out.println((end - start) + " ms:" + result);
        }
    }
    View Code

      b.使用工厂方法创建 CompletableFuture

    public static CompletableFuture<Void>   runAsync(Runnable runnable)
    public static CompletableFuture<Void>   runAsync(Runnable runnable, Executor executor)
    public static <U> CompletableFuture<U>  supplyAsync(Supplier<U> supplier)
    public static <U> CompletableFuture<U>  supplyAsync(Supplier<U> supplier, Executor executor)
      supplyAsyncrunAsync不同在与前者异步返回一个结果,后者是void. Executor表示是用自己创建的线程池,否则采用默认的ForkJoinPool.commonPool()作为它的线程池.线程数是Runtime.getRuntime().availableProcessors(). 其中Supplier是一个函数式接口(函数描述符为 () -> T).
      结果执行完成时的处理
    public CompletableFuture<T>     whenComplete( <? super T,? super Throwable> action)
    public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
    public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
    public CompletableFuture<T>     exceptionally(Function<Throwable,? extends T> fn)

      BiConsumer的函数描述符为(T,U) -> void ,无返回值.方法以Async结尾,意味着Action使用相同的线程执行(如果使用相同的线程池,也可能会被同一个线程选中执行).

      
      多个异步任务进行流水线操作
      a.结果转换(thenApply):输入是上一个阶段计算后的结果,返回值是经过转化后结果 .Function函数描述符: T -> R
    public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
    public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
    public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor);
      b.消费结果(thenAccept):只是针对结果进行消费,入参是Consumer,没有返回值. Consumer函数描述符: T -> void
    public CompletionStage<Void> thenAccept(Consumer<? super T> action);
    public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
    public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);

      c.结果合并(thenCombine):需要上一阶段的返回值,并且other代表的CompletionStage也要返回值之后,把这两个返回值,进行转换后返回指定类型的值

    public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
    public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
    public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);

     代码示例如下:

    package completableFuture;
    
    import java.util.concurrent.CompletableFuture;
    
    public class Test {
    	public static void main(String[] args) {
    		testThenApply();
    		testThenCompose();
    		testThenCombine();
    		testThenAcceptBoth();
    	}
    	
    	private static void testThenApply() {
    		// 任务 A 执行完执行 B,B 需要 A 的结果,同时任务 B 有返回值
    		CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> {
    			System.out.println(resultA + " resultB");
    			return resultA + " resultB";
    		});
    	}
    
    	private static void testThenCompose() {
    		//
    		CompletableFuture.supplyAsync(() -> "resultA").thenCompose(resultA -> {
    			return CompletableFuture.supplyAsync(() -> resultA + "resultB");
    		});
    	}
    	
    	private static void testThenCombine() {
    		// thenCombine 会把 两个 CompletionStage 的任务都执行完成后,把两个任务的结果一块交给 thenCombine 来处理
    		CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "A");
    		CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "B");
    		CompletableFuture<String> result = future1.thenCombine(future2, (f1, f2) -> f1 + f2);
    		System.out.println(result.join());
    	}
    
    	private static void testThenAcceptBoth() {
    		// 接受任务的处理结果,并消费处理,有返回结果
    		CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "A");
    		CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "B");
    		future1.thenAcceptBoth(future2, (f1, f2) -> {
    			System.out.println("accept:" + f1 + f2);
    		});
    	}
    }
    View Code

     

      

  • 相关阅读:
    Constants and Variables
    随想
    C#基础篇之语言和框架介绍
    Python基础19 实例方法 类方法 静态方法 私有变量 私有方法 属性
    Python基础18 实例变量 类变量 构造方法
    Python基础17 嵌套函数 函数类型和Lambda表达式 三大基础函数 filter() map() reduce()
    Python基础16 函数返回值 作用区域 生成器
    Python基础11 List插入,删除,替换和其他常用方法 insert() remove() pop() reverse() copy() clear() index() count()
    Python基础15 函数的定义 使用关键字参数调用 参数默认值 可变参数
    Python基础14 字典的创建修改访问和遍历 popitem() keys() values() items()
  • 原文地址:https://www.cnblogs.com/ryjJava/p/12333426.html
Copyright © 2011-2022 走看看