zoukankan      html  css  js  c++  java
  • 【并发编程】6.线程控制工具类

    1.Future、Callable、FutureTask

    线程池提供的submit方法

    <T> Future<T> submit(Callable<T> task); //Callable  call方法具有返回值,Future对象可以通过调用其get()方法来获 取任务的执行结果。
    
    <T> Future<T> submit(Runnable task, T result);  //需要你注意的是Runnable接口的实现类Task声明了一个有 参构造函数 Task(Result r) ,创建Task对象的时候传入了result对象,这样就能在类Task的run()方法 中对result进行各种操作了。result相当于主线程和子线程之间的桥梁,通过它主子线程可以共享数据。
    
    Future<?> submit(Runnable task);    //Runnable run方法没有返回值  返回的Future仅用于判断执行是否完成
    

    1.1Callable
    在执行多个任务的时候,使用Java标准库提供的线程池是非常方便的。我们提交的任务只需要实现Runnable接口,就可以让线程池去执行。但是使用Runable接口无法获取任务执行的结果。
    于是Java提供了Callable接口,相比于Runable增加了返回值,并且Callable接口是一个泛型接口,可以返回指定类型的结果。

    public interface Callable<V> {
        V call() throws Exception;
    }
    

    将Callable 最为线程池执行任务的参数 通过返回值Future就可以获得call方法的实现中返回的结果值:

       public static void main(String[] args) throws Exception{
             //初始化线程池,实际开发中不建议这么做
            ExecutorService threadPool =  Executors.newFixedThreadPool(2);
            Future reuslt = threadPool.submit(new Callable<Integer>() {
                //匿名内部类实现Callable接口
                @Override
                public Integer call() throws Exception {
                    System.out.println("callable task run");
                    return 10;
                }
            });
            //或者执行结果 会阻塞
            Integer num = (Integer) reuslt.get();
            System.out.println(num);
            threadPool.shutdown();
        }
    
    //执行结果  
    //callable task run
    //10
    

    1.2Future
    Future作为线程池的执行的返回值,提供了以下的方法

        boolean cancel(boolean mayInterruptIfRunning);   //取消任务的方法
        boolean isCancelled(); //判断任务是否已取消
        boolean isDone(); //判断任务是否已结束
        V get() throws InterruptedException, ExecutionException; //任务执行结果  当前线程阻塞
        V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; //任务执行结果 超时机制  当前线程阻塞
    

    1.3FutureTask
    FutureTask实现了Runnable和Future接口,由于实现了Runnable 接口,所以可以将FutureTask对象作为任务提交给ThreadPoolExecutor去执行,也可以直接被Thread执行;
    又因为实现了Future接口,所以也能用来获得任务的执行结果。

     //接口关系
    public class FutureTask<V> implements RunnableFuture<V>
    
    public interface RunnableFuture<V> extends Runnable, Future<V> {
        void run();//接口的多继承
      }
    
    //构造方法
    FutureTask(Callable callable);
    FutureTask(Runnable runnable, V result);
    
    //线程池执行FutureTask 并获取结果
    public static void main(String[] args) throws Exception{
            // 创建
            FutureTask futureTask  = new FutureTask<>(()-> 1+2);
            // 创建线程池
            ExecutorService es = Executors.newCachedThreadPool();
            // 提交
            Future future = es.submit(futureTask);
             // 获取计算结果
            Integer result = (Integer) futureTask.get();
            System.out.println(result);
            es.shutdown();
        }
    
    //直接由线程执行并且获取结果
    public static void main(String[] args) throws Exception{
            // 创建
            FutureTask futureTask  = new FutureTask<>(()-> 1+2);
            Thread th1 = new Thread(futureTask);
            th1.start();
            Integer result =(Integer) futureTask.get();
            System.out.println(result);
        }
    

    2.CountDownLatch

    CountDownLatch是Java 1.5提供的线程控制工具类,主要用来解决一个线程等待 多个线程的场景,可以类比旅游团团长要等待所有的游客到齐才能去下一个景点.

        //构造方法 指定计数器
        public CountDownLatch(int count) {
            if (count < 0) throw new IllegalArgumentException("count < 0");
            this.sync = new Sync(count);
        }
    
        //计数器 -1 
        countDownLatch.countDown();//完成后计数器-1
    
        ///等待计数器为0
        countDownLatch.await();
    

    countDown()方法一般用于在线程池的任务中,注意进行异常捕获,不然在await()时可能会造成一直阻塞。

        ExecutorService executorService =Executors.newFixedThreadPool(3);
        CountDownLatch cdl = new CountDownLatch(3);
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    function1();
                } catch (Exception e) {
                    //异常处理
                    e.printStackTrace();
                }
                finally {
                    cdl.countDown();
                }
            }
        });  
    

    3.CyclicBarrier

    CyclicBarrier 也是Java 1.5中提供的线程同步控制类,CyclicBarrier字面意思是“可重复使用的栅栏”,使用CyclicBarrier可以实现一组线程相互的等待。
    在CyclicBarrier类的内部有一个计数器,每个线程在到达屏障点的时候都会调用await方法将自己阻塞,此时计数器会减1,当计数器减为0的时候所有因调用await方法而被阻塞的线程将被唤醒。这就是实现一组线程相互等待的原理。

    //构造器  指定在所有线程都完成,即计数器为0时执行的操作 barrierAction
    public CyclicBarrier(int parties, Runnable barrierAction) {
      if (parties <= 0) throw new IllegalArgumentException();
      this.parties = parties;
      this.count = parties;
      this.barrierCommand = barrierAction;
    }
     
    //构造器
    public CyclicBarrier(int parties) {
      this(parties, null);
    
    public int await() throws InterruptedException, BrokenBarrierException
    public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException
    
    • 线程调用 await() 表示自己已经到达栅栏,当前线程阻塞,等待其他线程到达栅栏
    • BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程 await() 时被中断或者超时
    • InterruptedException,其他线程中断了这个线程就会抛出该异常,其他的线程就会抛出BrokenBarrierException
    public static void main(String[] args) throws Exception{
            CyclicBarrier cyclicBarrier = new CyclicBarrier(5,()-> System.out.println("所有运动员准备就绪"));
            ExecutorService executors = Executors.newFixedThreadPool(10);
            for(int i=0;i<10;i++){
                executors.submit(()->{
                    try {
                        Thread.sleep(100);
                        System.out.println("我准备好了");
                        cyclicBarrier.await();//到达栅栏 阻塞 等待其他线程到达
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }catch (BrokenBarrierException e){
                        e.printStackTrace();
                    }
                    System.out.println("起飞!");
                });
            }
            executors.shutdown();
        }
    

    demo代码中栅栏的计数器为5,循环线程数为10,当5个一组进行完之后,计数器会重置开启下一轮。

    4.CompletableFuture

    异步化是利用多线程进行程序性能优化的基础

    4.1 Java中实现异步的方法:

    • 1.异步调用 调用方创建一个子线程,在子线程中执行方法调用
    • 2.异步方法 方法实现时,创建新线程执行主要的逻辑,主线程直接返回结果

    异步转同步的方法:主线程阻塞直到子线程执行完成。

    Java在1.8版本提供了CompletableFuture来支持异步编程,实现非常复杂,但是功能也很强大。

    CompletableFuture 优点

    1. 无需手工维护线程,没有繁琐的手工维护线程的工作,给任务分配线程的工作也不需要我们关注;
    2. 语义更清晰,例如 f3 = f1.thenCombine(f2, ()->{}) 能够清晰地表述“任务3要等待任务1和任 务2都完成后才能开始”;
    3. 代码更简练并且专注于业务逻辑,几乎所有代码都是业务逻辑相关的。
    /**
    * 实现Future 接口  可以获得执行结果
    * CompletionStage 异步流程中的串行,汇聚
    */
    
    public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
               
           //判断线程池的并行级别是否大于1  并行级别决定了同一时刻最多有多少个线程在执行,如不传如并行级别参数,将默认为当前系统的CPU个数。
           private static final boolean useCommonPool =
            (ForkJoinPool.getCommonPoolParallelism() > 1);
    
           //默认的线程池声明
           private static final Executor asyncPool = useCommonPool ?
            ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
    
          //创建CompletableFuture的静态方法
          //使用默认的线程池
          static > CompletableFuture runAsync(Runnable runnable) 
          static CompletableFuture supplyAsync(Supplier supplier)
    
           //可以指定线程池 
          static CompletableFuture runAsync(Runnable runnable, Executor executor) 
          static CompletableFuture supplyAsync(Supplier supplier, Executor executor)
                
          //Runnable 接口的run()方法没有返回值,
          //而Supplier接口的get()方法是有 返回值的  是函数式编程的接口 
          //创建完CompletableFuture对象之后,会自动地异步执行runnable.run()方法或者supplier.get()方法
          //如果所有的CompletableFuture 都使用同一个线程池,当进行IO这种耗时的操作时,就会造成长时间的等待,造成线程饥饿,
    根据不同的业务类型创建不同的线程池,以避免互相干扰。
    }
    
    

    4.2 异步流程控制相关方法

    4.2.1 串行

        public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
            return uniApplyStage(null, fn);
        }
        //thenApply既能接收参数也支持返回值,所以thenApply系列方法返回的是 ? extends U ,U为CompletableFuture
        //thenAccept、  虽然支持参数,但却不支持回值
        //thenRun、 参数是Runnable,所以action既不能接收参数也不支持返回值
        //thenCompose  个系列的方法会新创建出一个子流程,最终结果和thenApply系列是相同的。
        
        CompletionStage thenApply(fn); 
        CompletionStage thenApplyAsync(fn); 
        CompletionStage thenAccept(consumer); 
        CompletionStage thenAcceptAsync(consumer);
        CompletionStage thenRun(action);
        CompletionStage thenRunAsync(action);
        CompletionStage thenCompose(fn); 
        CompletionStage thenComposeAsync(fn);
        //Async代表的是异步执行fn、consumer或者action
        
       //首先通过supplyAsync()启动一个异步 流程,之后是两个串行操作,整体看起来还是挺简单的。不过,虽然这是一个异步流程,但任务①②③却是 串行执行的,②依赖①的执行结果,③依赖②的执行结果。
          CompletableFuture f0 = CompletableFuture.supplyAsync( () -> "Hello World") //1
          .thenApply(s -> s + " QQ") //2
          .thenApply(String::toUpperCase);//3
          System.out.println(f0.join()); 
          //输出结果 HELLO WORLD QQ
    
       
        
    

    4.2.2.汇聚
    AND 是thenCombine、thenAcceptBoth和runAfterBoth 系列方法,参数分别是 接口的区别也是源自fn、consumer、action这三个核心参数不同

    CompletionStage thenCombine(other, fn); 
    CompletionStage thenCombineAsync(other, fn);
    CompletionStage thenAcceptBoth(other, consumer); 
    CompletionStage thenAcceptBothAsync(other, consumer);
    CompletionStage runAfterBoth(other, action);
    CompletionStage runAfterBothAsync(other, action);
    

    OR 主要是applyToEither、acceptEither和runAfterEither系列的 接口,这些接口的区别也是源自fn、consumer、action这三个核心参数不同。

    CompletionStage applyToEither(other, fn); 
    CompletionStage applyToEitherAsync(other, fn); 
    CompletionStage acceptEither(other, consumer);
     CompletionStage acceptEitherAsync(other, consumer);
     CompletionStage runAfterEither(other, action);
     CompletionStage runAfterEitherAsync(other, action);
    
    //两者任一完成即可
    CompletableFuture f1 = CompletableFuture.supplyAsync(()->{
         int t = getRandom(5, 10);
         sleep(t, TimeUnit.SECONDS);
         return String.valueOf(t); 
    });
    
     CompletableFuture f2 = CompletableFuture.supplyAsync(()->{
         int t = getRandom(5, 10);
         sleep(t, TimeUnit.SECONDS);
         return String.valueOf(t); 
    }); 
    CompletableFuture f3 = f1.applyToEither(f2,s -> s); 
    System.out.println(f3.join());
    

    4.2.3、异常处理
    fn、consumer、action它们的核心方法都不允许抛出可检查异常,但是却无法限制它 们抛出运行时异常。

    CompletionStage exceptionally(fn);
    CompletionStage whenComplete(consumer);
    CompletionStage whenCompleteAsync(consumer);
    CompletionStage handle(fn);
    CompletionStage handleAsync(fn);
    

    exceptionally()的使用非常类似于 try{}catch{}中的catch{},
    但是由于支持链式编程方式,所以相对更简单。既然有try{}catch{},那就一定还 有try{}finally{},

    whenComplete()handle()系列方法就类似于try{}finally{}中的finally{},
    无论是否发生异 常都会执行whenComplete()中的回调函数consumer和handle()中的回调函数fn
    whenComplete()和 handle()的区别在于whenComplete()不支持返回结果,而handle()是支持返回结果的。

    exceptionally捕获异常

    CompletableFuture f0 = CompletableFuture .supplyAsync(()->7/0)) 
    .thenApply(r->r*10)
    .exceptionally(e->0);
    System.out.println(f0.join());
    

    除此之外还有CompletionService提供批量的异步操作,ForkJoinPool、ForkJoinTask提供分治任务。

    PS:函数式编程

    函数式接口(Functional Interface)
    就是一个有且仅有一个抽象方法,但是可以有多个非抽象方法的接口。
    函数式接口可以被隐式转换为 lambda 表达式。
    Lambda 表达式和方法引用(实际上也可认为是Lambda表达式)上。

    如定义了一个函数式接口如下:

    @FunctionalInterface GreetingService
    {
    void sayMessage(String message);}
    

    那么就可以使用Lambda表达式来表示该接口的一个实现(注:JAVA 8 之前一般是用匿名类实现的):

    GreetingService greetService1 = message -> System.out.println("Hello " + message);
    

    Function<T,R> 接口

    FunctionalInterface
    public interface Function<T, R> {
           // 输入参数T,经过该函数处理得到R。也就是将参数应用于这个函数。
           R apply(T t);
    
    
          // 此函数有两个参数先将传进来的这个参数传入到before这个函数里面进行处理一下,然后before这个函数的返回结果再作为一个参数传递给外面的这个函数。
          default <V> Function<V, R> compose(Function<? super V, ? extends T> before) {
          Objects.requireNonNull(before);
                return (V v) -> apply(before.apply(v));
          }
    
    
          // 此函数有两个参数先将传进来的这个参数传入到apply这个函数里面进行处理一下,然后这个函数的返回结果再作为一个参数传递给外面after的这个函数。
          default <V> Function<T, V> andThen(Function<? super R, ? extends V> after) {
                Objects.requireNonNull(after);
                return (T t) -> after.apply(apply(t));
          }
    
    
          static <T> Function<T, T> identity() {
              return t -> t;
          }
    }
    

    Function 就是一个函数,T代表输入参数,R代表返回的结果。其作用类似于数学中函数的定义y=f(x),(x,y)跟<T,R>的作用几乎一致。
    所以Function中没有具体的操作,具体的操作需要我们去为它指定,因此apply具体返回的结果取决于传入的lambda表达式。

          public void test(){
                Function<Integer,Integer> test=i->i+1;
                test.apply(5);
          }
    /** print:6*/
    

    Java.util.function 包中的类
    https://www.runoob.com/java/java8-functional-interfaces.html

  • 相关阅读:
    DB2 for Z/os Statement prepare
    Foreign key (referential) constraints on DB2 LUW v105
    复制Informational constraints on LUW DB2 v105
    DB2 SQL Mixed data in character strings
    DB2 create partitioned table
    MVC中使用EF的技巧集(一)
    Asp.Net MVC 开发技巧(二)
    Linq使用技巧及查询示例(一)
    Asp.Net MVC 开发技巧(一)
    Asp.Net MVC Identity 2.2.1 使用技巧(八)
  • 原文地址:https://www.cnblogs.com/shinyrou/p/13451556.html
Copyright © 2011-2022 走看看