zoukankan      html  css  js  c++  java
  • per学习笔记-zkclient,curator使用

    开源客户端,原生api的不足 

    连接的创建是异步的,需要开发人员自行编码实现等待 
    连接没有自动的超时重连机制 
    Zk本身没提供序列化机制,需要开发人员自行指定,从而实现数据的序列化和反序列化 
    Watcher注册一次只会生效一次,需要不断的重复注册 
    Watcher的使用方式不符合java本身的术语,如果采用监听器方式,更容易理解 
    不支持递归创建树形节点 

    开源客户端---ZkClient介绍 

    Github上一个开源的zk客户端,由datameer的工程师Stefan Groschupf和Peter Voss一起开发 
    – 解决session会话超时重连 
    – Watcher反复注册 
    – 简化开发api 
    – 还有..... 
    – https://github.com/sgroschupf/zkclient 

    开源客户端---Curator介绍 
    1. 使用CuratorFrameworkFactory工厂的两个静态方法创建客户端 
    a) static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, 
    RetryPolicy retryPolicy) 
    b) static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy) 
    2. Start()方法启动 
    参数说明 
    connectString 分开的ip:port对 
    retryPolicy 重试策略,默认四种:Exponential BackoffRetry,RetryNTimes ,RetryOneTime, 
    RetryUntilElapsed 
    sessionTimeoutMs 会话超时时间,单位为毫秒,默认60000ms 
    connectionTimeoutMs 连接创建超时时间,单位为毫秒,默认是15000ms 

    重试策略 
    – 实现接口RetryPolicy可以自定重重试策略 
    • boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper) 

    retryCount 已经重试的次数,如果第一次重试 此值为0 
    elapsedTimeMs 重试花费的时间,单位为毫秒 
    sleeper 类似于Thread.sleep,用于sleep指定时间 
    返回值 如果还会继续重试,则返回true 
    四种默认重试策略 
    – ExponentialBackoffRetry 
    • ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries) 
    • ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs) 
    • 当前应该sleep的时间: baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1))),随着重试次数增加重试时间间隔变大,指数倍增长 

    参数说明 
    baseSleepTimeMs 初始sleep时间 
    maxRetries 最大重试次数 
    maxSleepMs 最大重试时间 
    返回值 如果还会继续重试,则返回true 

    默认重试策略 
    – RetryNTimes 
    • RetryNTimes(int n, int sleepMsBetweenRetries) 
    • 当前应该sleep的时间 
    参数说明 
    n 最大重试次数 
    sleepMsBetweenRetries 每次重试的间隔时间 

    – RetryOneTime 
    • 只重试一次 
    • RetryOneTime(int sleepMsBetweenRetry), sleepMsBetweenRetry为重试间隔的时间 

    默认重试策略 
    – RetryUntilElapsed 
    • RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries) 
    • 重试的时间超过最大时间后,就不再重试 
    参数说明 
    maxElapsedTimeMs 最大重试时间 
    sleepMsBetweenRetries 每次重试的间隔时间 

    Fluent风格的API 
    – 定义:一种面向对象的开发方式,目的是提高代码的可读性 
    – 实现方式:通过方法的级联或者方法链的方式实现 
    – 举例: 
            zkclient = CuratorFrameworkFactory.builder().connectString(connectString).sessionTimeoutMs(5000).retryPolicy(retryPolicy).namespace("tests").build();

    创建节点 
    – 构建操作包装类(Builder): CreateBuilder create()---- CuratorFramework 
    – CreateBuilder 
    • creatingParentsIfNeeded() //递归创建父目录 
    • withMode(CreateMode mode)//设置节点属性,比如:CreateMode.PERSISTENT,如果是递归创建,创建模式 
    为临时节点,则只有叶子节点是临时节点,非叶子节点都为持久节点 
    • withACL(List aclList) //设置acl 
    • forPath(String path) //指定路劲 

    删除节点 
    – 构建操作包装类(Builder):DeleteBuilder delete() -----CuratorFramework 
    – DeleteBuilder 
    • withVersion(int version) //特定版本号 
    • guaranteed() //确保节点被删除 
    • forPath(String path) //指定路径 
    • deletingChildrenIfNeeded() //递归删除所有子节点 

    关于guaranteed: 
    Solves edge cases where an operation may succeed on the server but connection failure 
    occurs before a response can be successfully returned to the client 
    意思是:解决当某个删除操作在服务器端可能成功,但是此时客户端与服务器端的连接中断,而删除的响 
    应没有成功返回到客户端 
    底层的本质是重试 


    关于异步操作 
    – inBackground() 
    – inBackground(Object context) 
    – inBackground(BackgroundCallback callback) 
    – inBackground(BackgroundCallback callback, Object context) 
    – inBackground(BackgroundCallback callback, Executor executor) 
    – inBackground(BackgroundCallback callback, Object context, Executor executor) 
    从参数看跟zk的原生异步api相同,多了一个线程池,用于执行回调 

    读取数据 
    – 构建操作包装类(Builder): GetDataBuilder getData() -----CuratorFramework 
    – GetDataBuilder 
    • storingStatIn(org.apache.zookeeper.data.Stat stat) //把服务器端获取的状态数据存储到stat对象 
    • Byte[] forPath(String path)//节点路径 

    读取子节点 
    – 构建操作包装类(Builder): GetChildrenBuilder getChildren() -----CuratorFramework 
    – GetChildrenBuilder 
    • storingStatIn(org.apache.zookeeper.data.Stat stat) //把服务器端获取的状态数据存储到stat对象 
    • Byte[] forPath(String path)//节点路径 
    • usingWatcher(org.apache.zookeeper.Watcher watcher) //设置watcher,类似于zk本身的api,也只能使用一次 
    • usingWatcher(CuratorWatcher watcher) //设置watcher ,类似于zk本身的api,也只能使用一次 


    设置watcher 
    – NodeCache 
    • 监听数据节点的内容变更 
    • 监听节点的创建,即如果指定的节点不存在,则节点创建后,会触发这个监听 
    – PathChildrenCache 
    • 监听指定节点的子节点变化情况 
    • 包括:新增子节点 子节点数据变更 和子节点删除 
    NodeCache 
    – 构造函数 
    • NodeCache(CuratorFramework client, String path) 
    • NodeCache(CuratorFramework client, String path, boolean dataIsCompressed) 
    参数说明 
    client 客户端实例 
    path 数据节点路径 
    dataIsCompressed 是否进行数据压缩 
    – 回调接口 
    • public interface NodeCacheListener 
    void nodeChanged() //没有参数,怎么获取事件信息以及节点数据? 
    PathChildrenCache 
    client 客户端实例 
    path 数据节点路径 
    dataIsCompressed 是否进行数据压缩 
    cacheData 用于配置是否把节点内容缓存起来,如果配置为true,那么客户端在接 
    收到节点列表变更的同时,也能够获取到节点的数据内容,如果为false 
    则无法取到数据内容 
    threadFactory 通过这两个参数构造专门的线程池来处理事件通知 
    executorService 

    PathChildrenCache 
    – 监听接口 
    • 时间类型包括:新增子节点(CHILD_ADDED),子节点数据变更(CHILD_UPDATED),子节点删除(CHILD_REMOVED) 
    – PathChildrenCache.StartMode 
    • BUILD_INITIAL_CACHE //同步初始化客户端的cache,及创建cache后,就从服务器端拉入对应的数据 
    • NORMAL //异步初始化cache 
    • POST_INITIALIZED_EVENT //异步初始化,初始化完成触发事件PathChildrenCacheEvent.Type.INITIALIZED 

    zkclient举例 

    Java代码  收藏代码
    1. package com.zk.dev.zkClient.day1;  
    2.   
    3. import org.I0Itec.zkclient.IZkDataListener;  
    4. import org.I0Itec.zkclient.ZkClient;  
    5. import org.junit.After;  
    6. import org.junit.Before;  
    7. import org.junit.Test;  
    8.   
    9. import java.util.concurrent.TimeUnit;  
    10.   
    11. public class ZKTest  {  
    12.   
    13.     private ZkClient zk;  
    14.   
    15.     private String nodeName = "/test";  
    16.   
    17.     @Before  
    18.     public void initTest() {  
    19.         zk = new ZkClient("localhost:2181");  
    20.     }  
    21.   
    22.     @After  
    23.     public void dispose() {  
    24.         zk.close();  
    25.     }  
    26.   
    27.     @Test  
    28.     public void testListener() throws InterruptedException {  
    29.         // 监听指定节点的数据变化  
    30.   
    31.         zk.subscribeDataChanges(nodeName, new IZkDataListener() {  
    32.             public void handleDataChange(String s, Object o) throws Exception {  
    33.                 System.out.println("node data changed!");  
    34.                  System.out.println("node=>" + s);  
    35.                  System.out.println("data=>" + o);  
    36.                  System.out.println("--------------");  
    37.             }  
    38.   
    39.             public void handleDataDeleted(String s) throws Exception {  
    40.                 System.out.println("node data deleted!");  
    41.                 System.out.println("s=>" + s);  
    42.                 System.out.println("--------------");  
    43.   
    44.             }  
    45.         });  
    46.   
    47.                 System.out.println("ready!");  
    48.   
    49.         // junit测试时,防止线程退出  
    50.         while (true) {  
    51.             TimeUnit.SECONDS.sleep(5);  
    52.         }  
    53.     }  
    54.   
    55.     @Test  
    56.     public void testUpdateConfig() throws InterruptedException {  
    57.         if (!zk.exists(nodeName)) {  
    58.             zk.createPersistent(nodeName);  
    59.         }  
    60.         zk.writeData(nodeName, "1");  
    61.         zk.writeData(nodeName, "2");  
    62.         zk.delete(nodeName);  
    63.         zk.delete(nodeName);   
    64.         zk.writeData("/test/ba", "bbb");  
    65.     }  
    66. }  


    curator举例 

    Java代码  收藏代码
      1. package com.zk.dev.zkClient.day1;  
      2.   
      3. import org.apache.curator.RetryPolicy;  
      4. import org.apache.curator.framework.CuratorFramework;  
      5. import org.apache.curator.framework.CuratorFrameworkFactory;  
      6. import org.apache.curator.framework.recipes.cache.NodeCache;  
      7. import org.apache.curator.framework.recipes.cache.NodeCacheListener;  
      8. import org.apache.curator.framework.recipes.cache.PathChildrenCache;  
      9. import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;  
      10. import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;  
      11. import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;  
      12. import org.apache.curator.retry.ExponentialBackoffRetry;  
      13. import org.apache.zookeeper.CreateMode;  
      14. import org.apache.zookeeper.ZooDefs.Ids;  
      15.   
      16. /** 
      17.  * @see 测试curator框架例子 
      18.  * @Author:xuehan 
      19.  * @Date:2016年5月14日下午8:44:49 
      20.  */  
      21. public class CuratorUtils {  
      22.       
      23.     public String connectString = "localhost:2181";  
      24.     CuratorFramework  zkclient = null ;  
      25.     public CuratorUtils(){  
      26.         /** 
      27.          * connectString连接字符串中间用分号隔开,sessionTimeoutMs session过期时间,connectionTimeoutMs连接超时时间,retryPolicyc连接重试策略 
      28.          */  
      29.         //CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs, connectionTimeoutMs, retryPolicy)  
      30.         // fluent风格aip   
      31.   //    CuratorFrameworkFactory.builder().sessionTimeoutMs(5000).connectString(connectString).namespace("/test").build();  
      32.         // 重连策略,没1一秒重试一次,最大重试次数3次  
      33.         RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);  
      34.         zkclient = CuratorFrameworkFactory.builder().connectString(connectString).sessionTimeoutMs(5000).retryPolicy(retryPolicy).namespace("tests").build();  
      35.         zkclient.start();  
      36.     }  
      37.     /** 
      38.      * 递归创建节点 
      39.      * @param path 
      40.      * @param data 
      41.      * @throws Exception 
      42.      */  
      43.     public void createNode(String path, byte[] data) throws Exception{  
      44.         zkclient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(Ids.OPEN_ACL_UNSAFE).forPath(path, data);  
      45.     }  
      46.     /** 
      47.      * 递归删除节点 
      48.      * @param path 
      49.      * @throws Exception 
      50.      */  
      51.     public void delNode(String path) throws Exception{  
      52.         zkclient.delete().guaranteed().deletingChildrenIfNeeded().forPath(path);  
      53.     }   public void zkClose(){  
      54.         zkclient.close();  
      55.     }  
      56.     public void delNodeCallBack(String path) throws Exception{  
      57.         zkclient.delete().guaranteed().deletingChildrenIfNeeded().inBackground(new DeleteCallBack()).forPath(path);  
      58.     }      
      59.     public void dataChanges(String path) throws Exception{  
      60.         final NodeCache  dataWatch =  new NodeCache(zkclient, path);  
      61.         dataWatch.start(true);  
      62.         dataWatch.getListenable().addListener(new NodeCacheListener(){  
      63.   
      64.             public void nodeChanged() throws Exception {  
      65.                 System.out.println("path==>" + dataWatch.getCurrentData().getPath() + "==data==>" + new String(dataWatch.getCurrentData().getData()));  
      66.             }  
      67.               
      68.         });  
      69.         zkclient.delete().guaranteed().deletingChildrenIfNeeded().inBackground(new DeleteCallBack()).forPath(path);  
      70.     }      
      71.     public void addChildWatcher(String path) throws Exception{  
      72.         final PathChildrenCache pc = new PathChildrenCache(zkclient, path, true);  
      73.         pc.start(StartMode.POST_INITIALIZED_EVENT);  
      74.         System.out.println("节点个数===>" + pc.getCurrentData().size());  
      75.         pc.getListenable().addListener(new  PathChildrenCacheListener() {  
      76.               
      77.             public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {  
      78.                 System.out.println("事件监听到"  + event.getData().getPath());  
      79.                 if(event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)){  
      80.                     System.out.println("客户端初始化节点完成"  + event.getData().getPath());  
      81.                 }else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)){  
      82.                     System.out.println("添加节点完成"  + event.getData().getPath());  
      83.                 }else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)){  
      84.                     System.out.println("删除节点完成"  + event.getData().getPath());  
      85.                 }else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){  
      86.                     System.out.println("修改节点完成"  + event.getData().getPath());  
      87.                 }  
      88.             }  
      89.         });  
      90.           
      91.     }  
      92.       
      93.     public static void main(String[] args) throws Exception{  
      94.         CuratorUtils cu = new CuratorUtils();  
      95. //      cu.createNode("/test/sb/aa/bb", "erhu".getBytes());  
      96. //      cu.delNode("/test");  
      97.           
      98.         cu.zkclient.setData().forPath("/aa", "love is not".getBytes());  
      99.         cu.addChildWatcher("/aa");  
      100.         try{  
      101.             Thread.sleep(20000000);  
      102.         }catch(Exception e){};  
      103.     }  
      104.  }  
  • 相关阅读:
    C语言编译包含math库加参数-lm
    C语言浮点类型有效位(float, double,long double)
    C语言速记(宏)
    C语言速记6(结构体)
    asp.net Core依赖注入汇总
    跨域请求(转载)
    UnobtrusiveJavaScriptEnabled、ClientValidationEnabled(转载)
    到值类型“System.DateTime”的强制转换失败,因为具体化值为 null。结果类型的泛型参数或查询必须使用可以为 null 的类型。
    软件开发PPT中造图片软件:ProcessOn
    EF接收数据通用实体模型
  • 原文地址:https://www.cnblogs.com/a-du/p/9888972.html
Copyright © 2011-2022 走看看