zoukankan      html  css  js  c++  java
  • 多线程条件通行工具——CyclicBarrier

    CyclicBarrier的作用是,线程进入等待后,需要达到一定数量的等待线程后,再一次性开放通行。

    • CyclicBarrier(int, Runnable)
      构造方法,参数1为通行所需的线程数量,参数2为条件满足时的监听器。
    • int await()/int await(long, TimeUnit)
      线程进入等待,并返回一个进入等待时的倒计索引。
    • int getParties()
      通行所需的线程数量
    • int getNumberWaiting()
      当前线程数量
    • boolean isBroken()
      本批次是否已经终止
    • reset()
      释放本批次通行,并重新接收下一批线程进入。

    源码分析:

    /**
     * @since 1.5
     * @see CountDownLatch
     *
     * @author Doug Lea
     */
    public class CyclicBarrier {
    
        public CyclicBarrier(int parties) {
            this(parties, null);
        }
    
        public CyclicBarrier(int parties, Runnable barrierAction) {
            if (parties <= 0)
                throw new IllegalArgumentException();
            this.parties = parties;
            this.count = parties;
            this.barrierCommand = barrierAction;
        }
    
        private static class Generation {
            boolean broken = false;
        }
    
        private final ReentrantLock lock = new ReentrantLock();
        private final Condition trip = lock.newCondition();
        private final int parties;
        private final Runnable barrierCommand;
        private Generation generation = new Generation();
    
        private int count;
    
        private void nextGeneration() {
            // 通知本批次所有线程可通行
            trip.signalAll();
            // 重置计数器
            count = parties;
            // 重建批次对象,即不同批次使用不同对象
            generation = new Generation();
        }
    
        private void breakBarrier() {
            // 标示本批次已终止
            generation.broken = true;
            // 重置计数器
            count = parties;
            // 通知本批次所有线程可通行
            trip.signalAll();
        }
    
        public int getParties() {
            // 返回通行所需的线程数量
            return parties;
        }
    
        public int await() throws InterruptedException, BrokenBarrierException {
            try {
                return dowait(false, 0L);
            } catch (TimeoutException toe) {
                throw new Error(toe);
            }
        }
    
        public int await(long timeout, TimeUnit unit) throws InterruptedException,
                BrokenBarrierException, TimeoutException {
            return dowait(true, unit.toNanos(timeout));
        }
    
        private int dowait(boolean timed, long nanos) throws InterruptedException,
                BrokenBarrierException, TimeoutException {
            final ReentrantLock lock = this.lock;
            // 进入同步
            lock.lock();
            try {
                final Generation g = generation;
    
                // 如果本批次已终止,则抛出异常
                if (g.broken)
                    throw new BrokenBarrierException();
    
                // 如果线程已终止,则终止本批次
                if (Thread.interrupted()) {
                    breakBarrier();
                    throw new InterruptedException();
                }
    
                // 更新计数器
                int index = --count;
                // 判断是否达到可释放的线程数量
                if (index == 0) {
                    // 观察监听器是否正常运行结束
                    boolean ranAction = false;
                    try {
                        // 执行监听器
                        final Runnable command = barrierCommand;
                        if (command != null)
                            command.run();
                        // 标记正常运行
                        ranAction = true;
                        // 通知所有线程并重置
                        nextGeneration();
                        // 返回索引
                        return 0;
                    } finally {
                        // 如果监听器是运行时异常结束,则终止本批次
                        if (!ranAction)
                            breakBarrier();
                    }
                }
    
                for (;;) {
                    // 进入等待或计时等待
                    try {
                        if (!timed)
                            trip.await();
                        else if (nanos > 0L)
                            nanos = trip.awaitNanos(nanos);
                    } catch (InterruptedException ie) {
                        if (g == generation && !g.broken) {
                            breakBarrier();
                            throw ie;
                        } else {
                            Thread.currentThread().interrupt();
                        }
                    }
    
                    if (g.broken)
                        throw new BrokenBarrierException();
    
                    // 如果已经换批,则返回索引退出
                    if (g != generation)
                        // 返回索引
                        return index;
    
                    // 如果超时,则 止本批次
                    if (timed && nanos <= 0L) {
                        breakBarrier();
                        throw new TimeoutException();
                    }
                }
            } finally {
                // 退出同步
                lock.unlock();
            }
        }
    
        public boolean isBroken() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                // 返回本批次是否已经终止
                return generation.broken;
            } finally {
                lock.unlock();
            }
        }
    
        public void reset() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                // 终止本批次
                breakBarrier();
                // 开始下一批
                nextGeneration();
            } finally {
                lock.unlock();
            }
        }
    
        public int getNumberWaiting() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                // 返回本批次等待中的线程数量
                return parties - count;
            } finally {
                lock.unlock();
            }
        }
    }
  • 相关阅读:
    AndroidManifest.xml详细分析
    GoogleMap-------解决不能使用问题
    GoogleMap-------manifest文件配置
    GoogleMap-------Google Play services SDK的下载和配置
    css_兼容IE和FF的写法
    dede如何新建一个ajax服务端输出文件
    js用ajax和不同页面的php互相传值的方法
    js获取多个标签元素的内容,并根据元素的内容修改标签的属性
    网页上记录鼠标的点击次数和一段有用的php代码,自己学习使用
    PHP弹出提示框并跳转到新页面即重定向到新页面
  • 原文地址:https://www.cnblogs.com/hvicen/p/6242471.html
Copyright © 2011-2022 走看看