zoukankan      html  css  js  c++  java
  • Java多线程同步工具类之CyclicBarrier

    一、CyclicBarrier使用

    CyclicBarrier从字面上可以直接理解为线程运行的屏障,它可以让一组线程执行到一个共同的屏障点时被阻塞,直到最后一个线程执行到指定位置,你设置的执行线程就会触发运行;同时CyclicBarrier相比与CountDownLatch,它是可以被重置的;下面我们通过一个简单例子看下CyclicBarrier的使用;

    实例化一个CyclicBarrier对象并传入你要控制的线程内部;

        public static void main(String[] args) {
    
            CyclicBarrier cb = new CyclicBarrier(3, new Runnable() {
                public void run() {
                    System.out.println("所有线程集合");
                }
            });
            for (int i = 0; i < 3; i++) {
                new CyclicBarrierThread(i + "", cb).start();
            }
    
        }

    计数线程代码,每当计数到偶数时调用CyclicBarrier的await()方法

    public class CyclicBarrierThread extends Thread{
        
        private CyclicBarrier barrier;
        
        private String name;
        
        private int count;
        
        public CyclicBarrierThread(String name,CyclicBarrier barrier) {
            this.name=name;
            this.barrier=barrier;
            this.count=0;
        }
        
        public void run() {
            try {
                for(int i=0;i<10;i++) {
                    
                    Thread.sleep(100);
                    count++;
                    System.out.println(name+"号线程---"+Thread.currentThread().getName()+"开始计数:"+count);
                    if(count%2==0) {//每计数到偶数次时集合一次
                        barrier.await();
                        System.out.println(name+"号线程---"+Thread.currentThread().getName()+"集合完毕,继续计数");
                    }
                }
                
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
    
        }
    }

    查看代码输出

    2号线程---Thread-2开始计数:1
    0号线程---Thread-0开始计数:1
    1号线程---Thread-1开始计数:1
    2号线程---Thread-2开始计数:2
    1号线程---Thread-1开始计数:2
    0号线程---Thread-0开始计数:2
    所有线程集合
    2号线程---Thread-2集合完毕,继续计数
    1号线程---Thread-1集合完毕,继续计数
    0号线程---Thread-0集合完毕,继续计数
    2号线程---Thread-2开始计数:3
    1号线程---Thread-1开始计数:3
    0号线程---Thread-0开始计数:3
    2号线程---Thread-2开始计数:4
    0号线程---Thread-0开始计数:4
    1号线程---Thread-1开始计数:4
    所有线程集合
    1号线程---Thread-1集合完毕,继续计数
    2号线程---Thread-2集合完毕,继续计数
    0号线程---Thread-0集合完毕,继续计数
    0号线程---Thread-0开始计数:5
    2号线程---Thread-2开始计数:5
    1号线程---Thread-1开始计数:5
    0号线程---Thread-0开始计数:6
    1号线程---Thread-1开始计数:6
    2号线程---Thread-2开始计数:6
    所有线程集合
    2号线程---Thread-2集合完毕,继续计数
    0号线程---Thread-0集合完毕,继续计数
    1号线程---Thread-1集合完毕,继续计数
    0号线程---Thread-0开始计数:7
    1号线程---Thread-1开始计数:7
    2号线程---Thread-2开始计数:7
    1号线程---Thread-1开始计数:8
    0号线程---Thread-0开始计数:8
    2号线程---Thread-2开始计数:8
    所有线程集合
    2号线程---Thread-2集合完毕,继续计数
    0号线程---Thread-0集合完毕,继续计数
    1号线程---Thread-1集合完毕,继续计数
    0号线程---Thread-0开始计数:9
    1号线程---Thread-1开始计数:9
    2号线程---Thread-2开始计数:9
    1号线程---Thread-1开始计数:10
    0号线程---Thread-0开始计数:10
    2号线程---Thread-2开始计数:10
    所有线程集合
    1号线程---Thread-1集合完毕,继续计数
    2号线程---Thread-2集合完毕,继续计数
    0号线程---Thread-0集合完毕,继续计数

    通过输出结果可以看到,计数线程每计数到偶数次时使用CyclicBarrier的await()方法,线程都会进入阻塞等待的状态,直到最后一个线程到达屏障点时,触发你定义的执行线程,而且CyclicBarrier的await()方法是可以重复使用的。

    二、CyclicBarrier源码分析

    下面我们就对CyclicBarrier内部的源码实现进行一些分析与总结

    1、CyclicBarrier的构造

    首先看下CyclicBarrier的构造函数

        public CyclicBarrier(int parties, Runnable barrierAction) {
            if (parties <= 0) throw new IllegalArgumentException();
            //拦截的线程数量
            this.parties = parties;
            //用于计数的count值,每有一个线程执行到屏障点,就会递减1
            this.count = parties;
            //定义的拦截线程
            this.barrierCommand = barrierAction;
        }

    CyclicBarrier的构造函数很简单就是接收你要拦截的线程数量与定义的执行线程。

    2、await方法

        public int await() throws InterruptedException, BrokenBarrierException {
            try {
                return dowait(false, 0L);
            } catch (TimeoutException toe) {
                throw new Error(toe); // cannot happen
            }
        }

    我们看下具体实现dowait方法的实现

        private int dowait(boolean timed, long nanos)
            throws InterruptedException, BrokenBarrierException,
                   TimeoutException {
            //获取可重入锁
            final ReentrantLock lock = this.lock;
            //加锁
            lock.lock();
            try {
                //CyclicBarrier内部定义的一个Generation类
                final Generation g = generation;
    
                //判断Generation的broken状态
                if (g.broken)
                    throw new BrokenBarrierException();
    
                //如果线程被中断
                if (Thread.interrupted()) {
                    //Generation的broken置为true,count值重置,并唤醒所有线程
                    breakBarrier();
                    throw new InterruptedException();
                }
    
                //count值减一
                int index = --count;
                if (index == 0) {  // 如果conunt为0,说明最后一个线程到大屏障
                    boolean ranAction = false;
                    try {
                        final Runnable command = barrierCommand;
                        if (command != null)
                            command.run();//执行你传入的线程
                        ranAction = true;
                        nextGeneration();//唤醒所有阻塞的线程,同时重置count值与Generation
                        return 0;
                    } finally {
                        if (!ranAction)
                            //拦截线程没有正常执行,唤醒所有线程,同时重置count值,Generation的broken置为true
                            breakBarrier();
                    }
                }
    
                // loop until tripped, broken, interrupted, or timed out
                for (;;) {
                    try {
                        //是否设置阻塞的超时时间
                        if (!timed)
                            //释放当前锁
                            trip.await();//false 表示不设置,一直阻塞
                        else if (nanos > 0L)
                            nanos = trip.awaitNanos(nanos);//true 设置阻塞的超时时间
                    } catch (InterruptedException ie) {
                        if (g == generation && ! g.broken) {
                            breakBarrier();
                            throw ie;
                        } else {
                            // We're about to finish waiting even if we had not
                            // been interrupted, so this interrupt is deemed to
                            // "belong" to subsequent execution.
                            Thread.currentThread().interrupt();
                        }
                    }
    
                    if (g.broken)
                        throw new BrokenBarrierException();
    
                    if (g != generation)
                        return index;
    
                    if (timed && nanos <= 0L) {
                        breakBarrier();
                        throw new TimeoutException();
                    }
                }
            } finally {
                //释放锁
                lock.unlock();
            }
        }

    dowait方法的实现流程是很清晰的,通过ReentrantLock的Condition接口与count值相互配合,主要完成以下功能:

    1、当需要拦截的线程到达屏障点调用await方法后获取ReentrantLock锁,保证线程安全;

    2、检查count值是否为0,判断是否是最后一个线程到达屏障,如果是的话执行需要触发执行的线程,调用Condition的signalAll方法唤醒所有阻塞的线程,并重置count值与Generation类,保障CyclicBarrier的重复可用;

    3、如果不是最后一个线程的话,根据传入的参数调用Condition的await方法释放锁资源并进入阻塞等待,直到被唤醒;

    3、reset方法

    可以用来主动重置CyclicBarrier的状态

        public void reset() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                //generation.broken设置为true,唤醒所有线程,count值重置
                breakBarrier();   
                nextGeneration(); 
            } finally {
                lock.unlock();
            }
        }
        
        
        private void nextGeneration() {
            // signal completion of last generation
            trip.signalAll();
            // set up next generation
            count = parties;
            generation = new Generation();
        }
    
    
        private void breakBarrier() {
            generation.broken = true;
            count = parties;
            trip.signalAll();
        }

    breakBarrier()与nextGeneration(),这两个方法的主要区别就在于前者会把generation.broken设置为true,也就是说如果调用reset方法主动重置CyclicBarrier类的状态,当前正在使用CyclicBarrier类同步的线程都会被唤醒或抛出异常;

    4、getNumberWaiting方法

        public int getNumberWaiting() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return parties - count;
            } finally {
                lock.unlock();
            }
        }

    很明显getNumberWaiting方法使用来获取当前已经运行至屏蔽点并阻塞等待的线程数量的;

    三、总结

    通过上面分析可以看到CyclicBarrier的实现原理相对还是比较简单与清晰的,主要是基于ReentrantLock与计数器相结合来实现多个线程的同步控制的。以上就是对CyclicBarrier类的使用与内部实现进行的分析,其中如有不足与不正确的地方还望指出与海涵。

    关注微信公众号,查看更多技术文章。

  • 相关阅读:
    Object类学习
    Thread.State 线程状态
    Thread.UncaughtExceptionHandler
    apply和call的区别
    如何实现border-width:0.5px;
    table固定头部,表格tbody可上下左右滑动
    canvas画布实现手写签名效果
    ES6学习笔记
    for循环中执行setTimeout问题
    javaScript函数提升及作用域
  • 原文地址:https://www.cnblogs.com/dafanjoy/p/11110575.html
Copyright © 2011-2022 走看看