zoukankan      html  css  js  c++  java
  • 06.Curator Barrier

        分布式Barrier是这样一个类: 它会阻塞所有节点上的等待进程,知道某一个被满足, 然后所有的节点继续进行。
        比如赛马比赛中, 等赛马陆续来到起跑线前。 一声令下,所有的赛马都飞奔而出。

    1.栅栏Barrier

    1.DistributedBarrier类说明
    DistributedBarrier类实现了栅栏的功能。它的构造函数如下:
    1. /**
    2. * @param client client
    3. * @param barrierPath path to use as the barrier
    4. */
    5. public DistributedBarrier(CuratorFramework client, String barrierPath)
    DistributedBarrier构造函数中barrierPath参数用来确定一个栅栏,只要barrierPath参数相同(路径相同)就是同一个栅栏。通常情况下栅栏的使用如下:
        1.主导client设置一个栅栏
        2.其他客户端就会调用waitOnBarrier()等待栅栏移除,程序处理线程阻塞
        3.主导client移除栅栏,其他客户端的处理程序就会同时继续运行。
    DistributedBarrier类的主要方法如下:
    • setBarrier() - 设置栅栏
    • waitOnBarrier() - 等待栅栏移除
    • removeBarrier() - 移除栅栏
    异常处理:DistributedBarrier会监控连接状态,当连接断掉时waitOnBarrier()方法会抛出异常。
    2.编写示例程序
    1. public class DistributedBarrierExample
    2. {
    3. private static final int QTY = 5;
    4. private static final String PATH = "/examples/barrier";
    5. public static void main(String[] args) throws Exception
    6. {
    7. CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
    8. client.start();
    9. ExecutorService service = Executors.newFixedThreadPool(QTY);
    10. DistributedBarrier controlBarrier = new DistributedBarrier(client, PATH);
    11. controlBarrier.setBarrier();
    12. for (int i = 0; i < QTY; ++i)
    13. {
    14. final DistributedBarrier barrier = new DistributedBarrier(client, PATH);
    15. final int index = i;
    16. Callable<Void> task = new Callable<Void>()
    17. {
    18. @Override
    19. public Void call() throws Exception
    20. {
    21. Thread.sleep((long) (3 * Math.random()));
    22. System.out.println("Client #" + index + " 等待");
    23. barrier.waitOnBarrier();
    24. System.out.println("Client #" + index + " 开始");
    25. return null;
    26. }
    27. };
    28. service.submit(task);
    29. }
    30. Thread.sleep(1000 * 3);
    31. System.out.println("所有的Client都在等待");
    32. controlBarrier.removeBarrier();
    33. service.shutdown();
    34. service.awaitTermination(10, TimeUnit.MINUTES);
    35. client.close();
    36. System.out.println("OK!");
    37. }
    38. }
    这个例子创建了controlBarrier来设置栅栏和移除栅栏。我们创建了5个线程,在此Barrier上等待。最后移除栅栏后所有的线程才继续执行。
    如果你开始不设置栅栏,所有的线程就不会阻塞住。
    3.示例程序运行结果
        运行结果控制台:
    1. Client #1 等待
    2. Client #2 等待
    3. Client #0 等待
    4. Client #4 等待
    5. Client #3 等待
    6. 所有的Client都在等待
    7. Client #4 开始
    8. Client #2 开始
    9. Client #0 开始
    10. Client #3 开始
    11. Client #1 开始
    12. OK!
        运行时查看Zookeeper节点信息如下:

    2.双栅栏Double Barrier

        双栅栏允许客户端在计算的开始和结束时同步。当足够的进程加入到双栅栏时,进程开始计算,当计算完成时,离开栅栏。双栅栏类是DistributedDoubleBarrier
    1. DistributedDoubleBarrier类说明
    DistributedDoubleBarrier类实现了双栅栏的功能。它的构造函数如下:
    1. // client - the client
    2. // barrierPath - path to use
    3. // memberQty - the number of members in the barrier
    4. public DistributedDoubleBarrier(CuratorFramework client, String barrierPath, int memberQty)
    memberQty是成员数量,当enter方法被调用时,成员被阻塞,直到所有的成员都调用了enter。当leave方法被调用时,它也阻塞调用线程,知道所有的成员都调用了leave。
    就像百米赛跑比赛, 发令枪响, 所有的运动员开始跑,等所有的运动员跑过终点线,比赛才结束。
    注意:参数memberQty的值只是一个阈值,而不是一个限制值。当等待栅栏的数量大于或等于这个值栅栏就会打开!

    与栅栏(DistributedBarrier)一样,双栅栏的barrierPath参数也是用来确定是否是同一个栅栏的,双栅栏的使用情况如下:
        1.从多个客户端在同一个路径上创建双栅栏(DistributedDoubleBarrier),然后调用enter()方法,等待栅栏数量达到memberQty时就可以进入栅栏。
        2.栅栏数量达到memberQty,多个客户端同时停止阻塞继续运行,直到执行leave()方法,等待memberQty个数量的栅栏同时阻塞到leave()方法中。
        3.memberQty个数量的栅栏同时阻塞到leave()方法中,多个客户端的leave()方法停止阻塞,继续运行。
    DistributedDoubleBarrier类的主要方法如下:
    • enter()、enter(long maxWait, TimeUnit unit) - 等待同时进入栅栏
    • leave()、leave(long maxWait, TimeUnit unit) - 等待同时离开栅栏
    异常处理:DistributedDoubleBarrier会监控连接状态,当连接断掉时enter()和leave方法会抛出异常。
    2.编写示例程序
    1. public class DistributedBarrierDoubleExample
    2. {
    3. private static final int QTY = 5;
    4. private static final String PATH = "/examples/barrier";
    5. public static void main(String[] args) throws Exception
    6. {
    7. CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
    8. client.start();
    9. ExecutorService service = Executors.newFixedThreadPool(QTY);
    10. for (int i = 0; i < (QTY + 2); ++i)
    11. {
    12. final DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(client, PATH, QTY);
    13. final int index = i;
    14. Callable<Void> task = new Callable<Void>()
    15. {
    16. @Override
    17. public Void call() throws Exception
    18. {
    19. Thread.sleep((long) (3 * Math.random()));
    20. System.out.println("Client #" + index + " 等待");
    21. if(false == barrier.enter(5, TimeUnit.SECONDS))
    22. {
    23. System.out.println("Client #" + index + " 等待超时!");
    24. return null;
    25. }
    26. System.out.println("Client #" + index + " 进入");
    27. Thread.sleep((long) (3000 * Math.random()));
    28. barrier.leave();
    29. System.out.println("Client #" + index + " 结束");
    30. return null;
    31. }
    32. };
    33. service.submit(task);
    34. }
    35. service.shutdown();
    36. service.awaitTermination(10, TimeUnit.MINUTES);
    37. client.close();
    38. System.out.println("OK!");
    39. }
    40. }
    注意:创建双栅栏的数量为:(QTY + 2),而创建双栅栏的参数为:new DistributedDoubleBarrier(client, PATH, QTY)当等待栅栏的数量大于或等于这个值(QTY)栅栏就会打开!
    3.示例程序运行结果
        运行结果控制台:
    1. Client #0 等待
    2. Client #2 等待
    3. Client #3 等待
    4. Client #4 等待
    5. Client #1 等待
    6. Client #4 进入
    7. Client #2 进入
    8. Client #0 进入
    9. Client #1 进入
    10. Client #3 进入
    11. Client #4 结束
    12. Client #5 等待
    13. Client #2 结束
    14. Client #3 结束
    15. Client #6 等待
    16. Client #0 结束
    17. Client #1 结束
    18. Client #5 等待超时!
    19. Client #6 等待超时!
    20. OK!
        运行时查看Zookeeper节点信息如下:

    -------------------------------------------------------------------------------------------------------------------------------



  • 相关阅读:
    OPCUA+MQTT构建物联网通用框架
    上位机开发之单片机通信实践(一)
    上位机开发之三菱Q系列PLC通信实践
    H5中你意想不到的美好
    ModbusRtu通信报文详解【二】
    ModbusRtu通信报文详解【一】
    基于C#实现与JY61姿态角度传感器通信
    以数字资产模型为核心驱动的一站式IoT数据分析实践
    基于华为云IoT Studio自助生成10万行代码的奥秘
    一条物联网设备控制命令的一生
  • 原文地址:https://www.cnblogs.com/LiZhiW/p/4937547.html
Copyright © 2011-2022 走看看