zoukankan      html  css  js  c++  java
  • java多线程-CyclicBarrier

    主要成员变量:

        private final ReentrantLock lock = new ReentrantLock();
        private final Condition trip = lock.newCondition();
        /** The number of parties */
        private final int parties;
        //栅栏放开钱执行的任务
        private final Runnable barrierCommand;
        //当前代
        private Generation generation = new Generation();//当前多少个线程在等待
        private int count;

    构造函数:

        public CyclicBarrier(int parties) {
            this(parties, null);
        }
        public CyclicBarrier(int parties, Runnable barrierAction) {
            if (parties <= 0) throw new IllegalArgumentException();
            this.parties = parties;
            this.count = parties;
            this.barrierCommand = barrierAction;
        }

    主要方法:

    private void nextGeneration() {
            //唤醒所有阻塞的线程
            trip.signalAll();
            //产生下一代
            count = parties;
            generation = new Generation();
        }
    
        private static class Generation {
            //标识栅栏是否被破坏
            boolean broken = false;
        }
        private void breakBarrier() {
            //当前栅栏已经破坏
            generation.broken = true;
            count = parties;
            //唤醒所有阻塞的线程
            trip.signalAll();
        }
        //有2个await方法区别是是否超时
        public int await() throws InterruptedException, BrokenBarrierException {
            try {
                return dowait(false, 0L);
            } catch (TimeoutException toe) {
                throw new Error(toe); // cannot happen
            }
        }
        
        public int await(long timeout, TimeUnit unit)
                throws InterruptedException,
                BrokenBarrierException,
                TimeoutException {
            return dowait(true, unit.toNanos(timeout));
        }
        private int dowait(boolean timed, long nanos)
                throws InterruptedException, BrokenBarrierException,
                TimeoutException {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                final Generation g = generation;
    
                if (g.broken)
                    throw new BrokenBarrierException();
    
                if (Thread.interrupted()) {
                    breakBarrier();
                    throw new InterruptedException();
                }
    
                //每次调用count--
                int index = --count;
                //index==0时说明需要等待的线程都到齐了
                if (index == 0) {
                    boolean ranAction = false;
                    try {
    
                        final Runnable command = barrierCommand;
                        //在开启栅栏之前,同步执行传入的任务
                        if (command != null)
                            command.run();
                        ranAction = true;
                        //产生下一代唤醒等待线程并返回
                        nextGeneration();
                        return 0;
                    } finally {
                        //执行传入的任务报错则破坏栅栏
                        if (!ranAction)
                            breakBarrier();
                    }
                }
    
                // loop until tripped, broken, interrupted, or timed out
                //还没有全部到齐时
                for (;;) {
                    try {
                        //未设置超时
                        if (!timed)
                            trip.await();
                        //设置超时
                        else if (nanos > 0L)
                            nanos = trip.awaitNanos(nanos);
                    } catch (InterruptedException ie) {
                        if (g == generation && ! g.broken) {
                            breakBarrier();
                            throw ie;
                        } else {
                            // We're about to finish waiting even if we had not
                            // been interrupted, so this interrupt is deemed to
                            // "belong" to subsequent execution.
                            Thread.currentThread().interrupt();
                        }
                    }
    
                    if (g.broken)
                        throw new BrokenBarrierException();
    
                    if (g != generation)
                        return index;
    
                    //超时处理
                    if (timed && nanos <= 0L) {
                        breakBarrier();
                        throw new TimeoutException();
                    }
                }
            } finally {
                lock.unlock();
            }
        }

    应用:

        class Person implements Runnable {
        
            String name;
            CyclicBarrier cyclicBarrier;
        
            public Person(String name, CyclicBarrier cyclicBarrier) {
                this.name = name;
                this.cyclicBarrier = cyclicBarrier;
            }
        
            @Override
            public void run() {
                System.out.println(name+"ready");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(name+"end ready");
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println(name+"do something together");
            }
        }
    public static void main(String[] args) {
                CyclicBarrier cyclicBarrier = new CyclicBarrier(4, new Runnable() {
                    @Override
                    public void run() {
                        try {
                            Thread.sleep(3000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println("all person ready");
                    }
                });
                for (int i = 0; i < 5; i++) {
                    new Thread(new Person("person"+i,cyclicBarrier)).start();
                }
            }

    结果:

    person0ready
    person1ready
    person2ready
    person3ready
    person4ready
    person4end ready
    person3end ready
    person2end ready
    person1end ready
    person0end ready执行
    all person ready
    person1do something together
    person4do something together
    person2do something together
    person3do something together

    上例中模拟4人到齐后才会各自开始做某事。但有5个人准备,当4个人到达CyclicBarrier后barrierAction先执行,barrierAction执行完成后,栅栏打开放开4个线程。最后一个到达线程的person0将被阻塞等待3个线程到达CyclicBarrier才会执行。

    要点:

    1. CyclicBarrier可重复利用。(当一次用完count减到0后会产生下一代,count被重新设置为parties)
    2. 传入的barrierAction是同步执行的,如果执行时抛出异常,栅栏也会被破坏掉。
  • 相关阅读:
    Add a Simple Action using an Attribute 使用特性添加简单按钮
    通俗易懂,什么是.NET/.NET Framework/.NET Core/.Net Standard?
    一文详解微服务架构
    我是如何失去团队掌控的?
    如何高效的学习技术
    Implement Property Value Validation in Code 在代码中实现属性值验证(XPO)
    设计模式学习笔记 ———— 简单工厂模式
    #ifndef/#define/#endif使用详解
    模块化开发
    源代码生成可执行文件的内部机理
  • 原文地址:https://www.cnblogs.com/liuboyuan/p/10655072.html
Copyright © 2011-2022 走看看