zoukankan      html  css  js  c++  java
  • Cyclicbarrier类

    1.简述

      CyclicBarrier字面意思是可循环(Cyclic)使用的屏障(Barrier)。它要做的事情是让一组线程到达一个屏障(同步点)时被阻塞,直到最后一个线程到达屏障时候,屏障才会开门。所有被屏障拦截的线程才会运行。

      和CountdownLatch比较类似,但CyclicBarrier更加注重的是集合内的线程同步,线程组内的所有线程都必须等待组内其他线程运行到一个barrier point,才能继续执行。能够处理更加复杂的场景。并且CyclicBarrier内有一个Generation对象,可以重用下去。

      CyclicBarrier的使用场景

    • 可以用于多线程计算数据,最后合并计算结果的应用场景。

    2.Cyclicbarrier的常用方法

    /**构造方法
     */
    //创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier时执行预定义的操作。
    CyclicBarrier(int parties)
    //创建一个新的CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier时执行给定的屏障操作,该操作由最后一个进入 barrier的线程执行。
    CyclicBarrier(int parties, Runnable barrierAction)
    
    
    /**常用方法
     */
    //在所有参与者都已经在此barrier上调用await方法之前,将一直等待。
    int await()
    //在所有参与者都已经在此屏障上调用 await 方法之前将一直等待,或者超出了指定的等待时间。
    int await(long timeout, TimeUnit unit)
    //返回当前在屏障处等待的参与者数目。
    int getNumberWaiting()
    //返回要求启动此 barrier的参与者数目。
    int getParties()
    //查询此屏障是否处于损坏状态。
    boolean isBroken()
    //将屏障重置为其初始状态。
    void reset()
    View Code

    3.Cyclicbarrier的源码分析

      CyclicBarrier是通过ReentrantLock(独占锁)Condition来实现的。下面,我们分析CyclicBarrier中的源码。

      CyclicBarrier的主要属性

    //并没有自定义同步器,而是定义了一个Generation类,里面包含一个broker属性。
    private static class Generation {
        boolean broken = false;//只有一个标记值
    }
    
    //属性
    private final ReentrantLock lock = new ReentrantLock(); //可重入锁
    private final Condition trip = lock.newCondition(); //Condition后面的await()和singalAll()的调用
    private final int parties;     //参与人数量
    private final Runnable barrierCommand; //触发时要运行的命令
    private Generation generation = new Generation();
    private int count;// 记录还有多少在等待数
    View Code

      CyclicBarrier的构造函数

    /**需要传入一个parties变量,也就是需要等待的线程数。
     */
    public CyclicBarrier(int parties) {
        this(parties, null);//parties表示必须同时到达barrier的线程个数。
    }
    /**也可以传入执行的命令,唤醒时调用
     */
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;//parties表示必须同时到达barrier的线程个数。
        this.count = parties;//count表示处在等待状态的线程个数。
        this.barrierCommand = barrierAction;//barrierCommand表示parties个线程到达barrier时,会执行的动作。
    }
    View Code

      CyclicBarrier的await方法

      每个需要在栅栏处等待的线程都需要显式地调用await()方法等待其它线程的到来。

    /**该方法被调用时表示当前线程已经到达屏障点,当前线程阻塞进入休眠状态。直到所有线程都到达屏障点,当前线程才会被唤醒
     */
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen;
        }
    }
    /**该方法被调用时表示当前线程已经到达屏障点,当前线程阻塞进入休眠状态。在timeout指定的超时时间内,等待其他参与线程到达屏障点。
     * 如果超出指定的等待时间,则抛出TimeoutException异常,如果该时间小于等于零,则此方法根本不会等待.
     */
    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }
    View Code

      CyclicBarrier的核心方法dowait

    /**
     * Main barrier code, covering the various policies.
     */
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        //获取独占锁(lock)
        lock.lock();
        try {
            //保存当前的generation
            final Generation g = generation;
    
            //若当前generation已损坏,则抛出异常。
            if (g.broken)
                throw new BrokenBarrierException();
    
            //如果当前线程被中断,则通过breakBarrier()方法终止CyclicBarrier,唤醒CyclicBarrier中所有等待线程。
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
    
            //将count计数器-1
           int index = --count;
           //如果index=0,则意味着有parties个线程到达barrier。
           if (index == 0) {  // tripped
               boolean ranAction = false;
               try {
                   //如果barrierCommand不为null,则执行该动作。
                   final Runnable command = barrierCommand;
                   if (command != null)
                       command.run();
                   ranAction = true;
                   //唤醒所有等待线程,并更新generation。
                   nextGeneration();
                   //这里等价于return index;
                   return 0;
               } finally {
                   if (!ranAction)
                       breakBarrier();
               }
           }
    
            // loop until tripped, broken, interrupted, or timed out
            //当前线程一直阻塞,直到有parties个线程到达barrier或当前线程被中断或超时这3者之一发生,当前线程才继续执行。
            for (;;) {
                try {
                    //如果不是超时等待,则调用awati()方法进行等待。否则,调用awaitNanos()方法进行等待。
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    //如果等待过程中,线程被中断,则执行下面的函数。
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }
    
                //如果当前generation已经损坏,则抛出异常。
                if (g.broken)
                    throw new BrokenBarrierException();
    
                //如果generation已经换代,则返回index。
                if (g != generation)
                    return index;
    
                //如果是超时等待,并且时间已到,则通过breakBarrier()方法终止CyclicBarrier,唤醒CyclicBarrier中所有等待线程。
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            //释放独占锁(lock)
            lock.unlock();
        }
    }
    View Code

      dowait方法的整个逻辑分成两个部分

    1. 最后一个线程走上面的逻辑,当count减为0的时候,打破栅栏,它调用nextGeneration方法通知条件队列中的等待线程转移到AQS的队列中等待被唤醒,并进入下一代。
    2. 非最后一个线程走下面的for循环逻辑,这些线程会阻塞在condition的await方法处,它们会加入到条件队列中,等待被通知,当它们唤醒的时候已经更新换代了,这时候返回。

    4.Cyclicbarrier的使用示例

    public class Test {
        public static void main(String[] args) throws Exception {
            final CyclicBarrier cb = new CyclicBarrier(5);
            for (int i = 0; i < 5; i++) {
                new Thread(new Runnable() {
                    public void run() {
                        try {
                            System.out.println("线程" + Thread.currentThread().getName() + "正在执行同一个任务");
                            // 以睡眠来模拟几个线程执行一个任务的时间
                            Thread.sleep(2000);
                            System.out.println("线程" + Thread.currentThread().getName() + "执行任务完成,等待其他线程执行完毕");
                            // 用来挂起当前线程,直至所有线程都到达barrier状态再同时执行后续任务;
                            cb.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        } catch (BrokenBarrierException e) {
                            e.printStackTrace();
                        }
                        System.out.println("所有线程写入完毕");
                    }
                }).start();
            }
        }
    }
    View Code

    5.总结

      CyclicBarrier总结

    • CyclicBarrier会使一组线程阻塞在await()处,当最后一个线程到达时唤醒(只是从条件队列转移到AQS队列中)前面的线程大家再继续往下走。
    • CyclicBarrier不是直接使用AQS实现的一个同步器。
    • CyclicBarrier基于ReentrantLock及其Condition实现整个同步逻辑。

      CyclicBarrier与CountDownLatch的异同

    • 两者都能实现阻塞一组线程等待被唤醒。
    • CyclicBarrier是最后一个线程到达时自动唤醒。
    • CountDownLatch是通过显式地调用countDown()实现的。
    • CyclicBarrier是通过重入锁及其条件锁实现的,CountDownLatch是直接基于AQS实现的。
    • CyclicBarrier具有“代”的概念,可以重复使用,CountDownLatch只能使用一次。
    • CyclicBarrier只能实现多个线程到达栅栏处一起运行。
    • CountDownLatch不仅可以实现多个线程等待一个线程条件成立,还能实现一个线程等待多个线程条件成立。
  • 相关阅读:
    nullnullConnecting with WiFi Direct 与WiFi直接连接
    nullnullUsing WiFi Direct for Service Discovery 直接使用WiFi服务发现
    nullnullSetting Up the Loader 设置装载机
    nullnullDefining and Launching the Query 定义和启动查询
    nullnullHandling the Results 处理结果
    装置输出喷泉装置(贪心问题)
    数据状态什么是事务?
    停止方法iOS CGD 任务开始与结束
    盘文件云存储——金山快盘
    函数标识符解决jQuery与其他库冲突的方法
  • 原文地址:https://www.cnblogs.com/bl123/p/14174012.html
Copyright © 2011-2022 走看看