zoukankan      html  css  js  c++  java
  • 多线程并发工具类03-CyclicBarrier

    CyclicBarrier一般也是用于对多个线程任务进行同步执行。是多线程并发开发中重要的一个工具类;

    分析一下源码

    初始化

        //用于控制栅栏进入的锁
        private final ReentrantLock lock = new ReentrantLock();
        //条件锁
        private final Condition trip = lock.newCondition();
       //
        private final int parties;
        //当任务被捕获时,执行的任务
        private final Runnable barrierCommand;
        //
        private Generation generation = new Generation();
    
        //等待的数量
        private int count;
    

    await方法

    • 在所有线程任务都到达之前,线程任务都是阻塞状态

    • 当线程任务中出现,中断或者定时任务超时,就会唤醒所有任务执行。

    • 所有线程任务都到达后会唤醒所有任务。

    • 可以在初始化的时候传入基础线程任务,一般是首先执行传入线程任务的。

     public int await() throws InterruptedException, BrokenBarrierException {
            try {
                return dowait(false, 0L);
            } catch (TimeoutException toe) {
                throw new Error(toe); // cannot happen
            }
        }
    
    /**
     * 1.如果当前线程中断了,打破栅栏;
     * 2.如果所有任务到齐,则执行传入的任务(如果有),并唤醒所有的任务,并返回0;
     * 3.否则,存放入条件队列,直到超时或者打破了栅栏,则抛出异常。如果新生栅栏,则返回当前线程index;
     */
    private int dowait(boolean timed, long nanos)
            throws InterruptedException, BrokenBarrierException,
                   TimeoutException {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                final Generation g = generation;//获取屏障
    
                if (g.broken)//屏障属性设置为true则抛出异常
                    throw new BrokenBarrierException();
    
                if (Thread.interrupted()) {
                    breakBarrier();//唤醒条件队列中的所有任务,将屏障状态设置为true
                    throw new InterruptedException();
                }
                /**
                 * 1.任务数量减1,判断剩余任务数量是否为0;
                 * 2.如果任务为0,则,判断初始化时是否传入任务,先执行传入任务;
                 * 3.标记运行状态;并生成新栅栏同时唤醒条件队列中的所有任务;返回0;
                 * 4.如果在运行传入任务或唤醒条件队列任务时出现异常,则打破栅栏;    
                 */
                int index = --count;
                if (index == 0) {  // 当数量为0时
                    boolean ranAction = false;
                    try {
                        final Runnable command = barrierCommand;
                        if (command != null)
                            command.run();
                        ranAction = true;//运行状态,设置为true表示已经运行了传入的任务;
                        nextGeneration();
                        return 0;//并返回0
                    } finally {
                        if (!ranAction)//如果任务没执行,则修改屏障状态,唤醒所有条件队列等待任务;
                            breakBarrier();
                    }
                }
                /**
                 * 如果超时,或者栅栏被打破,或者中断异常,或者有新的栅栏生成会跳出循环。
                 * 过程:
                 * 1.如果没超时,那么就放入到条件队列中(除非当前线程中断,并且栅栏没有变更,打破栅栏抛出异常)
                 * 2.判断栅栏是不是已经中断了,如果中断了则抛出异常;
                 * 3.如果生成新的栅栏,说明所有任务已经到位,并且已经唤醒了任务,返回任务数量。
                 * 4.如果超时,则打破栅栏,抛出超时异常。
                 */
                for (;;) {
                    try {
                        if (!timed)//没有超时,则放入条件队列等待唤起
                            trip.await();
                        else if (nanos > 0L)//有等待超时时间,则设置超时时间,超时自动退出
                            nanos = trip.awaitNanos(nanos);
                    } catch (InterruptedException ie) {//如果中断了,判断屏障状态是否变化,如果没变化,则中断屏障。并抛出异常
                        if (g == generation && ! g.broken) {
                            breakBarrier();
                            throw ie;
                        } else {
                            //中断当前线程
                            Thread.currentThread().interrupt();
                        }
                    }
    
                    if (g.broken)//根据状态判断是否抛出异常
                        throw new BrokenBarrierException();
    
                    if (g != generation)
                        return index;
    
                    if (timed && nanos <= 0L) {//超时,则抛出异常
                        breakBarrier();
                        throw new TimeoutException();
                    }
                }
            } finally {
                lock.unlock();
            }
        }
    
    /**
     * 打破栅栏:核心是唤醒条件队列中的消息
     */ 
     private void breakBarrier() {
            generation.broken = true;//更改栅栏状态
            count = parties;//将count值复原
            trip.signalAll();//唤醒条件队列中的消息
        }
    
    /**
     * 生成新的栅栏:唤醒所有条件队列的任务,生成新的栅栏
     */
    private void nextGeneration() {
        trip.signalAll();
        count = parties;
        generation = new Generation();
    }
    

    reset方法

    • 重置当前栅栏
        public void reset() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                breakBarrier();   // 打破当前的栅栏
                nextGeneration(); // 生成新的栅栏
            } finally {
                lock.unlock();
            }
        }
    
    

    示例代码

    • 求和操作
    class Solver {
        int result = 0;
        final ConcurrentHashMap<String,Object> data = new ConcurrentHashMap<>();
        final CyclicBarrier barrier;
    
        class Worker implements Runnable {
            int myRow;
            boolean flag = false;
            Worker(int row) { myRow = row; }
            public void run() {
                while (!done()) {
                    processRow(myRow);
                    try {
                        barrier.await();
                    } catch (InterruptedException ex) {
                        return;
                    } catch (BrokenBarrierException ex) {
                        return;
                    }
                }
            }
    
            private void processRow(int myRow) {
                data.put(Thread.currentThread().getName(),myRow);
                flag = true;
            }
            private boolean done() {
                return flag;
            }
        }
    
    
    
        public Solver(int N) throws InterruptedException {
            Runnable barrierAction =
                    new Runnable() { public void run() { mergeRows(); }};
            barrier = new CyclicBarrier(N, barrierAction);
    
            List<Thread> threads = new ArrayList<Thread>(N);
            for (int i = 0; i < N; i++) {
                Thread thread = new Thread(new Worker(i));
                threads.add(thread);
                thread.start();
            }
    
            // wait until done
    //        for (Thread thread : threads)
    //            thread.join();
        }
    
        private void mergeRows() {
            this.data.entrySet().stream().forEach(entry ->{
                this.result += (Integer)this.data.get(entry.getKey());
            });
            System.out.println("result:"+result);
        }
    
        public static void main(String[] args) throws InterruptedException {
            Solver solver = new Solver(5);
    
        }
        
    }
    
    

    总结

    1.CyclicBarrier和CountDownLatch的区别?

    • 内部结构不同:

      • CyclickBarrier内部是通过ReentrantLock的条件队列进行管理线程任务的。

      • CountDownLatch内部是通过静态子类继承AQS,实现共享锁state来进行管理任务数量的。

    • 原理不同:

      • CyclickBarrier是所有任务都到达后才执行任务;

      • CountDownLatch是只在最后判断状态是不是全部执行完成,stat == 0;

    • 线程数量:

      • CyclickBarrier根据线程数进行控制的

      • CountDownLatch是通过调用countDown方法,state减1,和线程个数无关

    • 应用:

      • CyclickBarrier可以根据基于子线程进行处理其他线程的结果,处理比较复杂的业务。并且可以通过reset方法重新执行方法。

      • CountDownLoatch则必须在主线程才能处理,一般用于任务执行初始化数据

  • 相关阅读:
    【Java学习】Intellij IDEA基本配置
    【Java学习】Integer、new Integer() 和 int 比较和相关的面试题
    【Java学习】String[] args和String args[]的区别在哪里?
    【Java学习】包装类
    【Java学习】Java 枚举(enum)
    【Java学习】eclipse与intellij idea的区别
    【Mysql学习】mysql远程连接:ERROR 1130 (HY000): Host '*.*.*.*' is not allowed to connect to this MySQL server解决办法
    【Mysql学习】Mysql安装
    qplot函数用法(转载)
    webservice部署到服务器报错
  • 原文地址:https://www.cnblogs.com/perferect/p/13711280.html
Copyright © 2011-2022 走看看