zoukankan      html  css  js  c++  java
  • java多线程-CyclicBarrier

    主要成员变量:

        private final ReentrantLock lock = new ReentrantLock();
        private final Condition trip = lock.newCondition();
        /** The number of parties */
        private final int parties;
        //栅栏放开钱执行的任务
        private final Runnable barrierCommand;
        //当前代
        private Generation generation = new Generation();//当前多少个线程在等待
        private int count;

    构造函数:

        public CyclicBarrier(int parties) {
            this(parties, null);
        }
        public CyclicBarrier(int parties, Runnable barrierAction) {
            if (parties <= 0) throw new IllegalArgumentException();
            this.parties = parties;
            this.count = parties;
            this.barrierCommand = barrierAction;
        }

    主要方法:

    private void nextGeneration() {
            //唤醒所有阻塞的线程
            trip.signalAll();
            //产生下一代
            count = parties;
            generation = new Generation();
        }
    
        private static class Generation {
            //标识栅栏是否被破坏
            boolean broken = false;
        }
        private void breakBarrier() {
            //当前栅栏已经破坏
            generation.broken = true;
            count = parties;
            //唤醒所有阻塞的线程
            trip.signalAll();
        }
        //有2个await方法区别是是否超时
        public int await() throws InterruptedException, BrokenBarrierException {
            try {
                return dowait(false, 0L);
            } catch (TimeoutException toe) {
                throw new Error(toe); // cannot happen
            }
        }
        
        public int await(long timeout, TimeUnit unit)
                throws InterruptedException,
                BrokenBarrierException,
                TimeoutException {
            return dowait(true, unit.toNanos(timeout));
        }
        private int dowait(boolean timed, long nanos)
                throws InterruptedException, BrokenBarrierException,
                TimeoutException {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                final Generation g = generation;
    
                if (g.broken)
                    throw new BrokenBarrierException();
    
                if (Thread.interrupted()) {
                    breakBarrier();
                    throw new InterruptedException();
                }
    
                //每次调用count--
                int index = --count;
                //index==0时说明需要等待的线程都到齐了
                if (index == 0) {
                    boolean ranAction = false;
                    try {
    
                        final Runnable command = barrierCommand;
                        //在开启栅栏之前,同步执行传入的任务
                        if (command != null)
                            command.run();
                        ranAction = true;
                        //产生下一代唤醒等待线程并返回
                        nextGeneration();
                        return 0;
                    } finally {
                        //执行传入的任务报错则破坏栅栏
                        if (!ranAction)
                            breakBarrier();
                    }
                }
    
                // loop until tripped, broken, interrupted, or timed out
                //还没有全部到齐时
                for (;;) {
                    try {
                        //未设置超时
                        if (!timed)
                            trip.await();
                        //设置超时
                        else if (nanos > 0L)
                            nanos = trip.awaitNanos(nanos);
                    } catch (InterruptedException ie) {
                        if (g == generation && ! g.broken) {
                            breakBarrier();
                            throw ie;
                        } else {
                            // We're about to finish waiting even if we had not
                            // been interrupted, so this interrupt is deemed to
                            // "belong" to subsequent execution.
                            Thread.currentThread().interrupt();
                        }
                    }
    
                    if (g.broken)
                        throw new BrokenBarrierException();
    
                    if (g != generation)
                        return index;
    
                    //超时处理
                    if (timed && nanos <= 0L) {
                        breakBarrier();
                        throw new TimeoutException();
                    }
                }
            } finally {
                lock.unlock();
            }
        }

    应用:

        class Person implements Runnable {
        
            String name;
            CyclicBarrier cyclicBarrier;
        
            public Person(String name, CyclicBarrier cyclicBarrier) {
                this.name = name;
                this.cyclicBarrier = cyclicBarrier;
            }
        
            @Override
            public void run() {
                System.out.println(name+"ready");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(name+"end ready");
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println(name+"do something together");
            }
        }
    public static void main(String[] args) {
                CyclicBarrier cyclicBarrier = new CyclicBarrier(4, new Runnable() {
                    @Override
                    public void run() {
                        try {
                            Thread.sleep(3000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println("all person ready");
                    }
                });
                for (int i = 0; i < 5; i++) {
                    new Thread(new Person("person"+i,cyclicBarrier)).start();
                }
            }

    结果:

    person0ready
    person1ready
    person2ready
    person3ready
    person4ready
    person4end ready
    person3end ready
    person2end ready
    person1end ready
    person0end ready执行
    all person ready
    person1do something together
    person4do something together
    person2do something together
    person3do something together

    上例中模拟4人到齐后才会各自开始做某事。但有5个人准备,当4个人到达CyclicBarrier后barrierAction先执行,barrierAction执行完成后,栅栏打开放开4个线程。最后一个到达线程的person0将被阻塞等待3个线程到达CyclicBarrier才会执行。

    要点:

    1. CyclicBarrier可重复利用。(当一次用完count减到0后会产生下一代,count被重新设置为parties)
    2. 传入的barrierAction是同步执行的,如果执行时抛出异常,栅栏也会被破坏掉。
  • 相关阅读:
    设置Edittext全键盘
    Math中和角度,弧长,tan等相关的算法
    Android:TabLayout的一些设置
    Android;设置TextView加粗 代码设置
    Android:关于Edittext的一些设置
    Android:View颤抖的动画效果代码
    Android:dialog去除边框的实现(自带Style的padding)
    Android:自定义Dialog大小,显示圆角
    Android:EventBus
    SAS宏系统选项SYMBOLGEN MCOMPILENOTE MPRINT<NEST> MLOGIC(NEST)
  • 原文地址:https://www.cnblogs.com/liuboyuan/p/10655072.html
Copyright © 2011-2022 走看看