zoukankan      html  css  js  c++  java
  • CountDownLatch & CyclicBarrier源代码实现解析

    CountDownLatch

        CountDownLatch同意一条或者多条线程等待直至其他线程完毕以系列的操作的辅助同步器。



        用一个指定的count值对CountDownLatch进行初始化。

    await方法会堵塞,直至由于调用countDown方法把当前的count降为0,在这以后。全部的等待线程会被释放。而且在这以后的await调用将会马上返回。这是一个一次性行为——count不能被重置。假设你须要一个能够重置count的版本号。考虑使用CyclicBarrier。
        事实上本类实现很easy。和ReentrantLock类似。公有的方法都是调用内部类的桥接模式,内部类是继承AQS的锁实现。详细例如以下:

      private static final class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 4982264981922014374L;
    
    
            Sync(int count) {
                setState(count);
            }
    
    
            int getCount() {
                return getState();
            }
    
    
            protected int tryAcquireShared(int acquires) {
                return (getState() == 0) ? 1 : -1;
            }
    
    
            protected boolean tryReleaseShared(int releases) {
                for (;;) {
                    int c = getState();
                    if (c == 0)
                        return false;
                    int nextc = c-1;
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;
                }
            }
        }
        构造函数调用setState把count值设置为当前的状态。内部类Sync由CountDownLatch构造函数时创建,当然保证非负值也是这里推断,
        public CountDownLatch(int count) {
            if (count < 0) throw new IllegalArgumentException("count < 0");
            this.sync = new Sync(count);
        }
        另外。看看CountDownLatch的的await和countDown方法的实现:
       public void countDown() {
            sync.releaseShared(1);
        }
    
    
        public void await() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
        能够看到await调了AQS的acquireSharedInterruptibly尝试获取共享锁,countDown方法调用了releaseShared尝试释放共享锁,两个方法的參数都是1。因此当此时有多条线程同一时候调用await时,这时候看到内部Sync类的tryAcquireShared方法实现,因为在构造函数里已经调用setState把当前锁状态数设置为count。因此这里在getState()的推断会一直返回count的值,因此tryAcquireShared会一直返回-1,然后这些调用await的线程都会进入等待队列。
        继续看看countDown的实现。调用的是AQS的releaseShared的方法,经过调用来到内部Sync类的tryReleaseShared方法,明显看到方法仅仅是利用自旋把当前的锁状态数减去一。直到锁状态数等于0,然后返回true,这样就AQS就把先前进入等待队列里的全部等待共享锁的线程唤醒,同一时候假设后面继续有线程调用await的话,因为锁状态数已经变为0,因此tryAcquireShared会一直返回1,这时函数countDown就会立马返回,不须要再进入等待队列。


        须要注意的是,这里的count值在构造函数就已经被决定了,兴许也没有方法能够改动,当然这是由这个类的最初设计意图所决定的。

    假设须要能够对count值进行更改,能够參考CyclicBarrier。

    CyclicBarrier

        CyclicBarrier同意一组线程互相等待直到一个公平屏障点(common barrier point)。

    与CountDownLatch不同的是CyclicBarrier着重与互相等待。而且加入重置原状态的方法。

        在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时CyclicBarrier非常实用。

    由于该屏障(barrier)在释放等待线程后能够重用,所以称它为循环的屏障。CyclicBarrier支持一个可选的Runnable命令,在一组线程中的最后一个线程到达之后(但在释放全部线程之前)。该命令仅仅在每一个屏障点执行一次。若在继续全部參与线程之前更新共享状态,此屏障操作非常实用。
        假设屏障操作不依赖于线程组运行时被悬挂。则线程组内不论什么线程在被释放的时候都能够运行屏障操作。为了改进这个行为,每一个await的调用都会返回线程到达屏障的索引。然后你就能够选择哪条线程应该运行屏障操作。比如:

        if (barrier.await() == 0) {
            // 运行屏障操作
        }
        CyclicBarrier对于失败的同步尝试,会使用全有或者全无的破坏模型(breakage model):假设一条线程因为中断、异常或者超时提前离开了屏障点,其他全部在屏障点等待的线程也会通过抛出BrokenBarrierException(或者InterruptedException异常,同一时候被中断的情况下)离开屏障点。接下来看看详细的实现。



        首先来看看CyclicBarrier类的构造函数和成员变量以及内部类Generation:

        private static class Generation {
            boolean broken = false;
        }
        CyclicBarrier声明一个内部类Generation,在每次屏障点的使用就代表着一个Generation实例。

    当屏障点被破坏或者重置的时候。generation就要改变。

        //保护屏障点入口的锁
        private final ReentrantLock lock = new ReentrantLock();
        //使线程在await中等待直至屏障点被脱落(tripped)的条件对象
        private final Condition trip = lock.newCondition();
        //须要调用await来脱落屏障点的线程数,为final变量
        private final int parties;
        //在屏障点被脱落之后须要执行的命令
        private final Runnable barrierCommand;
        //当前的Generation
        private Generation generation = new Generation();
        //在每次generation中从parties递减到0,当新的Generation被创建或者屏障点被破坏的时候,count就会被重置
        private int count;
        成员变量中採用lock和trip进行同步控制。另外parties和count记录线程数,generation则表示屏障点的当前状态,还有barrierCommand记录屏障点破坏后须要执行的命令。
        public CyclicBarrier(int parties, Runnable barrierAction) {
            if (parties <= 0) throw new IllegalArgumentException();
            this.parties = parties;
            this.count = parties;
            this.barrierCommand = barrierAction;
        }
        当中parties表示在屏障点被脱落(tripped)之前须要调用await的线程数。须要注意的是parties是final变量,因此不能改变,而成员变量count则在每次await的时候递减,重置的时候把parties赋值就可以。barrierAction表示当屏障点被脱落的时候,运行的命令。要注意的时parties数在构造函数里设定以后就不能更改,假设屏障被脱落的时候,能够调用reset重置,我们先来看看await的实现。在详细实现里,await有两个不同的函数版本号,包含无超时版本号和超时版本号,详细例如以下。
     //无超时版本号
        public int await() throws InterruptedException, BrokenBarrierException {
            try {
                return dowait(false, 0L);
            } catch (TimeoutException toe) {
                throw new Error(toe); // cannot happen
            }
        }
    
    
        //超时版本号
        public int await(long timeout, TimeUnit unit)
            throws InterruptedException,
                   BrokenBarrierException,
                   TimeoutException {
            return dowait(true, unit.toNanos(timeout));
        }
    
    
        private int dowait(boolean timed, long nanos)
            throws InterruptedException, BrokenBarrierException,
                   TimeoutException {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                final Generation g = generation;
    
    
                if (g.broken)
                    throw new BrokenBarrierException();
    
    
                if (Thread.interrupted()) {
                    breakBarrier();
                    throw new InterruptedException();
                }
    
    
                int index = --count;
                if (index == 0) {  // tripped
                    boolean ranAction = false;
                    try {
                        final Runnable command = barrierCommand;
                        if (command != null)
                            command.run();
                        ranAction = true;
                        nextGeneration();
                        return 0;
                    } finally {
                        if (!ranAction)
                            breakBarrier();
                    }
                }
    
    
                for (;;) {
                    try {
                        if (!timed)
                            trip.await();
                        else if (nanos > 0L)
                            nanos = trip.awaitNanos(nanos);
                    } catch (InterruptedException ie) {
                        if (g == generation && ! g.broken) {
                            breakBarrier();
                            throw ie;
                        } else {
                            Thread.currentThread().interrupt();
                        }
                    }
    
    
                    if (g.broken)
                        throw new BrokenBarrierException();
    
    
                    if (g != generation)
                        return index;
    
    
                    if (timed && nanos <= 0L) {
                        breakBarrier();
                        throw new TimeoutException();
                    }
                }
            } finally {
                lock.unlock();
            }
        }
        能够看到无超时版本号和超时版本号的await实现仅仅是在调用dowait函数的參数不同。详细来看看dowait的实现。和之前解析的ReentranLock,ReentrantReadWriteLock。以及上面的CountDownLatch不同,CyclicBarrier并没有重载AQS类,而是选择了直接使用ReentrantLock以及其Condition。


        dowait函数一開始就调用RenntrantLock的lock方法尝试获取锁,当获取锁成功之后,然后创建栈变量g保留成员变量generation,因为generation在后面会被其他线程又一次赋值,因此用栈变量保留的做法在多线程同步是有用的技巧。

    接着检验当前Generation的broken变量。假设该变量为true(在重置或者有足够线程调用类await破坏了屏障点)。则此刻立即抛出BrokenBarrierException异常,然后检查当前线程是否已经被中断,假设被中断则调用breakBarrier,依据CyclicBarrier的特性,当一个await的线程中断,则屏障点被破坏,则全部await的线程被唤醒抛出异常。

        private void breakBarrier() {
            generation.broken = true;
            count = parties;
            trip.signalAll();
        }
        breakBarrier作用就是破坏屏障点,函数首先把成员变量的generation.broken变为true,重置count为parties值,然后调用条件对象trip的signalAll唤醒全部在await等待的线程。

    在await以下我们即将会看到。breakBarrier会造成之前在await的全部线程抛出异常。

    假设线程没有被中断,则把count自减,并保留至index(count相同会在后面释放锁的时候被其他线程改动)。假设index为0,则表示屏障点已经被破坏,然后假设barrierCommand非null,则运行命令,假设运行成功ranAction改动为true。否则在命令里抛出不论什么异常的话,则会在finally块调用breakBarrier破坏屏障点,唤醒其他线程。命令运行成功后,则会调用nextGeneration(脱落当前的屏障点):

        private void nextGeneration() {
            trip.signalAll();
            count = parties;
            generation = new Generation();
        }
        函数先唤醒全部在等待中的线程,然后重置count。接着创建一个新的Generation类实例,这样就等于把CyclicBarrier内部的状态重置。


        我们再来看回dowait的实现,假设当前的index还没有递减到0,则会进入一个循环,在这里首先调用trip的await函数依照參数进行超时或无穷等待。假设在等待过程中,有线程抛出了InterruptedException中断了当前线程,则要继续推断generation是否和当前发生改变。假设栈变量g和generation相等,而且broken为false,则表示在await中发生了中断,因此要调用breakBarrier破坏屏障点,然后抛出异常;但假设以上条件不符合。则表明当前屏障点已经被脱落。但在线程仍在等待唤醒的过程中发生的中断,则此次中断应该于当前的await无关,则须要调用Thread.interrupt方法重置interrupt标识。
        然后假设是被正常唤醒或者超时等待await以后。还要继续推断g.broken。假设为true,则表示屏障被破坏,要抛出BrokenBarrierException异常;假设栈变量g不和generation相等,则表示当前屏障点已经被脱落。因此要返回之前的index表达进入屏障点的索引。另外继续是否超时,假设超时则相同需要breakBarrier而且抛出TimeoutException。因为考虑到线程并发问题,假设以上推断都失败则必需要又一次循环。

    最后离开函数的时候必需要调用lock.unlock释放锁。

        另外,假设在await的线程数没有达到parties,但须要又一次同步,能够调用reset方法。

        public void reset() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                breakBarrier();   // break the current generation
                nextGeneration(); // start a new generation
            } finally {
                lock.unlock();
            }
        }
        函数实现非常easy,尝试获取锁,然后调用breakBarrier和nextGeneration方法。这样调用之后,屏障点就会被破坏(breakage)。则把之前在await的线程唤醒并让它们抛出异常;然后调用nextGeneration重置当前状态,这样后来的await可以再次又一次等待。

    总结

        这样。我们就完整地把CountDownLatch和CyclicBarrier进行了分析。CountDownLatch着重于多组线程等待另外一组线程完毕操作。而且是无法重置的;CyclicBarrier则是着重于一组线程互相等待到对方都完毕操作为止,但能够重置。
        假设要考虑到CountDownLatch为什么不提供一个可重置的方法。个人觉得考虑到实现重置,则必需要像CyclicBarrier一样要考虑到对其他正在等待线程的影响,这样势必就会使整个同步器模型更加复杂,令使用者不方便,同一时候也会加大实现难度。这样不如像JUC包给出的解决方式一样,提供CountDownLatch用于更普遍的简单的并发情况,另外再提供CyclicBarrier来为更加复杂的并发模型提供帮助。而其实CountDownLatch的同步模型比CyclicBarrier要简单。主要体如今等待线程之间不会互相影响,另外CountDownLatch的实现也要比CyclicBarrier更加简单。

  • 相关阅读:
    地质三维软件Ctech(MVS,EVSPRO)破解版
    自已动手制做“0中带斜杠”的矢量字体
    HDU 4549 M斐波那契数列
    Troubleshooting:CentOS安装小记
    RDP支持的颜色深度
    Qt多线程学习:创建多线程
    NFS安装配置
    多台电脑共享键盘鼠标
    Troubleshooting:HTTP Status 500 Error instantiating servlet class xxx
    Win7远程桌面连接
  • 原文地址:https://www.cnblogs.com/mfmdaoyou/p/7056212.html
Copyright © 2011-2022 走看看