zoukankan      html  css  js  c++  java
  • CyclicBarrier源码解读

    1. 简介

    JUC中的CyclicBarrier提供了一种多线程间的同步机制,可以让多个线程在barrier等待其它线程到达barrier。正如其名CyclicBarrier含义就是可以循环使用的屏障。

    2. 源码解读

    2.1 数据结构

    2.1.1 Generation

    在CyclicBarrier中用Generation来代表每一轮的Cyclibarrier的运行状况。

    private static class Generation {
        // broken表示挂否。
        boolean broken = false;
    }
    private Generation generation = new Generation();
    

    在任意时刻只有一个genration实例是真正代表当前这一轮的运行状况,其他实例都是跑完或者跑挂的。

    2.1.2 barrierCommand

    CyclicBarrier允许我们通过构造方法设置一个Runnable对象,用来在所有线程都到达barrier时执行。

    2.1.3 其它

    parties表示线程数,在parties个线程都调用await方法后,barrier才算是被通过(tripped)了。
    count表示还剩下未到达barrier(未调用await)的线程数量,count会在新的一轮开启或者当前这一轮跑挂时重置为parties。
    CyclicBarrier中的trip用于实现线程间的等待与唤醒的通信,而lock则为CyclicBarrier中的变量(generation和count)提供可见性保证,为临界区的操作提供保护。

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition trip = lock.newCondition();
    
    private final int parties;
    private int count;
    

    2.2 await方法

    下面分析await方法的源码。

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
    
    public int await(long timeout, TimeUnit unit)
            throws InterruptedException,
                       BrokenBarrierException,
                       TimeoutException {
                return dowait(true, unit.toNanos(timeout));
    }
    
    private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;
    
            // 如果已经跑挂了,抛出BrokenBarrierException。
            if (g.broken)
                throw new BrokenBarrierException();
    
            // 检查中断标志位。
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
    
            int index = --count;
            // 最后一个到达barrier的线程。
            if (index == 0) {
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    // 开启下一轮。
                    nextGeneration();
                    return 0;
                } finally {
                    // 如果action执行时发生了,也会break掉barrier。
                    if (!ranAction)
                        breakBarrier();
                }
            }
    
            // loop until tripped, broken, interrupted, or timed out
            /*
             * 对于其它(不是最后一个)线程,会在trip条件下等待被唤醒。情况有以下几类:
             * 1. 所有线程都到达barrier,并成功执行了barrierAction。
             * 2. 有线程执行了breakBarrier方法。
             * 3. 线程本身被中断。
             * 4. 超时(如果调用的带时间限制的await)。
             */
            for (;;) {
                try {
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    /*
                     * g == generation && !g.broken说明此时当前这一轮还没结束,并且没有其它线程执行过breakBarrier方法。
                     * 这种情况会执行breakBarrier置generation的broken标识为true并唤醒其它线程,之后继续抛出InterruptedException。
                     */
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        /*
                         * 如果g != generation,此时这一轮已经结束,后面返回index作为到达barrier的次序;
                         * 如果g.broken说明之前已经有其它线程执行了breakBarrier方法,后面会抛出BrokenBarrierException。
                         */
                        Thread.currentThread().interrupt();
                    }
    
                }
    
                if (g.broken)
                    throw new BrokenBarrierException();
    
                // 这一轮已经结束,则返回到达屏障的次序,0表示最后一个,parties-1表示第一个。
                if (g != generation)
                    return index;
    
                // 判断是否超时。
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }
    
    // 保证调用时持有锁。
    private void nextGeneration() {
        // 唤醒其它在trip条件下等待的线程。
        trip.signalAll();
        // 重置count。
        count = parties;
        // 开启下一轮。
        generation = new Generation();
    }
    
    // 保证调用时持有锁。
    private void breakBarrier() {
        generation.broken = true;
        // 重置count。
        count = parties;
        // 唤醒其它在trip条件下等待的线程。
        trip.signalAll();
    },
    

    2.3 其它方法

    CyclicBarrier其它还提供了例如getParties, isBroken, getNumberWaiting, reset等方法,都比较简单。
    其中除了getParties由于parties被final修饰不可变,其余方法都会先去获得互斥锁。

    
    /**
     * 获取当前这一轮是否已经broken。
     */
    public boolean isBroken() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return generation.broken;
        } finally {
            lock.unlock();
        }
    }
    
    /**
     * 重置barrier到初始状态,所有还在等待中的线程最终会抛出BrokenBarrierException。
     */
    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }
    
    
    /**
     * 获得当前在barrier中等待的线程数。
     */
    public int getNumberWaiting() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return parties - count;
        } finally {
            lock.unlock();
        }
    }
    

    3. 总结

    总的来说CyclicBarrier的源码还是比较简洁易懂的,通过锁和条件,实现了在barrier上同步的功能。
    常常会拿CyclicBarrier和CountdownLatch比较,CountdownLatch的的计数器到0就完事了,没法再重置恢复。而CyclicBarrier的计数器可以通过正常的一轮同步重置,也可以通过reset方法强制重置。CountdownLatch每个调用await的线程会被阻塞直到其它线程通过countDown方法将计数器减到0;而CyclicBarrier则是有parties-1个线程调用await会阻塞直到最后一个线程调用await方法。此外CyclicBarrier还可以设置一个barrierAction,相当于一个hook,这也是CountdownLatch不具有的。

  • 相关阅读:
    Linq聚合操作之Aggregate,Count,Sum,Distinct源码分析
    Linq分区操作之Skip,SkipWhile,Take,TakeWhile源码分析
    Linq生成操作之DefautIfEmpty,Empty,Range,Repeat源码分析
    Linq基础操作之Select,Where,OrderBy,ThenBy源码分析
    PAT 1152 Google Recruitment
    PAT 1092 To Buy or Not to Buy
    PAT 1081 Rational Sum
    PAT 1084 Broken Keyboard
    PAT 1077 Kuchiguse
    PAT 1073 Scientific Notation
  • 原文地址:https://www.cnblogs.com/micrari/p/7954956.html
Copyright © 2011-2022 走看看