zoukankan      html  css  js  c++  java
  • zookeeper之分布式锁以及分布式计数器(通过curator框架实现)

    有人可能会问zookeeper我知道,但是curator是什么呢?

    其实curator是apachede针对zookeeper开发的一个api框架是apache的顶级项目 他与zookeeper原生api相比更加简洁方便使用

    特别就是注册watcher这方面.再也不用我们手工去重复注册watcher了.我们只需监听一下然后curator全给我们做了.而且支持递归创建节点

    和递归删除节点.

    更大的优势是实现分布式锁和分布式计数器以及分布式的同步更加方便.

    以前我们基于zk原生的api实现分布式锁相当麻烦,但是我们基于curator去实现分布式锁那就是特别简单的事了.

    废话不多说直接上代码

    一个简单的demo

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    /**
     * 分布式锁
     */
    public class CouratorLock {
     
        //初始化url
        private static final String url="1.11.11.1:2181,1.11.11.2:2182,1.11.11.3:2183";
     
        private static int count=10;
     
        public static void main(String[] args){
     
            for(int i=0;i<10;i++){
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        CuratorFramework zk= CuratorFrameworkFactory.builder()
                                .sessionTimeoutMs(5000)
                                .retryPolicy(new RetryNTimes(3,1000))
                                .connectionTimeoutMs(50000)
                                .connectString(url)
                                .build();
                        zk.start();
                        //分布式锁
                        InterProcessMutex lock=new InterProcessMutex(zk,"/lock");
                        try {
                            //枷锁
                            lock.acquire();
                            get();
                        catch (Exception e) {
                            e.printStackTrace();
                        }finally {
                            try {
                                //释放锁
                                lock.release();
                            catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }).start();
            }
        }
        public static void get(){
            count--;
            System.out.println(count);
        }
    }

      

    没错就是这么简单,curator为我们提供了InterProcessMutex这个类来实现分布式的锁

    其中.acquire();这个方法就是开始枷锁..release();就是释放锁.

    看很简单吧.相比我们用zk原生的api实现起来超级简单

    其实底层还是基于临时节点实现的,而且curator支持一个更加强大的功能,就是当你的客户端下线以后再次启动如果再次期间你监听的节点的有变化,

    curator的watcher会立马进行回调..

    所以使用curator开发相当方便,

    我们看下curator的CRUD

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    //通过工厂创建连接
           CuratorFramework cachezk= CuratorFrameworkFactory.builder()
                   .connectString(url)
                   .sessionTimeoutMs(10000)
                   .retryPolicy(new RetryNTimes(1000,10))
                   .build();
           cachezk.start();
    //创建节点
           cachezk.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/gang","第一个watcher".getBytes());
           //更新节点
           cachezk.setData().forPath("/super","修改节点".getBytes());
           //删除节点
           cachezk.delete().deletingChildrenIfNeeded().forPath("/super");
           //关闭连接
           cachezk.close();
       }

      


    下面我们看下curator是怎么实现分布式计数器的

    其实也是很简单,以为复杂的curator都提我们做完了

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    /**
     * 分布式的计数器
     */
    public class CuratorCounter {
        //初始化url
        private static final String url="1.11.11.1:2181,1.11.11.2:2182,1.11.11.3:2183";
     
        private static int count=10;
     
        public static void main(String[] args) throws Exception {
     
            CuratorFramework zk= CuratorFrameworkFactory.builder()
                    .sessionTimeoutMs(5000)
                    .retryPolicy(new RetryNTimes(3,1000))
                    .connectionTimeoutMs(50000)
                    .connectString(url)
                    .build();
            zk.start();
            //分布式计数器
            DistributedAtomicInteger counter=new DistributedAtomicInteger(zk,"/super",new RetryNTimes(3,100));
            //初始化
            //counter.forceSet(0);
            AtomicValue<Integer> value = counter.increment();
            System.out.println("原值为"+value.preValue());
            System.out.println("更改后的值为"+value.postValue());
            System.out.println("状态"+value.succeeded());
     
        }
    }

      

     


    DistributeAtomicInteger就是curator为我们提供的分布式计数器的类

    关于分布式同步机制我就不给大家介绍了.其实也是很简单的.你们自行去了解吧

  • 相关阅读:
    css清除浮动
    对象遍历
    vue 小工具
    数据库优化-聚合索引
    mysql修改唯一索引
    输出带pre的调试信息
    mysql的group by优化跟limit优化
    地址联动数据抓取
    php7新特新
    linux同步系统时间
  • 原文地址:https://www.cnblogs.com/sunsing123/p/10303333.html
Copyright © 2011-2022 走看看