zoukankan      html  css  js  c++  java
  • Cyclicbarrier类

    1.简述

      CyclicBarrier字面意思是可循环(Cyclic)使用的屏障(Barrier)。它要做的事情是让一组线程到达一个屏障(同步点)时被阻塞,直到最后一个线程到达屏障时候,屏障才会开门。所有被屏障拦截的线程才会运行。

      和CountdownLatch比较类似,但CyclicBarrier更加注重的是集合内的线程同步,线程组内的所有线程都必须等待组内其他线程运行到一个barrier point,才能继续执行。能够处理更加复杂的场景。并且CyclicBarrier内有一个Generation对象,可以重用下去。

      CyclicBarrier的使用场景

    • 可以用于多线程计算数据,最后合并计算结果的应用场景。

    2.Cyclicbarrier的常用方法

    /**构造方法
     */
    //创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier时执行预定义的操作。
    CyclicBarrier(int parties)
    //创建一个新的CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier时执行给定的屏障操作,该操作由最后一个进入 barrier的线程执行。
    CyclicBarrier(int parties, Runnable barrierAction)
    
    
    /**常用方法
     */
    //在所有参与者都已经在此barrier上调用await方法之前,将一直等待。
    int await()
    //在所有参与者都已经在此屏障上调用 await 方法之前将一直等待,或者超出了指定的等待时间。
    int await(long timeout, TimeUnit unit)
    //返回当前在屏障处等待的参与者数目。
    int getNumberWaiting()
    //返回要求启动此 barrier的参与者数目。
    int getParties()
    //查询此屏障是否处于损坏状态。
    boolean isBroken()
    //将屏障重置为其初始状态。
    void reset()
    View Code

    3.Cyclicbarrier的源码分析

      CyclicBarrier是通过ReentrantLock(独占锁)Condition来实现的。下面,我们分析CyclicBarrier中的源码。

      CyclicBarrier的主要属性

    //并没有自定义同步器,而是定义了一个Generation类,里面包含一个broker属性。
    private static class Generation {
        boolean broken = false;//只有一个标记值
    }
    
    //属性
    private final ReentrantLock lock = new ReentrantLock(); //可重入锁
    private final Condition trip = lock.newCondition(); //Condition后面的await()和singalAll()的调用
    private final int parties;     //参与人数量
    private final Runnable barrierCommand; //触发时要运行的命令
    private Generation generation = new Generation();
    private int count;// 记录还有多少在等待数
    View Code

      CyclicBarrier的构造函数

    /**需要传入一个parties变量,也就是需要等待的线程数。
     */
    public CyclicBarrier(int parties) {
        this(parties, null);//parties表示必须同时到达barrier的线程个数。
    }
    /**也可以传入执行的命令,唤醒时调用
     */
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;//parties表示必须同时到达barrier的线程个数。
        this.count = parties;//count表示处在等待状态的线程个数。
        this.barrierCommand = barrierAction;//barrierCommand表示parties个线程到达barrier时,会执行的动作。
    }
    View Code

      CyclicBarrier的await方法

      每个需要在栅栏处等待的线程都需要显式地调用await()方法等待其它线程的到来。

    /**该方法被调用时表示当前线程已经到达屏障点,当前线程阻塞进入休眠状态。直到所有线程都到达屏障点,当前线程才会被唤醒
     */
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen;
        }
    }
    /**该方法被调用时表示当前线程已经到达屏障点,当前线程阻塞进入休眠状态。在timeout指定的超时时间内,等待其他参与线程到达屏障点。
     * 如果超出指定的等待时间,则抛出TimeoutException异常,如果该时间小于等于零,则此方法根本不会等待.
     */
    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }
    View Code

      CyclicBarrier的核心方法dowait

    /**
     * Main barrier code, covering the various policies.
     */
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        //获取独占锁(lock)
        lock.lock();
        try {
            //保存当前的generation
            final Generation g = generation;
    
            //若当前generation已损坏,则抛出异常。
            if (g.broken)
                throw new BrokenBarrierException();
    
            //如果当前线程被中断,则通过breakBarrier()方法终止CyclicBarrier,唤醒CyclicBarrier中所有等待线程。
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
    
            //将count计数器-1
           int index = --count;
           //如果index=0,则意味着有parties个线程到达barrier。
           if (index == 0) {  // tripped
               boolean ranAction = false;
               try {
                   //如果barrierCommand不为null,则执行该动作。
                   final Runnable command = barrierCommand;
                   if (command != null)
                       command.run();
                   ranAction = true;
                   //唤醒所有等待线程,并更新generation。
                   nextGeneration();
                   //这里等价于return index;
                   return 0;
               } finally {
                   if (!ranAction)
                       breakBarrier();
               }
           }
    
            // loop until tripped, broken, interrupted, or timed out
            //当前线程一直阻塞,直到有parties个线程到达barrier或当前线程被中断或超时这3者之一发生,当前线程才继续执行。
            for (;;) {
                try {
                    //如果不是超时等待,则调用awati()方法进行等待。否则,调用awaitNanos()方法进行等待。
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    //如果等待过程中,线程被中断,则执行下面的函数。
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }
    
                //如果当前generation已经损坏,则抛出异常。
                if (g.broken)
                    throw new BrokenBarrierException();
    
                //如果generation已经换代,则返回index。
                if (g != generation)
                    return index;
    
                //如果是超时等待,并且时间已到,则通过breakBarrier()方法终止CyclicBarrier,唤醒CyclicBarrier中所有等待线程。
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            //释放独占锁(lock)
            lock.unlock();
        }
    }
    View Code

      dowait方法的整个逻辑分成两个部分

    1. 最后一个线程走上面的逻辑,当count减为0的时候,打破栅栏,它调用nextGeneration方法通知条件队列中的等待线程转移到AQS的队列中等待被唤醒,并进入下一代。
    2. 非最后一个线程走下面的for循环逻辑,这些线程会阻塞在condition的await方法处,它们会加入到条件队列中,等待被通知,当它们唤醒的时候已经更新换代了,这时候返回。

    4.Cyclicbarrier的使用示例

    public class Test {
        public static void main(String[] args) throws Exception {
            final CyclicBarrier cb = new CyclicBarrier(5);
            for (int i = 0; i < 5; i++) {
                new Thread(new Runnable() {
                    public void run() {
                        try {
                            System.out.println("线程" + Thread.currentThread().getName() + "正在执行同一个任务");
                            // 以睡眠来模拟几个线程执行一个任务的时间
                            Thread.sleep(2000);
                            System.out.println("线程" + Thread.currentThread().getName() + "执行任务完成,等待其他线程执行完毕");
                            // 用来挂起当前线程,直至所有线程都到达barrier状态再同时执行后续任务;
                            cb.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        } catch (BrokenBarrierException e) {
                            e.printStackTrace();
                        }
                        System.out.println("所有线程写入完毕");
                    }
                }).start();
            }
        }
    }
    View Code

    5.总结

      CyclicBarrier总结

    • CyclicBarrier会使一组线程阻塞在await()处,当最后一个线程到达时唤醒(只是从条件队列转移到AQS队列中)前面的线程大家再继续往下走。
    • CyclicBarrier不是直接使用AQS实现的一个同步器。
    • CyclicBarrier基于ReentrantLock及其Condition实现整个同步逻辑。

      CyclicBarrier与CountDownLatch的异同

    • 两者都能实现阻塞一组线程等待被唤醒。
    • CyclicBarrier是最后一个线程到达时自动唤醒。
    • CountDownLatch是通过显式地调用countDown()实现的。
    • CyclicBarrier是通过重入锁及其条件锁实现的,CountDownLatch是直接基于AQS实现的。
    • CyclicBarrier具有“代”的概念,可以重复使用,CountDownLatch只能使用一次。
    • CyclicBarrier只能实现多个线程到达栅栏处一起运行。
    • CountDownLatch不仅可以实现多个线程等待一个线程条件成立,还能实现一个线程等待多个线程条件成立。
  • 相关阅读:
    Ubuntu apt-get update 失败
    Ubuntu无法访问windows分区
    Python实现使用tkinter弹出输入框输入数字, 具有确定输入和清除功能
    如何更改监控器的默认计数器
    健壮的 Java 基准测试
    从虚拟机视角谈 Java 应用性能优化
    LoadRunner如何调用外部函数
    git安装与上传
    Loadrunner安装与破解【转】
    性能测试方法【转】
  • 原文地址:https://www.cnblogs.com/bl123/p/14174012.html
Copyright © 2011-2022 走看看