zoukankan      html  css  js  c++  java
  • concurrent(六)同步辅助器CyclicBarrier & 源码分析

    参考文档:
    Java多线程系列--“JUC锁”10之 CyclicBarrier原理和示例:https://www.cnblogs.com/skywang12345/p/3533995.html
    简介
    CyclicBarrier是一个同步辅助类,允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。因为该 barrier 在释放等待线程后可以重用,所以称它为循环的 barrier。基于ReentrantLock实现
    举个栗子

    /**
     * 简单模拟一下对战平台中玩家需要完全准备好了,才能进入游戏的场景。
     * 
     * @author BFD_526
     * 
     */
    public class CyclicBarrierTest {
    
        public static void main(String[] args) {
            test();
        }
        // 同步屏障
        static void test() {
            ExecutorService service = Executors.newFixedThreadPool(5);
            CyclicBarrier barrier = new CyclicBarrier(5);
            for (int i = 0; i < 5; i++) {
                service.execute(new Player("玩家" + i, barrier));
            }
            service.shutdown();
        }
        // 同步屏障重置
        static void test1() {
            ExecutorService service = Executors.newFixedThreadPool(5);
            CyclicBarrier barrier = new CyclicBarrier(5);
            for (int i = 0; i < 5; i++) {
                service.execute(new Player("玩家" + i, barrier));
            }
            for (int i = 5; i < 10; i++) {
                service.execute(new Player("玩家" + i, barrier));
            }
            service.shutdown();
        }
        // 在同步屏障结束后,启动优先线程
        static void test2() {
            ExecutorService service = Executors.newFixedThreadPool(5);
            CyclicBarrier ba = new CyclicBarrier(5, new Runnable() {
                @Override
                public void run() {
                    System.out.println("所有玩家已就位");
                }
            });
            for (int i = 0; i < 5; i++) {
                service.execute(new Player("玩家" + i, ba));
            }
        }
    }
    
    class Player implements Runnable {
        private final String name;
        private final CyclicBarrier barrier;
    
        public Player(String name, CyclicBarrier barrier) {
            this.name = name;
            this.barrier = barrier;
        }
    
        public void run() {
            try {
                TimeUnit.SECONDS.sleep(1 + (new Random().nextInt(3)));
                System.out.println(name + "已准备,等待其他玩家准备...");
                barrier.await();
                TimeUnit.SECONDS.sleep(1 + (new Random().nextInt(3)));
                System.out.println(name + "已加入游戏");
            } catch (InterruptedException e) {
                System.out.println(name + "离开游戏");
            } catch (BrokenBarrierException e) {
                System.out.println(name + "离开游戏");
            }
        }
    }
    View Code

    源码分析

    函数列表

    CyclicBarrier(int parties):创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预定义的操作
    CyclicBarrier(int parties, Runnable barrierAction):创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行
    int await():在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待
    int await(long timeout, TimeUnit unit):在所有参与者都已经在此屏障上调用 await 方法之前将一直等待,或者超出了指定的等待时间
    int getNumberWaiting():返回当前在屏障处等待的参与者数目
    int getParties():返回要求启动此 barrier 的参与者数目
    boolean isBroken():查询此屏障是否处于损坏状态
    void reset():将屏障重置为其初始状态

    await()

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen;
        }
    }
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        // 获取“独占锁(lock)”
        lock.lock();
        try {
            // 保存“当前的generation”
            final Generation g = generation;
            // 若“当前generation已损坏”,则抛出异常。
            if (g.broken)
                throw new BrokenBarrierException();
            // 如果当前线程被中断,则通过breakBarrier()终止CyclicBarrier,唤醒CyclicBarrier中所有等待线程。
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
           // 将“count计数器”-1
           int index = --count;
           // 如果index=0,则意味着“有parties个线程到达barrier”
           if (index == 0) {  // tripped
               boolean ranAction = false;
               try {
                   // 如果barrierCommand不为null,则执行该动作
                   final Runnable command = barrierCommand;
                   if (command != null)
                       command.run();
                   ranAction = true;
                   // 唤醒所有等待线程,并更新generation
                   nextGeneration();
                   return 0;
               } finally {
                   if (!ranAction)
                       breakBarrier();
               }
           }
            // 当前线程一直阻塞,直到“有parties个线程到达barrier” 或 “当前线程被中断” 或 “超时”这3者之一发生,
            // 当前线程才继续执行。
            for (;;) {
                try {
                    // 如果不是“超时等待”,则调用awati()进行等待;否则,调用awaitNanos()进行等待
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    // 如果等待过程中,线程被中断,则执行下面的函数
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        Thread.currentThread().interrupt();
                    }
                }
                // 如果“当前generation已经损坏”,则抛出异常
                if (g.broken)
                    throw new BrokenBarrierException();
                // 如果“generation已经换代”,则返回index
                if (g != generation)
                    return index;
                // 如果是“超时等待”,并且时间已到,则通过breakBarrier()终止CyclicBarrier,唤醒CyclicBarrier中所有等待线程
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            // 释放“独占锁(lock)”
            lock.unlock();
        }
    }

    generation是CyclicBarrier的一个成员变量,它的定义如下:

    private Generation generation = new Generation();
    
    private static class Generation {
        boolean broken = false;
    }

    在CyclicBarrier中,同一批的线程属于同一代,即同一个Generation;CyclicBarrier中通过generation对象,记录属于哪一代
    当有parties个线程到达barrier,generation就会被更新换代
    换代:

    //换代
    private
    void nextGeneration() { trip.signalAll(); count = parties; generation = new Generation(); }
    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }
  • 相关阅读:
    selenium浏览器参数设置详解——转
    py打包工具
    BurpSuite暴力破解和防御实战
    费马小定理(确定n 是否为素数)
    如何实现一个RPC框架1 —— RPC简介和通信协议的定义
    如何实现一个RPC框架2 ——用Netty实现协议通信
    15.三数之和
    flex开发零碎笔记,随时补充
    转“国内图片网站Yupoo的架构”
    转“经验分享:大型高并发高负载网站的系统架构 ”
  • 原文地址:https://www.cnblogs.com/amei0/p/9021032.html
Copyright © 2011-2022 走看看