zoukankan      html  css  js  c++  java
  • JUC包下CyclicBarrier学习笔记

    CyclicBarrier,一个同步辅助类,在API中是这么介绍的:

    它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。

    通俗点讲就是:让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。

    先可以看下测试结果:

    等待线程进来之后并没有立即去执行相关业务逻辑,当线程达到你设计的要求之后才回去执行相关业务逻辑数据,测试代码如下

    package com.cxy.cyclicBarrier;
    
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * Created by Administrator on 2017/4/10.
     */
    public class CyclicBarrierDemo {
        private  static CyclicBarrier cyclicBarrier=new CyclicBarrier(10);
    
        public static void main(String[] args) throws InterruptedException {
            ExecutorService executorService = Executors.newCachedThreadPool();
            for (int i =0;i< 10;i++){
                System.out.println("创建线程"+i);
                final int threadNum =i;
                Thread.sleep(1000);
                executorService.execute( ()->{
                    try {
                        cxyDemo(threadNum);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                });
            }
            executorService.shutdown();
    
        }
        private static void  cxyDemo(int threadNum) throws InterruptedException, BrokenBarrierException {
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName() + "开始等待其他线程");
            cyclicBarrier.await();
            System.out.println(threadNum);
    
        }
    }

    其中利用到lamba表达式,

    利用到可缓存的线程池,可以其中有个注释不是很对

      public CyclicBarrier(int parties, Runnable barrierAction) {
            if (parties <= 0) throw new IllegalArgumentException();
            this.parties = parties;
            this.count = parties;
            this.barrierCommand = barrierAction;
        }
    
        /**
         * Creates a new {@code CyclicBarrier} that will trip when the
         * given number of parties (threads) are waiting upon it, and
         * does not perform a predefined action when the barrier is tripped.
         *
         * @param parties the number of threads that must invoke {@link #await}
         *        before the barrier is tripped
         * @throws IllegalArgumentException if {@code parties} is less than 1
         */
        public CyclicBarrier(int parties) {
            this(parties, null);
        }

    两个构造方法:

    第一个需要传入线程数,还有就是Runnable 

    第二个 需要传入得线程数

    这个参数定义的线程数,当线程数到设置的值,那么就回去执行awit方法后面的逻辑

    在我上面案例中使用的是第一种构造方法:

    下面将使用第二种案例:

    package com.cxy.cyclicBarrier;
    
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * Created by Administrator on 2017/4/10.
     */
    public class CyclicBarrierDemo {
        /*  private  static CyclicBarrier cyclicBarrier=new CyclicBarrier(10);
    
          public static void main(String[] args) throws InterruptedException {
              ExecutorService executorService = Executors.newCachedThreadPool();
              for (int i =0;i< 10;i++){
                  System.out.println("创建线程"+i);
                  final int threadNum =i;
                  Thread.sleep(1000);
                  executorService.execute( ()->{
                      try {
                          cxyDemo(threadNum);
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      } catch (BrokenBarrierException e) {
                          e.printStackTrace();
                      }
                  });
              }
              executorService.shutdown();
    
          }
          private static void  cxyDemo(int threadNum) throws InterruptedException, BrokenBarrierException {
              Thread.sleep(1000);
              System.out.println(Thread.currentThread().getName() + "开始等待其他线程");
              cyclicBarrier.await();
              System.out.println(threadNum);
    
          }*/
        private static CyclicBarrier cyclicBarrier;
    
        static class CyclicBarrierThread extends Thread {
            public void run() {
                System.out.println(Thread.currentThread().getName() + "到了"); //等待
                try {
                    cyclicBarrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
        public static void main(String[] args) {
            cyclicBarrier = new CyclicBarrier(5, new Runnable() {
                @Override
                public void run() {
                    System.out.println("人到齐了,开始吧....");
                }
            });
            for (int i = 0; i < 5; i++) {
                new CyclicBarrierThread().start();
            }
        }
    }

    执行结果:

    那么await方法到底干了什么呢:

       public int await() throws InterruptedException, BrokenBarrierException {
            try {
                return dowait(false, 0L);
            } catch (TimeoutException toe) {
                throw new Error(toe); // cannot happen
            }
        }

    最终调用的dowait方法: /**

         * Main barrier code, covering the various policies.
         */
        private int dowait(boolean timed, long nanos)
            throws InterruptedException, BrokenBarrierException,
                   TimeoutException {

          //实现锁,安全 final ReentrantLock
    lock = this.lock; lock.lock(); try { final Generation g = generation;         //分代被破坏,异常抛出 if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) {
              //线程被中断, breakBarrier();
    throw new InterruptedException(); }         //线程进来,减少数据量 int index = --count; if (index == 0) { // tripped
              // 达到指定的数量时候,执行
    boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run();
                //唤醒线程 ranAction
    = true;
                //更新分代 nextGeneration();
    return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally {
            //最终释放锁
    lock.unlock(); } }
    • 最后一个线程到达,即index == 0
    • 超出了指定时间(超时等待)
    • 其他的某个线程中断当前线程
    • 其他的某个线程中断另一个等待的线程
    • 其他的某个线程在等待barrier超时
    • 其他的某个线程在此barrier调用reset()方法。reset()方法用于将屏障重置为初始状态   

    应用场景;开会,还有就是例如现金很火的游戏,只有等待大家准备好了,才开始执行进入房间

  • 相关阅读:
    标签的讲解
    属性分类
    LeetCode 003. 无重复字符的最长子串 双指针
    Leetcode 136. 只出现一次的数字 异或性质
    Leetcode 231. 2的幂 数学
    LeetCode 21. 合并两个有序链表
    象棋博弈资源
    acwing 343. 排序 topsort floyd 传播闭包
    Leetcode 945 使数组唯一的最小增量 贪心
    Leetcode 785 判断二分图 BFS 二分染色
  • 原文地址:https://www.cnblogs.com/xiufengchen/p/10681576.html
Copyright © 2011-2022 走看看