zoukankan      html  css  js  c++  java
  • [curator] Netflix Curator 使用

    curator简介

    Netflix curator 是Netflix公司开源的一个Zookeeper client library,用于简化zookeeper客户端编程,包含一下几个模块:

    • curator-client - zookeeper client封装,用于取代原生的zookeeper客户端,提供一些非常有用的客户端特性
    • curator-framework - zookeeper api的高层封装,大大简化zookeeper客户端编程,添加了例如zookeeper连接管理、重试机制等
    • curator-recipes - zookeeper recipes 基于curator-framework的实现(除2PC以外)

    maven dependency:

    [html] view plaincopy
     
    1. <dependency>  
    2.     <groupId>com.netflix.curator</groupId>  
    3.     <artifactId>curator-recipes</artifactId>  
    4.     <version>0.6.4</version>  
    5. </dependency>  

    注意:在www.mvnrepository.com中认为0.32为最新版本,其实迄今为止最新版本为0.64,github trunk中的版本现在是0.65-SNAPSHOT

    curator framework 使用

    示例代码:

    [java] view plaincopy
     
    1.               String path = "/test_path";  
    2. CuratorFramework client = CuratorFrameworkFactory.builder()  
    3.         .connectString("test:2181").namespace("/test1")  
    4.         .retryPolicy(new RetryNTimes(Integer.MAX_VALUE, 1000))  
    5.         .connectionTimeoutMs(5000).build();  
    6. //create a node  
    7. client.create().forPath("/head", new byte[0]);  
    8.   
    9. //delete a node in background  
    10. client.delete().inBackground().forPath("/head");  
    11.   
    12. // create a EPHEMERAL_SEQUENTIAL  
    13. client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/head/child", new byte[0]);  
    14.   
    15. // get the data   
    16. client.getData().watched().inBackground().forPath("/test");  
    17.   
    18. // check the path exits  
    19. client.checkExists().forPath(path);  


    curator framework使用builder模式和类似nio的chain api,代码非常简洁

    curator recipes 使用

    InterProcessMutex

    用途:进程间互斥锁

    示例代码:

    [java] view plaincopy
     
    1. String lockName = "/lock1";  
    2. InterProcessLock lock1 = new InterProcessMutex(this.curator, lockName);  
    3. InterProcessLock lock2 = new InterProcessMutex(this.curator, lockName);  
    4. lock1.acquire();  
    5. boolean result = lock2.acquire(1, TimeUnit.SECONDS);  
    6. assertFalse(result);  
    7. lock1.release();  
    8. result = lock2.acquire(1, TimeUnit.SECONDS);  
    9. assertTrue(result);  

    原理:每次调用acquire在/lock1节点节点下使用CreateMode.EPHEMERAL_SEQUENTIAL 创建新的ephemeral节点,然后getChildren获取所有的children,判断刚刚创建的临时节点是否为第一个,如果是,则获取锁成功;如果不是,则删除刚刚创建的临时节点。

    注意: 每次accquire操作,成功,则请求zk server 2次(一次写,一次getChildren);如果失败,则请求zk server 3次(一次写,一次getChildren,一次delete)

    InterProcessReadWriteLock

    示例代码:

    [java] view plaincopy
     
    1. @Test  
    2. public void testReadWriteLock() throws Exception{  
    3.     String readWriteLockPath = "/RWLock";  
    4.     InterProcessReadWriteLock readWriteLock1 = new InterProcessReadWriteLock(this.curator, readWriteLockPath);  
    5.     InterProcessMutex writeLock1 = readWriteLock1.writeLock();  
    6.     InterProcessMutex readLock1 = readWriteLock1.readLock();  
    7.       
    8.     InterProcessReadWriteLock readWriteLock2 = new InterProcessReadWriteLock(this.curator, readWriteLockPath);  
    9.     InterProcessMutex writeLock2 = readWriteLock2.writeLock();  
    10.     InterProcessMutex readLock2 = readWriteLock2.readLock();  
    11.     writeLock1.acquire();  
    12.       
    13.     // same with WriteLock, can read  
    14.     assertTrue(readLock1.acquire(1, TimeUnit.SECONDS));  
    15.       
    16.     // different lock, can't read while writting  
    17.     assertFalse(readLock2.acquire(1, TimeUnit.SECONDS));  
    18.       
    19.     // different write lock, can't write  
    20.     assertFalse(writeLock2.acquire(1, TimeUnit.SECONDS));  
    21.       
    22.     // release the write lock  
    23.     writeLock1.release();  
    24.       
    25.     //both read lock can read  
    26.     assertTrue(readLock1.acquire(1, TimeUnit.SECONDS));  
    27.     assertTrue(readLock2.acquire(1, TimeUnit.SECONDS));  
    28. }  

    原理: 同InterProcessMutext,在ephemeral node的排序算法上做trick,write lock的排序在前。

    注意: 同一个InterProcessReadWriteLock如果已经获取了write lock,则获取read lock也会成功

    LeaderSelector

    示例代码:

    [java] view plaincopy
     
    1. @Test  
    2. public void testLeader() throws Exception{  
    3.     LeaderSelectorListener listener = new LeaderSelectorListener(){  
    4.   
    5.   
    6.         @Override  
    7.         public void takeLeadership(CuratorFramework client)  
    8.                 throws Exception {  
    9.             System.out.println("i'm leader");  
    10.         }  
    11.   
    12.         @Override  
    13.         public void handleException(CuratorFramework client,  
    14.                 Exception exception) {  
    15.               
    16.         }  
    17.   
    18.         @Override  
    19.         public void notifyClientClosing(CuratorFramework client) {  
    20.               
    21.         }};  
    22.     String leaderPath = "/leader";  
    23.     LeaderSelector selector1 = new LeaderSelector(this.curator, leaderPath, listener);  
    24.     selector1.start();  
    25.     LeaderSelector selector2 = new LeaderSelector(this.curator, leaderPath, listener);  
    26.     selector2.start();  
    27.     assertFalse(selector2.hasLeadership());  
    28. }  


    原理:内部基于InterProcessMutex实现,具体细节参见shared lock一节

    总结

        curator还提供了很多其他的实现,具体参见https://github.com/Netflix/curator/wiki/Recipes 

  • 相关阅读:
    逻辑思维杂想
    C++二叉树实现
    斐波那数列递归实现与动态规划实现
    C++双向链表的实现
    C++单链表实现
    C++顺序表实现
    windows下端口占用处理工具
    [项目记录]一个.net下使用HAP实现的吉大校园通知网爬虫工具:OAWebScraping
    [c++]大数运算---利用C++ string实现任意长度正小数、整数之间的加减法
    [C++]几种排序
  • 原文地址:https://www.cnblogs.com/405845829qq/p/5032933.html
Copyright © 2011-2022 走看看