zoukankan      html  css  js  c++  java
  • 07.Curator计数器

        这一篇文章我们将学习使用Curator来实现计数器。顾名思义,计数器是用来计数的,利用ZooKeeper可以实现一个集群共享的计数器。只要使用相同的path就可以得到最新的计数器值,这是由ZooKeeper的一致性保证的。Curator有两个计数器,一个是用int来计数,一个用long来计数。

    1.SharedCount

    1.SharedCount计数器介绍
    这个类使用int类型来计数。 主要涉及三个类。
    • SharedCount - 管理一个共享的整数。所有看同样的路径客户端将有共享的整数(考虑ZK的正常一致性保证)的最高最新的值。
    • SharedCountReader - 一个共享的整数接口,并允许监听改变它的值。
    • SharedCountListener - 用于监听共享整数发生变化的监听器。
    SharedCount类代表计数器,可以为它增加一个SharedCountListener,当计数器改变时此Listener可以监听到改变的事件,而SharedCountReader可以读取到最新的值,包括字面值和带版本信息的值VersionedValue。
    注意:使用SharedCount之前需要调用start(),使用完成之后需要调用stop()
    2.编写示例程序
    1. public class SharedCounterExample implements SharedCountListener
    2. {
    3. private static final int QTY = 5;
    4. private static final String PATH = "/examples/counter";

    5. public static void main(String[] args) throws IOException, Exception
    6. {
    7. final Random rand = new Random();
    8. SharedCounterExample example = new SharedCounterExample();
    9. CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
    10. client.start();
    11. SharedCount baseCount = new SharedCount(client, PATH, 0);
    12. baseCount.addListener(example);
    13. baseCount.start();
    14. List<SharedCount> examples = Lists.newArrayList();
    15. ExecutorService service = Executors.newFixedThreadPool(QTY);
    16. for (int i = 0; i < QTY; ++i)
    17. {
    18. final SharedCount count = new SharedCount(client, PATH, 0);
    19. examples.add(count);
    20. Callable<Void> task = new Callable<Void>()
    21. {
    22. @Override
    23. public Void call() throws Exception
    24. {
    25. count.start();
    26. Thread.sleep(rand.nextInt(10000));
    27. count.setCount(rand.nextInt(10000));
    28. System.out.println("计数器当前值:" + count.getVersionedValue().getValue());
    29. System.out.println("计数器当前版本:" + count.getVersionedValue().getVersion());
    30. System.out.println("trySetCount:" + count.trySetCount(count.getVersionedValue(), 123));
    31. return null;
    32. }
    33. };
    34. service.submit(task);
    35. }
    36. service.shutdown();
    37. service.awaitTermination(10, TimeUnit.MINUTES);
    38. for (int i = 0; i < QTY; ++i)
    39. {
    40. examples.get(i).close();
    41. }
    42. baseCount.close();
    43. client.close();
    44. System.out.println("OK!");
    45. }
    46. @Override
    47. public void stateChanged(CuratorFramework client, ConnectionState newState)
    48. {
    49. System.out.println("连接状态: " + newState.toString());
    50. }
    51. @Override
    52. public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception
    53. {
    54. System.out.println("计数器值改变:" + newCount);
    55. }
    56. }
    在这个例子中,我们使用baseCount来监听计数值(addListener方法)。任意的SharedCount,只要使用相同的PATH,都可以得到这个计数值。然后我们使用5个线程为计数值增加一个10以内的随机数。
    这里我们使用trySetCount去设置计数器。第一个参数提供当前的VersionedValue,如果期间其它client更新了此计数值,你的更新可能不成功,但是这时你的client更新了最新的值,所以失败了你可以尝试再更新一次。而setCount是强制更新计数器的值。
    注意:计数器必须start,使用完之后必须调用close关闭它。
    在这里再重复一遍前面讲到的, 强烈推荐你监控ConnectionStateListener, 尽管我们的有些例子没有监控它。 在本例中SharedCountListener扩展了ConnectionStateListener。 这一条针对所有的Curator recipes都适用,后面的文章中就不专门提示了。
    3.示例程序运行结果
        运行结果控制台:
    1. 连接状态: CONNECTED
    2. 计数器当前值:1684
    3. 计数器当前版本:11
    4. trySetCount:true
    5. 计数器值改变:123
    6. 计数器当前值:8425
    7. 计数器当前版本:13
    8. trySetCount:true
    9. 计数器值改变:123
    10. 计数器当前值:9369
    11. 计数器当前版本:15
    12. trySetCount:true
    13. 计数器值改变:123
    14. 计数器当前值:4075
    15. 计数器当前版本:17
    16. trySetCount:true
    17. 计数器值改变:123
    18. 计数器当前值:9221
    19. 计数器当前版本:19
    20. trySetCount:true
    21. OK!
        Zookeeper节点信息如下:

    2.DistributedAtomicLong

        再看一个Long类型的计数器。除了计数的范围比SharedCount大了之外,它首先尝试使用乐观锁的方式设置计数器,如果不成功(比如期间计数器已经被其它client更新了),它使用InterProcessMutex方式来更新计数值。还记得InterProcessMutex是什么吗?它是我们前面讲的分布式可重入锁。这和上面的计数器的实现有显著的不同。
    1.DistributedAtomicLong计数器介绍
    DistributedAtomicLong计数器和上面的计数器的实现有显著的不同,可以从它的内部实现DistributedAtomicValue.trySet中看出端倪。
    1. public class DistributedAtomicLong implements DistributedAtomicNumber<Long>
    2. {
    3. private final DistributedAtomicValue value;
    4. ......
    5. }
    6. public class DistributedAtomicValue
    7. {
    8. ......
    9. AtomicValue<byte[]> trySet(MakeValue makeValue) throws Exception
    10. {
    11. MutableAtomicValue<byte[]> result = new MutableAtomicValue<byte[]>(null, null, false);
    12. tryOptimistic(result, makeValue);
    13. if ( !result.succeeded() && (mutex != null) )
    14. {
    15. tryWithMutex(result, makeValue);
    16. }
    17. return result;
    18. }
    19. ......
    20. }
    此计数器有一系列的操作:
    • get(): 获取当前值
    • increment(): 加一
    • decrement(): 减一
    • add(): 增加特定的值
    • subtract(): 减去特定的值
    • trySet(): 尝试设置计数值
    • forceSet(): 强制设置计数值
    你必须检查返回结果的succeeded(),它代表此操作是否成功。如果操作成功,preValue()代表操作前的值,postValue()代表操作后的值。
    2.编写示例程序
    我们下面的例子中使用5个线程对计数器进行加一操作,如果成功,将操作前后的值打印出来。
    1. public class DistributedAtomicLongExample
    2. {
    3. private static final int QTY = 5;
    4. private static final String PATH = "/examples/counter";
    5. public static void main(String[] args) throws IOException, Exception
    6. {
    7. final Random rand = new Random();
    8. CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
    9. client.start();
    10. List<DistributedAtomicLong> examples = Lists.newArrayList();
    11. ExecutorService service = Executors.newFixedThreadPool(QTY);
    12. for (int i = 0; i < QTY; ++i)
    13. {
    14. final DistributedAtomicLong count = new DistributedAtomicLong(client, PATH, new RetryNTimes(10, 10));
    15. examples.add(count);
    16. Callable<Void> task = new Callable<Void>()
    17. {
    18. @Override
    19. public Void call() throws Exception
    20. {
    21. try
    22. {
    23. Thread.sleep(1000 + rand.nextInt(10000));
    24. AtomicValue<Long> value = count.increment();
    25. System.out.println("修改成功: " + value.succeeded());
    26. if (value.succeeded())
    27. {
    28. System.out.println("修改之前的值:" + value.preValue() + " | 修改之后的值:" + value.postValue());
    29. }
    30. }
    31. catch (Exception e)
    32. {
    33. e.printStackTrace();
    34. }
    35. return null;
    36. }
    37. };
    38. service.submit(task);
    39. }
    40. service.shutdown();
    41. service.awaitTermination(10, TimeUnit.MINUTES);
    42. client.close();
    43. System.out.println("OK!");
    44. }
    45. }
    注意:你必须检查返回结果的succeeded(),它代表此操作是否成功。如果操作成功,preValue()代表操作前的值,postValue()代表操作后的值。
    3.示例程序运行结果
        运行结果控制台:
    1. 修改成功: true
    2. 修改之前的值:0 | 修改之后的值:1
    3. 修改成功: true
    4. 修改之前的值:1 | 修改之后的值:2
    5. 修改成功: true
    6. 修改之前的值:2 | 修改之后的值:3
    7. 修改成功: true
    8. 修改之前的值:3 | 修改之后的值:4
    9. 修改成功: true
    10. 修改之前的值:4 | 修改之后的值:5
    11. OK!
        Zookeeper节点信息如下:
    -------------------------------------------------------------------------------------------------------------------------------



  • 相关阅读:
    rem单位
    JS_高程5.引用类型(5)Array类型的操作方法
    JS_理解函数参数按值传递
    JS_高程5.引用类型(4)Array类型的各类方法
    JS_高程5.引用类型(3)Array类型-检测数组
    JS_高程5.引用类型(2)Array类型
    JS_高程5.引用类型(1)Object类型
    JS_高程4.变量,作用域和内存问题(3)垃圾收集
    JS_高程4.变量,作用域和内存问题(2)执行环境及作用域
    JS_高程4.变量,作用域和内存问题(1)
  • 原文地址:https://www.cnblogs.com/LiZhiW/p/4941771.html
Copyright © 2011-2022 走看看