zoukankan      html  css  js  c++  java
  • 【并发编程】6.线程控制工具类

    1.Future、Callable、FutureTask

    线程池提供的submit方法

    <T> Future<T> submit(Callable<T> task); //Callable  call方法具有返回值,Future对象可以通过调用其get()方法来获 取任务的执行结果。
    
    <T> Future<T> submit(Runnable task, T result);  //需要你注意的是Runnable接口的实现类Task声明了一个有 参构造函数 Task(Result r) ,创建Task对象的时候传入了result对象,这样就能在类Task的run()方法 中对result进行各种操作了。result相当于主线程和子线程之间的桥梁,通过它主子线程可以共享数据。
    
    Future<?> submit(Runnable task);    //Runnable run方法没有返回值  返回的Future仅用于判断执行是否完成
    

    1.1Callable
    在执行多个任务的时候,使用Java标准库提供的线程池是非常方便的。我们提交的任务只需要实现Runnable接口,就可以让线程池去执行。但是使用Runable接口无法获取任务执行的结果。
    于是Java提供了Callable接口,相比于Runable增加了返回值,并且Callable接口是一个泛型接口,可以返回指定类型的结果。

    public interface Callable<V> {
        V call() throws Exception;
    }
    

    将Callable 最为线程池执行任务的参数 通过返回值Future就可以获得call方法的实现中返回的结果值:

       public static void main(String[] args) throws Exception{
             //初始化线程池,实际开发中不建议这么做
            ExecutorService threadPool =  Executors.newFixedThreadPool(2);
            Future reuslt = threadPool.submit(new Callable<Integer>() {
                //匿名内部类实现Callable接口
                @Override
                public Integer call() throws Exception {
                    System.out.println("callable task run");
                    return 10;
                }
            });
            //或者执行结果 会阻塞
            Integer num = (Integer) reuslt.get();
            System.out.println(num);
            threadPool.shutdown();
        }
    
    //执行结果  
    //callable task run
    //10
    

    1.2Future
    Future作为线程池的执行的返回值,提供了以下的方法

        boolean cancel(boolean mayInterruptIfRunning);   //取消任务的方法
        boolean isCancelled(); //判断任务是否已取消
        boolean isDone(); //判断任务是否已结束
        V get() throws InterruptedException, ExecutionException; //任务执行结果  当前线程阻塞
        V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; //任务执行结果 超时机制  当前线程阻塞
    

    1.3FutureTask
    FutureTask实现了Runnable和Future接口,由于实现了Runnable 接口,所以可以将FutureTask对象作为任务提交给ThreadPoolExecutor去执行,也可以直接被Thread执行;
    又因为实现了Future接口,所以也能用来获得任务的执行结果。

     //接口关系
    public class FutureTask<V> implements RunnableFuture<V>
    
    public interface RunnableFuture<V> extends Runnable, Future<V> {
        void run();//接口的多继承
      }
    
    //构造方法
    FutureTask(Callable callable);
    FutureTask(Runnable runnable, V result);
    
    //线程池执行FutureTask 并获取结果
    public static void main(String[] args) throws Exception{
            // 创建
            FutureTask futureTask  = new FutureTask<>(()-> 1+2);
            // 创建线程池
            ExecutorService es = Executors.newCachedThreadPool();
            // 提交
            Future future = es.submit(futureTask);
             // 获取计算结果
            Integer result = (Integer) futureTask.get();
            System.out.println(result);
            es.shutdown();
        }
    
    //直接由线程执行并且获取结果
    public static void main(String[] args) throws Exception{
            // 创建
            FutureTask futureTask  = new FutureTask<>(()-> 1+2);
            Thread th1 = new Thread(futureTask);
            th1.start();
            Integer result =(Integer) futureTask.get();
            System.out.println(result);
        }
    

    2.CountDownLatch

    CountDownLatch是Java 1.5提供的线程控制工具类,主要用来解决一个线程等待 多个线程的场景,可以类比旅游团团长要等待所有的游客到齐才能去下一个景点.

        //构造方法 指定计数器
        public CountDownLatch(int count) {
            if (count < 0) throw new IllegalArgumentException("count < 0");
            this.sync = new Sync(count);
        }
    
        //计数器 -1 
        countDownLatch.countDown();//完成后计数器-1
    
        ///等待计数器为0
        countDownLatch.await();
    

    countDown()方法一般用于在线程池的任务中,注意进行异常捕获,不然在await()时可能会造成一直阻塞。

        ExecutorService executorService =Executors.newFixedThreadPool(3);
        CountDownLatch cdl = new CountDownLatch(3);
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    function1();
                } catch (Exception e) {
                    //异常处理
                    e.printStackTrace();
                }
                finally {
                    cdl.countDown();
                }
            }
        });  
    

    3.CyclicBarrier

    CyclicBarrier 也是Java 1.5中提供的线程同步控制类,CyclicBarrier字面意思是“可重复使用的栅栏”,使用CyclicBarrier可以实现一组线程相互的等待。
    在CyclicBarrier类的内部有一个计数器,每个线程在到达屏障点的时候都会调用await方法将自己阻塞,此时计数器会减1,当计数器减为0的时候所有因调用await方法而被阻塞的线程将被唤醒。这就是实现一组线程相互等待的原理。

    //构造器  指定在所有线程都完成,即计数器为0时执行的操作 barrierAction
    public CyclicBarrier(int parties, Runnable barrierAction) {
      if (parties <= 0) throw new IllegalArgumentException();
      this.parties = parties;
      this.count = parties;
      this.barrierCommand = barrierAction;
    }
     
    //构造器
    public CyclicBarrier(int parties) {
      this(parties, null);
    
    public int await() throws InterruptedException, BrokenBarrierException
    public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException
    
    • 线程调用 await() 表示自己已经到达栅栏,当前线程阻塞,等待其他线程到达栅栏
    • BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程 await() 时被中断或者超时
    • InterruptedException,其他线程中断了这个线程就会抛出该异常,其他的线程就会抛出BrokenBarrierException
    public static void main(String[] args) throws Exception{
            CyclicBarrier cyclicBarrier = new CyclicBarrier(5,()-> System.out.println("所有运动员准备就绪"));
            ExecutorService executors = Executors.newFixedThreadPool(10);
            for(int i=0;i<10;i++){
                executors.submit(()->{
                    try {
                        Thread.sleep(100);
                        System.out.println("我准备好了");
                        cyclicBarrier.await();//到达栅栏 阻塞 等待其他线程到达
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }catch (BrokenBarrierException e){
                        e.printStackTrace();
                    }
                    System.out.println("起飞!");
                });
            }
            executors.shutdown();
        }
    

    demo代码中栅栏的计数器为5,循环线程数为10,当5个一组进行完之后,计数器会重置开启下一轮。

    4.CompletableFuture

    异步化是利用多线程进行程序性能优化的基础

    4.1 Java中实现异步的方法:

    • 1.异步调用 调用方创建一个子线程,在子线程中执行方法调用
    • 2.异步方法 方法实现时,创建新线程执行主要的逻辑,主线程直接返回结果

    异步转同步的方法:主线程阻塞直到子线程执行完成。

    Java在1.8版本提供了CompletableFuture来支持异步编程,实现非常复杂,但是功能也很强大。

    CompletableFuture 优点

    1. 无需手工维护线程,没有繁琐的手工维护线程的工作,给任务分配线程的工作也不需要我们关注;
    2. 语义更清晰,例如 f3 = f1.thenCombine(f2, ()->{}) 能够清晰地表述“任务3要等待任务1和任 务2都完成后才能开始”;
    3. 代码更简练并且专注于业务逻辑,几乎所有代码都是业务逻辑相关的。
    /**
    * 实现Future 接口  可以获得执行结果
    * CompletionStage 异步流程中的串行,汇聚
    */
    
    public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
               
           //判断线程池的并行级别是否大于1  并行级别决定了同一时刻最多有多少个线程在执行,如不传如并行级别参数,将默认为当前系统的CPU个数。
           private static final boolean useCommonPool =
            (ForkJoinPool.getCommonPoolParallelism() > 1);
    
           //默认的线程池声明
           private static final Executor asyncPool = useCommonPool ?
            ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
    
          //创建CompletableFuture的静态方法
          //使用默认的线程池
          static > CompletableFuture runAsync(Runnable runnable) 
          static CompletableFuture supplyAsync(Supplier supplier)
    
           //可以指定线程池 
          static CompletableFuture runAsync(Runnable runnable, Executor executor) 
          static CompletableFuture supplyAsync(Supplier supplier, Executor executor)
                
          //Runnable 接口的run()方法没有返回值,
          //而Supplier接口的get()方法是有 返回值的  是函数式编程的接口 
          //创建完CompletableFuture对象之后,会自动地异步执行runnable.run()方法或者supplier.get()方法
          //如果所有的CompletableFuture 都使用同一个线程池,当进行IO这种耗时的操作时,就会造成长时间的等待,造成线程饥饿,
    根据不同的业务类型创建不同的线程池,以避免互相干扰。
    }
    
    

    4.2 异步流程控制相关方法

    4.2.1 串行

        public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
            return uniApplyStage(null, fn);
        }
        //thenApply既能接收参数也支持返回值,所以thenApply系列方法返回的是 ? extends U ,U为CompletableFuture
        //thenAccept、  虽然支持参数,但却不支持回值
        //thenRun、 参数是Runnable,所以action既不能接收参数也不支持返回值
        //thenCompose  个系列的方法会新创建出一个子流程,最终结果和thenApply系列是相同的。
        
        CompletionStage thenApply(fn); 
        CompletionStage thenApplyAsync(fn); 
        CompletionStage thenAccept(consumer); 
        CompletionStage thenAcceptAsync(consumer);
        CompletionStage thenRun(action);
        CompletionStage thenRunAsync(action);
        CompletionStage thenCompose(fn); 
        CompletionStage thenComposeAsync(fn);
        //Async代表的是异步执行fn、consumer或者action
        
       //首先通过supplyAsync()启动一个异步 流程,之后是两个串行操作,整体看起来还是挺简单的。不过,虽然这是一个异步流程,但任务①②③却是 串行执行的,②依赖①的执行结果,③依赖②的执行结果。
          CompletableFuture f0 = CompletableFuture.supplyAsync( () -> "Hello World") //1
          .thenApply(s -> s + " QQ") //2
          .thenApply(String::toUpperCase);//3
          System.out.println(f0.join()); 
          //输出结果 HELLO WORLD QQ
    
       
        
    

    4.2.2.汇聚
    AND 是thenCombine、thenAcceptBoth和runAfterBoth 系列方法,参数分别是 接口的区别也是源自fn、consumer、action这三个核心参数不同

    CompletionStage thenCombine(other, fn); 
    CompletionStage thenCombineAsync(other, fn);
    CompletionStage thenAcceptBoth(other, consumer); 
    CompletionStage thenAcceptBothAsync(other, consumer);
    CompletionStage runAfterBoth(other, action);
    CompletionStage runAfterBothAsync(other, action);
    

    OR 主要是applyToEither、acceptEither和runAfterEither系列的 接口,这些接口的区别也是源自fn、consumer、action这三个核心参数不同。

    CompletionStage applyToEither(other, fn); 
    CompletionStage applyToEitherAsync(other, fn); 
    CompletionStage acceptEither(other, consumer);
     CompletionStage acceptEitherAsync(other, consumer);
     CompletionStage runAfterEither(other, action);
     CompletionStage runAfterEitherAsync(other, action);
    
    //两者任一完成即可
    CompletableFuture f1 = CompletableFuture.supplyAsync(()->{
         int t = getRandom(5, 10);
         sleep(t, TimeUnit.SECONDS);
         return String.valueOf(t); 
    });
    
     CompletableFuture f2 = CompletableFuture.supplyAsync(()->{
         int t = getRandom(5, 10);
         sleep(t, TimeUnit.SECONDS);
         return String.valueOf(t); 
    }); 
    CompletableFuture f3 = f1.applyToEither(f2,s -> s); 
    System.out.println(f3.join());
    

    4.2.3、异常处理
    fn、consumer、action它们的核心方法都不允许抛出可检查异常,但是却无法限制它 们抛出运行时异常。

    CompletionStage exceptionally(fn);
    CompletionStage whenComplete(consumer);
    CompletionStage whenCompleteAsync(consumer);
    CompletionStage handle(fn);
    CompletionStage handleAsync(fn);
    

    exceptionally()的使用非常类似于 try{}catch{}中的catch{},
    但是由于支持链式编程方式,所以相对更简单。既然有try{}catch{},那就一定还 有try{}finally{},

    whenComplete()handle()系列方法就类似于try{}finally{}中的finally{},
    无论是否发生异 常都会执行whenComplete()中的回调函数consumer和handle()中的回调函数fn
    whenComplete()和 handle()的区别在于whenComplete()不支持返回结果,而handle()是支持返回结果的。

    exceptionally捕获异常

    CompletableFuture f0 = CompletableFuture .supplyAsync(()->7/0)) 
    .thenApply(r->r*10)
    .exceptionally(e->0);
    System.out.println(f0.join());
    

    除此之外还有CompletionService提供批量的异步操作,ForkJoinPool、ForkJoinTask提供分治任务。

    PS:函数式编程

    函数式接口(Functional Interface)
    就是一个有且仅有一个抽象方法,但是可以有多个非抽象方法的接口。
    函数式接口可以被隐式转换为 lambda 表达式。
    Lambda 表达式和方法引用(实际上也可认为是Lambda表达式)上。

    如定义了一个函数式接口如下:

    @FunctionalInterface GreetingService
    {
    void sayMessage(String message);}
    

    那么就可以使用Lambda表达式来表示该接口的一个实现(注:JAVA 8 之前一般是用匿名类实现的):

    GreetingService greetService1 = message -> System.out.println("Hello " + message);
    

    Function<T,R> 接口

    FunctionalInterface
    public interface Function<T, R> {
           // 输入参数T,经过该函数处理得到R。也就是将参数应用于这个函数。
           R apply(T t);
    
    
          // 此函数有两个参数先将传进来的这个参数传入到before这个函数里面进行处理一下,然后before这个函数的返回结果再作为一个参数传递给外面的这个函数。
          default <V> Function<V, R> compose(Function<? super V, ? extends T> before) {
          Objects.requireNonNull(before);
                return (V v) -> apply(before.apply(v));
          }
    
    
          // 此函数有两个参数先将传进来的这个参数传入到apply这个函数里面进行处理一下,然后这个函数的返回结果再作为一个参数传递给外面after的这个函数。
          default <V> Function<T, V> andThen(Function<? super R, ? extends V> after) {
                Objects.requireNonNull(after);
                return (T t) -> after.apply(apply(t));
          }
    
    
          static <T> Function<T, T> identity() {
              return t -> t;
          }
    }
    

    Function 就是一个函数,T代表输入参数,R代表返回的结果。其作用类似于数学中函数的定义y=f(x),(x,y)跟<T,R>的作用几乎一致。
    所以Function中没有具体的操作,具体的操作需要我们去为它指定,因此apply具体返回的结果取决于传入的lambda表达式。

          public void test(){
                Function<Integer,Integer> test=i->i+1;
                test.apply(5);
          }
    /** print:6*/
    

    Java.util.function 包中的类
    https://www.runoob.com/java/java8-functional-interfaces.html

  • 相关阅读:
    江苏高考的程序题
    visio使用小记
    debian+postfix+dovecot+squirrelmail安装配置笔记
    System.Net.Mail
    DBHelper
    朝发白帝城
    《计算机网络》复习题2010
    mvc3在view中获取是否有验证错误
    Validation failed for one or more entities. See 'EntityValidationErrors' property for more details.
    ASP.NET MVC3 Model验证总结
  • 原文地址:https://www.cnblogs.com/shinyrou/p/13451556.html
Copyright © 2011-2022 走看看