zoukankan      html  css  js  c++  java
  • CyclicBarrier源码解析

    CyclicBarrier

    CyclicBarrier栅栏,与CountDownLatch类似,但不是基于AQS实现的同步器,用于多个线程之间等待。CyclicBarrier每次使用完之后可以重置,CountDownLatch不可重置,CyclicBarrier同步一组线程,

    CountDownLatch同步两组线程(一组调用await()方法阻塞等待,另一组调用countDown()唤醒阻塞线程)。

    CyclicBarrier接收一个Runnable对象,当线程全部到达(调用await()),执行Runnable

    内部类Generation

    CyclicBarrier每次重置都会生成新一代,所以CyclicBarrier可以复用。

    private static class Generation {
        // broker = true意味着所有线程已经全部到达,可重置`CyclicBarrier`
        boolean broken = false;
    }
    

    关键属性

        /** The lock for guarding barrier entry */
        private final ReentrantLock lock = new ReentrantLock();
        // 当有线程到达时,如果count不能减到1,线程将会在该条件上等待
        private final Condition trip = lock.newCondition();
        // 总共需要等待的线程数
        private final int parties;
        // 所有等待线程都到达时执行该命令
        private final Runnable barrierCommand;
        /** The current generation */
        private Generation generation = new Generation();
    
        /**
         * 还需要等待的线程。初始值为总共需要等待的线程,每到达一个线程,count减1,当count减0,表示所有线程
         * 全部到达。当新产生一代或者栅栏被打破,count重置为parties
         */
        private int count;
    

    构造函数

    /**
     * @param parties 总共需要等待的线程数,线程数小于1,抛出异常
     * @param barrierAction 最后一个线程到达,执行该command
     */
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }
    
    public CyclicBarrier(int parties) {
        this(parties, null);
    }
    
    

    阻塞等待

    线程到达之后,会在条件上等待,直到最后一个线程到达,最后一个线程到达时会执行command,并产生下一代。

    // 阻塞线程
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
    
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
    TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;
    		
            // 如果阻塞线程时,发现栅栏已经被打破,抛出异常
            if (g.broken)
                throw new BrokenBarrierException();
    
            // 如果线程被打断,则打断栅栏,并抛出异常
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
    
            // 当前线程到达后剩余线程数
            int index = --count;
            // 线程已经全部到达
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    // 在最后一个到达的线程上执行command
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    // 产生新的一代 唤醒等待线程,重置count到parties,产生新一代,
                    nextGeneration();
                    return 0;
                } finally {
                    // command执行失败,打破栅栏
                    if (!ranAction)
                        breakBarrier();
                }
            }
    
            // loop until tripped, broken, interrupted, or timed out
            // 自旋等待条件出现,线程中断,或超时
            for (;;) {
                try {
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subseque	nt execution.
                        Thread.currentThread().interrupt();
                    }
                }
    
                // 栅栏被打破,抛出异常
                if (g.broken)
                    throw new BrokenBarrierException();
    
                if (g != generation)
                    return index;
    
                // 超时,打破栅栏,抛出异常
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }
    
    /**
     * 产生新一代
     * 1、唤醒所有等待的线程
     * 2、重置count到parties
     * 3、产生新的一代
     */
    private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    }
    
    /**
     * 打破栅栏
     * 1、generation.broker = true标识栅栏已经被打破
     * 2、重置count到parties
     * 3、唤醒所有等待的线程
     */
    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }
    

    重置

    /**
     * 重置栅栏
     * 1、打破栅栏
     * 2、生成新一代
     */
    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }
    

    已到达等待线程

    public int getNumberWaiting() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return parties - count;
        } finally {
            lock.unlock();
        }
    }
    
  • 相关阅读:
    全新的ASP.NET !
    asp.net core+ef core
    直播服务器Nginx
    NET Core1
    网络爬虫1
    java thread reuse(good)
    java中Executor、ExecutorService、ThreadPoolExecutor介绍(转)
    小心LinkedHashMap的get()方法(转)
    LinkedHashMap相关信息介绍(转)
    HashTable和HashMap的区别
  • 原文地址:https://www.cnblogs.com/QullLee/p/12247745.html
Copyright © 2011-2022 走看看