zoukankan      html  css  js  c++  java
  • Java多线程:CountDownLatch、CyclicBarrier 和 Semaphore

    场景描述:

      多线程设计过程中,经常会遇到需要等待其它线程结束以后再做其他事情的情况。
    有几种方案:
     
      1.在主线程中设置一自定义全局计数标志,在工作线程完成时,计数减1。主线程侦测该标志是否为0,一旦为0,表示所有工作线程已经完成。
      2.使用Java标准的类CountDownLatch来完成这项工作,原理是一样的,计数。
     
     

    CountDownLatch

    一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。 
    其机制是:
      当多个(具体数量等于初始化CountDownLatch时的count参数的值)线程都达到了预期状态或完成预期工作时触发事件,其他线程可以等待这个事件来触发自己的后续工作。这里需要注意的是,等待的线程可以是多个,即CountDownLatch是可以唤醒多个等待的线程的。达到自己预期状态的线程会调用CountDownLatch的countDown方法,而等待的线程会调用CountDownLatch的await方法。
    CountDownLatch 很适合用来将一个任务分为n个独立的部分,等这些部分都完成后继续接下来的任务,CountDownLatch 只能出发一次,计数值不能被重置。

    流程图

    如上图所示,当7个线程都完成latch.countDown调用后,最下面那条线程会从latch.await返回,继续执行后面的代码

    函数列表

    • CountDownLatch(int count) :构造一个用给定计数初始化的 CountDownLatch。
    • void await():使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断。
    • boolean await(long timeout, TimeUnit unit) 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间。
    • void countDown() 递减锁存器的计数,如果计数到达零,则释放所有等待的线程。

    实现原理

     

    实例

       我们来看一个具体的例子。假设我们使用一台多核的机器对一组数据进行排序,那么我们可以把这组数据分到不同线程中进行排序,然后合并;可以利用线程池来管理多线程;可以将CountDownLatch用作各个分组数据都排好序的通知。下面是代码片段:

    先看主线程

    int count = 10;
    final CountDownLatch latch = new CountDownLatch(count);
    int[] datas = new int[10204];
    int step = datas.length / count;
    for (int i=0; i < count; i++) {
        int start = i * step;
        int end = (i+1) * step;
        if (i == count - 1) end = datas.length;
        threadpool.execute(new MyRunnable(latch, datas, start, end));
    }
    latch.await();
    //合并数据

    我们再看一下具体任务的代码,即MyRunnable的run方法的实现:

    public void run() {
          //数据排序
         latch.countDown(); 
    }

    CyclicBarrier

    可以协同多个线程,让多个线程在这个屏障前等待,直到所有线程都达到了这个屏障时,再一起继续执行后面的动作。
    CyclicBarrier适用于多个线程有固定的多步需要执行,线程间互相等待,当都执行完了,再一起执行下一步。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。
     

    流程图

     上图中的7个线程各有一个barrier.await,那么任何一个线程在执行到barrier.await时就会进入阻塞等待状态,直到7个线程都到了barrier.await时才会同时从await返回,继续后面的工作。此外如果在构造CyclicBarrier时设置了一个Runnable实现,那么最后一个到barrier.await的线程会执行这个Runnable的run方法,以完成一些预设的工作。
     
    注意比较CountDownLatchCyclicBarrier
      (01) CountDownLatch的作用是允许1或N个线程等待其他线程完成执行;而CyclicBarrier则是允许N个线程相互等待。
      (02) CountDownLatch的计数器无法被重置;CyclicBarrier的计数器可以被重置后使用,因此它被称为是循环的barrier。
    CountDownLatch 适用于一组线程和另一个主线程之间的工作协作。一个主线程等待一组工作线程的任务完毕才继续它的执行是使用 CountDownLatch 的主要场景;CyclicBarrier 用于一组或几组线程,比如一组线程需要在一个时间点上达成一致,例如同时开始一个工作。另外,CyclicBarrier 的循环特性和构造函数所接受的 Runnable 参数也是 CountDownLatch 所不具备的。
     
     
     
    CountDownLatch
    CyclicBarrier
    适用场景
    主线程等待其他工作线程结束
    多个线程相互等待,直到所有线程都达到一个障碍点Barrier
    主要方法
    CountDownLatch(int count) 主线程调用:初始化计数
     
    await() 主线程调用 : 阻塞,直到等待计数为0时解除阻塞 
     
    countDown() 工作线程调用 : 计数减1
    CyclicBarrier(int parties , Runnnable barrierAction) : 初始化参与者数量和障碍点执行Action,action可选,由主线程初始化
     
    await() : 由工作线程调用,每被调用一次,计数便会减少1,并阻塞住当前线程 , 直到所有线程都达到障碍点
    等待结束
    各线程之间不再相互影响, 可以继续做自己的事情, 不再执行下一个工作目标。
    在障碍点到达后, 允许所有线程继续执行,到达下一个目标后,可以恢复使用CyclicBarrier, barrier 在释放等待线程后可以重用
    异常
     
    如果其中一个线程由于中断、错误、或者超时导致永久离开障碍点,其他线程也将抛出异常。
     

    实例

       
    int count = 10;
    final CyclicBarrier barrier = new CyclicBarrier(count + 1);
    int[] datas = new int[10204];
    int step = datas.length / count;
    for (int i=0; i < count; i++) {
        int start = i * step;
        int end = (i+1) * step;
        if (i == count - 1) end = datas.length;
        threadpool.execute(new MyRunnable(barrier, datas, start, end));
    }
    barrier.await();
    //合并数据

    可以看到CyclicBarrier对象传入的参数值比CountDownLatch大1,原因是构造CountDownLatch的参数是调用countDown的数量,而CyclicBarrier的数量是await的数量

    public void run() {
          //数据排序
         try {
             barrier.await(); 
        }catch (...)
    }

    Semaphore

    Semaphore 信号量对象管理的信号就像令牌,构造时传入个数,总数就是控制并发的数量。我们需要控制并发的代码,执行前先获取信号(通过acquire获取信号许可),执行后归还信号(通过release归还信号许可)。每次acquire成功返回后,Semaphore可用的信号量就会减少一个,如果没有可用的信号,acquire调用就会阻塞,等待有release调用释放信号后,acquire才会得到信号并返回。
    如果Semaphore管理的信号量为1个,那么就退化到互斥锁了;如果多于一个信号量,则主要用于控制并发数。与通过控制线程数来控制并发数的方式相比,通过Semaphore来控制并发数可以控制得更加细粒度,因为真正被控制最大并发的代码放到acquire和release之间就行了。
      Semaphore类位于java.util.concurrent包下,它提供了2个构造器:
    public Semaphore(int permits) {          //参数permits表示许可数目,即同时可以允许多少线程进行访问
        sync = new NonfairSync(permits);
    }
    public Semaphore(int permits, boolean fair) {    //这个多了一个参数fair表示是否是公平的,即等待时间越久的越先获取许可
        sync = (fair)? new FairSync(permits) : new NonfairSync(permits);
    }

    实例

       例如我们需要控制远程方法的并发量,超过并发量的方法就等待有其他方法执行返回后再执行,那么其代码如下:
    semaphore.acquire();
    try {
        //调用远程通信的方法
    }
    finally {
        semaphore.release();
    }
     
     
     

    参考资料:

  • 相关阅读:
    使用AD你应该避免的五个错误
    卸载常用组件
    学会批处理,用心学很容易
    VI的用法
    安装Linux版VNC 企业版
    【3】淘宝sdk的下载和安装
    【7】创建一个自己的模板
    【6】网店模板目录及文件介绍
    【11】淘宝sdk的DOM、CSS规范、Widget规范(这个Widget规范差不多就是网页效果)和HTML规范
    【2】认识淘宝sdk模板
  • 原文地址:https://www.cnblogs.com/ITtangtang/p/7603238.html
Copyright © 2011-2022 走看看