zoukankan      html  css  js  c++  java
  • zookeeper 分布式计数器

    分布式计数器的思路是:指定一个Zookeeper数据节点作为计数器,多个应用实例在分布式锁的控制下,通过更新该数据节点的内容来实现计数功能。

    Curator中封装了实现,例如 DistributedAtomicInteger 和 DistributedAtomicLong。

    以下是我写的一个测试用例:

    [java] view plain copy
     
    1. package com.my.CuratorTest;  
    2.   
    3. import org.apache.curator.framework.CuratorFramework;  
    4. import org.apache.curator.framework.CuratorFrameworkFactory;  
    5. import org.apache.curator.framework.recipes.atomic.AtomicValue;  
    6. import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;  
    7. import org.apache.curator.retry.ExponentialBackoffRetry;  
    8. import org.apache.curator.retry.RetryNTimes;  
    9.   
    10. import java.util.concurrent.CyclicBarrier;  
    11. import java.util.concurrent.ExecutorService;  
    12. import java.util.concurrent.Executors;  
    13.   
    14. /** 
    15.  * Title: 分布式计数器演示<br/> 
    16.  * Intention: <br/> 
    17.  * <p> 
    18.  * Class Name: com.my.CuratorTest.RecipesDisAutomicLong<br/> 
    19.  * Create Date: 2017/8/20 22:48 <br/> 
    20.  * Project Name: MyTest <br/> 
    21.  * Company:  All Rights Reserved. <br/> 
    22.  * Copyright © 2017 <br/> 
    23.  * </p> 
    24.  * <p> 
    25.  * author: GaoWei <br/> 
    26.  * 1st_examiner: <br/> 
    27.  * 2nd_examiner: <br/> 
    28.  * </p> 
    29.  * 
    30.  * @version 1.0 
    31.  * @since JDK 1.7 
    32.  */  
    33. public class RecipesDisAutomicLong {  
    34.   
    35.     static String disAutomicIntPath = "/curator_recipes_distatomicint_path3";  
    36.   
    37.     static CuratorFramework client = CuratorFrameworkFactory.builder()  
    38.             .connectString("127.0.0.1:2181")  
    39.             .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();  
    40.     static  DistributedAtomicLong atomicLong =  
    41.             new DistributedAtomicLong(client, disAutomicIntPath, new RetryNTimes(10, 500),  
    42.                     null);  
    43.   
    44.     public static void main(String[] args) throws Exception{  
    45.         client.start();  
    46.         Long[] nums = {1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L};  
    47.         ExecutorService executor = Executors.newFixedThreadPool(nums.length);  
    48.         CyclicBarrier barrier = new CyclicBarrier(nums.length);  
    49.         atomicLong.compareAndSet(atomicLong.get().postValue(), new Long(0));  
    50.         for (int i=0;i<nums.length;i++) {  
    51.             final int k = i;  
    52.             executor.execute(new Runnable() {  
    53.                 @Override  
    54.                 public void run() {  
    55.                     try {  
    56.                         barrier.await();  
    57.                         System.out.println(Thread.currentThread().getName()+" " + System.nanoTime()+ " , 开始执行");  
    58.                         AtomicValue<Long> av = atomicLong.add(nums[k]);  
    59.                         System.out.println("to add value=" + nums[k] + ", Result=" + av.succeeded() + " preValue=" + av.preValue()  
    60.                                 + " postValue=" + av.postValue());  
    61.                     } catch (Exception e) {  
    62.                         e.printStackTrace();  
    63.                     }  
    64.                 }  
    65.             });  
    66.         }  
    67.         executor.shutdown();  
    68.     }  
    69. }  

    运行结果:

    pool-3-thread-10 89586635189009 , 开始执行
    pool-3-thread-8 89586635508120 , 开始执行
    pool-3-thread-9 89586635501453 , 开始执行
    pool-3-thread-7 89586635493898 , 开始执行
    pool-3-thread-6 89586635480564 , 开始执行
    pool-3-thread-5 89586635470342 , 开始执行
    pool-3-thread-4 89586635427231 , 开始执行
    pool-3-thread-3 89586635410787 , 开始执行
    pool-3-thread-2 89586635360564 , 开始执行
    pool-3-thread-1 89586635236564 , 开始执行
    to add value=10, Result=true preValue=0 postValue=10
    to add value=2, Result=true preValue=10 postValue=12
    to add value=6, Result=true preValue=12 postValue=18
    to add value=1, Result=true preValue=18 postValue=19
    to add value=7, Result=true preValue=19 postValue=26
    to add value=3, Result=true preValue=26 postValue=29
    to add value=4, Result=true preValue=29 postValue=33
    to add value=8, Result=true preValue=33 postValue=41
    to add value=9, Result=true preValue=41 postValue=50
    to add value=5, Result=true preValue=50 postValue=55

    如果在DistributedAtomicLong的构造方法参数中,RetryNTimes重试次数不够,比如是3,你会发现并不一定每次加数都会成功。显然这里是用了乐观锁机制,它并不保证操作一定成功(它在重试这么多次中都没有成功获得锁,导致操作没有执行),所以我们有必要通过调用 av.succeeded() 来查看此次加数是否成功。

    下面是RetryNTimes为3时的某一次运行结果:

    pool-3-thread-1 89922027531135 , 开始执行
    pool-3-thread-5 89922027681802 , 开始执行
    pool-3-thread-8 89922027737357 , 开始执行
    pool-3-thread-4 89922027673802 , 开始执行
    pool-3-thread-9 89922028120024 , 开始执行
    pool-3-thread-10 89922027531580 , 开始执行
    pool-3-thread-2 89922027616024 , 开始执行
    pool-3-thread-3 89922027606246 , 开始执行
    pool-3-thread-7 89922027722691 , 开始执行
    pool-3-thread-6 89922027699580 , 开始执行
    to add value=9, Result=true preValue=0 postValue=9
    to add value=10, Result=true preValue=9 postValue=19
    to add value=4, Result=true preValue=19 postValue=23
    to add value=7, Result=true preValue=23 postValue=30
    to add value=3, Result=true preValue=30 postValue=33
    to add value=2, Result=true preValue=33 postValue=35
    to add value=5, Result=true preValue=35 postValue=40
    to add value=1, Result=false preValue=35 postValue=0
    to add value=6, Result=false preValue=35 postValue=0
    to add value=8, Result=false preValue=35 postValue=0

    参考:

    1、从PAXOS到ZOOKEEPER分布式一致性原理与实践

  • 相关阅读:
    Python入门:局部变量与全局变量2
    Python入门:局部变量与全局变量1
    Python入门:函数参数1
    Python入门:文件操作1
    Python入门:集合操作
    Python入门:用字典实现三级菜单
    Python入门:购物车实例
    Python:循环
    git 提交指定提交时用户名
    mysql 判断null 和 空字符串
  • 原文地址:https://www.cnblogs.com/maohuidong/p/8407498.html
Copyright © 2011-2022 走看看