zoukankan      html  css  js  c++  java
  • Apache curator-client详解

     Apache curator框架中curator-client组件可以作为zookeeper client来使用,它提供了zk实例创建/重连机制等,简单便捷.不过直接使用curator-client并不能减少太多的开发量,因为它相对比较底层,稍后我们继续了解curator-framework组件提供的更多的便捷特性.

    一.核心API

        1. CuratorZookeeperClient: zookeeper客户端,根据指定的配置信息创建zookeeper实例.

        2. RetryPolicy接口: 重连策略,当zookeeper失去链接时使用的"重连策略": 

            <> RetryOneTime: 只重连一次.

            <> RetryNTime: 指定重连的次数N.

            <> RetryUtilElapsed: 指定最大重连超时时间和重连时间间隔,间歇性重连直到超时或者链接成功.

            <> ExponentialBackoffRetry: 基于"backoff"方式重连,和RetryUtilElapsed的区别是重连的时间间隔是动态的.

    Java代码  收藏代码
    1. 时间间隔 = baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1))).  

            <> BoundedExponentialBackoffRetry: 同ExponentialBackoffRetry(最大重试次数的控制),增加了最大睡眠时间.

        3. RetryLoop: 操作重试,如果在执行一个操作时,遇到了zk链接异常,怎么办?RetryLoop可以不断重试,直到网络正常且操作执行成功为止.SessionFailRetryLoop类是一个特列,可以兼容当session失效时,如何进行操作重试.

        4. EnsembleProvider: 配置提供者,创建zk客户端时,需要指定connectionString(例如:127.0.0.1:2181),在zookeeper API中只能显式的指定,curator在这个方面提供了更加灵活性的方式,你可以通过任何方式获取或者构建connectionString.

            <> FixedEnsembleProvider: 使用固定的字符串作为connectionString.

            <> ExhibitorEnsembleProvider: 动态的获取connectionString,可以指定一个URL用来提供connectionString的输出服务.此后此Provider将会间歇性的获取最新的connectionString字符串,并保存.事实上,ExhibitorEnsembleProvider只是一个样例,展示了一种动态获取connectionString的方式,如果在真正的开发中,你可能需要参考它,来定制自己的Provider.

    二. 通用客户端代码示例

    Java代码  收藏代码
    1. public class ZooKeeperClient extends Thread{  
    2.   
    3.     protected final CuratorZookeeperClient zkClient;  
    4.     protected String parent;  
    5.   
    6.     public static final Charset charset = Charset.forName("utf-8");  
    7.   
    8.     private ZNodeWatcher zNodeWatcher = new ZNodeWatcher();//自定义watcher  
    9.   
    10.     public ZooKeeperClient(String connectString, int sessionTimeout, String parent) throws Exception {  
    11.         this.parent = parent;  
    12.         zkClient = new CuratorZookeeperClient(connectString, sessionTimeout, sessionTimeout, zNodeWatcher, new ExponentialBackoffRetry(1000, Integer.MAX_VALUE));  
    13.         zkClient.start();//must,but anytime before zookeeper operation  
    14.         zkClient.blockUntilConnectedOrTimedOut(); //first connection should be successful  
    15.     }  
    16.   
    17.   
    18.     public boolean exist(String path,boolean watched) throws Exception{  
    19.         return zkClient.getZooKeeper().exists(path,watched) == null ? false : true;  
    20.     }  
    21.   
    22.     /** 
    23.      * 此path必须存在,如果不存在则立即创建 
    24.      * @param path 
    25.      * @return 
    26.      */  
    27.     public boolean ensurePath(final String path) throws Exception{  
    28.         PathUtils.validatePath(path);  
    29.         return RetryLoop.callWithRetry(zkClient, new Callable<Boolean>(){  
    30.             @Override  
    31.             public Boolean call() throws Exception {  
    32.                 EnsurePath ensure = new EnsurePath(path);  
    33.                 ensure.ensure(zkClient);  
    34.                 return true;  
    35.             }  
    36.         });  
    37.     }  
    38.   
    39.     /** 
    40.      * 
    41.      * @param path 
    42.      * @param data 
    43.      * @return   如果path已经存在或者创建成功,则返回true,否则返回false。 
    44.      * @throws Exception 
    45.      */  
    46.     public boolean create(final String path, final String data) throws Exception {  
    47.         PathUtils.validatePath(path);//if bad format,here will throw some Exception;  
    48.         return RetryLoop.callWithRetry(zkClient, new Callable<Boolean>() {  
    49.   
    50.             @Override  
    51.             public Boolean call() throws Exception {  
    52.                 int _current = 0;  
    53.                 while (_current < 3) {  
    54.                     _current++;  
    55.                     try {  
    56.                         //zkClient.blockUntilConnectedOrTimedOut();  
    57.                         //确保父节点存在  
    58.                         EnsurePath ensure = new EnsurePath(path).excludingLast();  
    59.                         //parent path should be existed.  
    60.                         //EnsurePath: retry + block  
    61.                         ensure.ensure(zkClient); //ugly API  
    62.                         zkClient.getZooKeeper().create(path, data.getBytes(charset), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);  
    63.                         return true;  
    64.                     } catch (KeeperException.NodeExistsException e) {  
    65.                         return true;  
    66.                     }  
    67.                     //retry only for KeeperException,not for other runtimeException。  
    68.                     //other exception will be thrown,and stop retry!!  
    69.                     //if no Exception thrown,retry will be stopped and return successfully.  
    70.                 }  
    71.                 return false;  
    72.             }  
    73.         }) ;  
    74.     }  
    75.       
    76.       
    77.     public  class ZNodeWatcher implements Watcher{  
    78.         @Override  
    79.         public void process(WatchedEvent event) {  
    80.             Event.EventType eventType = event.getType();  
    81.             Event.KeeperState keeperState =  event.getState();  
    82.             String path = event.getPath();  
    83.             switch(event.getType()) {  
    84.                 case None:  
    85.                     //connection Error:会自动重连  
    86.                     logger.info("[Watcher],Connecting...");  
    87.                     if(keeperState == Event.KeeperState.SyncConnected){  
    88.                         logger.info("[Watcher],Connected...");  
    89.                         //检测临时节点是否失效等。  
    90.                     }  
    91.                     break;  
    92.                 case NodeCreated:  
    93.                     logger.info("[Watcher],NodeCreated:" + path);  
    94.                     break;  
    95.                 case NodeDeleted:  
    96.                     logger.info("[Watcher],NodeDeleted:" + path);  
    97.                     break;  
    98.                 default:  
    99.                     //  
    100.             }  
    101.         }  
    102.     }  
    103. }  

    三. Provider代码实例

        本实例展示了如何使用curator-client开发简单的API,展示了RetryPolicy,RetryLoop的使用方式;实例中使用Curator自带的ExhibitorEnsembleProvider动态获取zookeeper服务器列表信息.

        1. pom.xml

    Xml代码  收藏代码
    1. <dependency>  
    2.     <groupId>org.apache.zookeeper</groupId>  
    3.     <artifactId>zookeeper</artifactId>  
    4.     <version>3.4.5</version>  
    5.       
    6. </dependency>  
    7. <dependency>  
    8.     <groupId>org.apache.curator</groupId>  
    9.     <artifactId>curator-recipes</artifactId>  
    10.     <version>2.3.0</version>  
    11. </dependency>  

        2. curator-config.propeties 

    Java代码  收藏代码
    1. host.rest.servers=127.0.0.1,localhost  
    2. host.rest.port=8080  
    3. host.backup=127.0.0.1:2181  
    4. host.rest.path=/servers/zk  
    5. host.rest.period=180000  

        3. IZkClient.java  

    Java代码  收藏代码
    1. package com.test.demo.curator;  
    2.   
    3. import java.util.*;  
    4. import java.util.concurrent.Callable;  
    5. import java.util.concurrent.TimeUnit;  
    6.   
    7. import org.apache.curator.CuratorZookeeperClient;  
    8. import org.apache.curator.RetryLoop;  
    9. import org.apache.curator.RetryPolicy;  
    10. import org.apache.curator.TimeTrace;  
    11. import org.apache.curator.drivers.TracerDriver;  
    12. import org.apache.curator.ensemble.EnsembleProvider;  
    13. import org.apache.curator.ensemble.exhibitor.DefaultExhibitorRestClient;  
    14. import org.apache.curator.ensemble.exhibitor.ExhibitorEnsembleProvider;  
    15. import org.apache.curator.ensemble.exhibitor.ExhibitorRestClient;  
    16. import org.apache.curator.ensemble.exhibitor.Exhibitors;  
    17. import org.apache.curator.retry.ExponentialBackoffRetry;  
    18. import org.apache.curator.retry.RetryNTimes;  
    19. import org.apache.curator.utils.EnsurePath;  
    20. import org.apache.curator.utils.PathUtils;  
    21. import org.apache.curator.utils.ZKPaths;  
    22. import org.apache.zookeeper.CreateMode;  
    23. import org.apache.zookeeper.KeeperException;  
    24. import org.apache.zookeeper.Transaction;  
    25. import org.apache.zookeeper.ZooDefs;  
    26. import org.apache.zookeeper.data.ACL;  
    27.   
    28.   
    29. public class IZkClient {  
    30.   
    31.     private final CuratorZookeeperClient zkClient;  
    32.   
    33.     public IZkClient(String configLocation) throws Exception {  
    34.   
    35.         Properties properties = new Properties();  
    36.         properties.load(ClassLoader.getSystemResourceAsStream(configLocation));  
    37.         EnsembleProvider provider = buildProvider(properties);  
    38.         String pTimeout = properties.getProperty("zk.timeout");  
    39.         Integer timeout = 30000;  
    40.         if (pTimeout != null) {  
    41.             timeout = Integer.valueOf(pTimeout);  
    42.         }  
    43.         zkClient = new CuratorZookeeperClient(provider, timeout, timeout, null, new ExponentialBackoffRetry(1000, Integer.MAX_VALUE));  
    44.         zkClient.setTracerDriver(new PrintTraceDrive());  
    45.         zkClient.start();//must,but anytime before zookeeper operation  
    46.         zkClient.blockUntilConnectedOrTimedOut(); //first connection should be successful  
    47.     }  
    48.   
    49.     /** 
    50.      * build provider,all of params from config-file 
    51.      * @param properties 
    52.      * @return 
    53.      */  
    54.     private EnsembleProvider buildProvider(Properties properties) {  
    55.         String servers = properties.getProperty("host.rest.servers");   //hosts.servers = 127.0.0.1,127.0.0.2  
    56.         if (servers == null || servers.isEmpty()) {  
    57.             throw new IllegalArgumentException("host.servers cant be empty");  
    58.         }  
    59.         List<String> hostnames = Arrays.asList(servers.split(","));  
    60.         String port = properties.getProperty("host.rest.port");  
    61.         Integer restPort = 80;   //default  
    62.         if (port != null) {  
    63.             restPort = Integer.valueOf(port);  
    64.         }  
    65.         final String backupAddress = properties.getProperty("host.backup");//127.0.0.1:2181  
    66.         //if network is error,you should sepcify a backup zk-connectString  
    67.         Exhibitors exhibitors = new Exhibitors(hostnames, restPort, new Exhibitors.BackupConnectionStringProvider() {  
    68.             @Override  
    69.             public String getBackupConnectionString() throws Exception {  
    70.                 return backupAddress;  
    71.             }  
    72.         });  
    73.         //rest,as meaning of getting fresh zk-connectString list.  
    74.         ExhibitorRestClient restClient = new DefaultExhibitorRestClient();  
    75.         String restUriPath = properties.getProperty("host.rest.path");  
    76.         String period = properties.getProperty("host.rest.period");  
    77.         Integer pollingMs = 180000; //3 min  
    78.         if (period != null) {  
    79.             pollingMs = Integer.valueOf(period);  
    80.         }  
    81.         return new ExhibitorEnsembleProvider(exhibitors, restClient, restUriPath, pollingMs, new RetryNTimes(10, 1000));  
    82.     }  
    83.   
    84.     public CuratorZookeeperClient getZkClient() {  
    85.         return zkClient;  
    86.     }  
    87.   
    88.     /** 
    89.      * how to use RtryLoop ,another style 
    90.      * if Znode has been existed,will delete it,and create it again. 
    91.      * 
    92.      */  
    93.     public boolean replace(final String path,final byte[] value){  
    94.         PathUtils.validatePath(path);  
    95.         boolean result = false;  
    96.         try{  
    97.             result = RetryLoop.callWithRetry(zkClient,new Callable<Boolean>() {  
    98.   
    99.                 @Override  
    100.                 public Boolean call() throws Exception {  
    101.                     int _current = 0;  
    102.                     while(_current < 3){  
    103.                         _current++;  
    104.                         try{  
    105.                             zkClient.blockUntilConnectedOrTimedOut();  
    106.                             Transaction tx = zkClient.getZooKeeper().transaction();  
    107.                             tx.delete(path, -1);  
    108.                             tx.create(path,value,ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);  
    109.                             tx.commit();  
    110.                             return true;  
    111.                         } catch (KeeperException.NoNodeException e){  
    112.                              //  
    113.                         } catch (KeeperException.NodeExistsException e){  
    114.                             //  
    115.                         }  
    116.                     }  
    117.                     return false;  //To change body of implemented methods use File | Settings | File Templates.  
    118.                 }  
    119.             }) ;  
    120.         }catch (Exception e){  
    121.             e.printStackTrace();  
    122.         }  
    123.         return result;  
    124.     }  
    125.   
    126.     //API : on for test  
    127.     public String createPath(String path, byte[] value) throws Exception {  
    128.         PathUtils.validatePath(path);//if bad format,here will throw some Exception;  
    129.         EnsurePath ensure = new EnsurePath(path).excludingLast();  
    130.         //parent path should be existed.  
    131.         //EnsurePath: retry + block  
    132.         ensure.ensure(zkClient); //ugly API  
    133.         return zkClient.getZooKeeper().create(path, value, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);  
    134.     }  
    135.   
    136.     //API: on for test  
    137.     public boolean createPath(String path, byte[] value,int blockTimes){  
    138.         if (!zkClient.isConnected() && blockTimes == 0) {  
    139.             return false;  
    140.         }  
    141.         TimeTrace trace = zkClient.startTracer("createPath:" + path);//log message  
    142.         try{  
    143.             EnsurePath ensure = new EnsurePath(path).excludingLast();  
    144.             ensure.ensure(zkClient);//only for persistent node  
    145.             RetryLoop loop = zkClient.newRetryLoop();  
    146.             int _current = 0;  
    147.             while(loop.shouldContinue()){  
    148.                  try{  
    149.                      if(_current >= blockTimes){  
    150.                          loop.markComplete(); //stop here.  
    151.                          continue;  
    152.                      }  
    153.                      //blocking  
    154.                      boolean isConnected = zkClient.blockUntilConnectedOrTimedOut();  
    155.                      if(!isConnected){  
    156.                          _current++;  
    157.                          continue;  
    158.                      }  
    159.                      zkClient.getZooKeeper().create(path, value, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);  
    160.                      loop.markComplete();  
    161.                  } catch (KeeperException.NodeExistsException e){  
    162.                      loop.markComplete();//exist ,stop here  
    163.                  } catch (Exception e){  
    164.                      loop.takeException(e);  
    165.                  }  
    166.             }  
    167.         } catch (Exception e){  
    168.             e.printStackTrace();  
    169.             return false;  //cant create path  
    170.         } finally{  
    171.             trace.commit();  
    172.         }  
    173.         return true;  
    174.     }  
    175.   
    176.     public byte[] getData(String path) throws Exception{  
    177.         PathUtils.validatePath(path);  
    178.         return zkClient.getZooKeeper().getData(path,false,null);  
    179.     }  
    180.     public void close(){  
    181.         zkClient.close();  
    182.     }  
    183.     class PrintTraceDrive implements TracerDriver {  
    184.         @Override  
    185.         public void addTrace(String name, long time, TimeUnit unit) {  
    186.             System.out.println("<Trace>" + name + ";time:" + TimeUnit.MILLISECONDS.convert(time, unit) + " ms");  
    187.         }  
    188.   
    189.         @Override  
    190.         public void addCount(String name, int increment) {  
    191.         }  
    192.     }  
    193. }  

        4. IZkClientMain.java(for testing) 

    Java代码  收藏代码
    1. public class IZkClientMain {  
    2.   
    3.   
    4.     public static void main(String[] args) throws Exception {  
    5.         String configLocation = "curator-config.properties";  
    6.         IZkClient iZkClient = new IZkClient(configLocation);  
    7.         String value = "curator-demo";  
    8.         String path = "/curator/child/0";  
    9.   
    10.         iZkClient.replace(path, value.getBytes("utf-8"));  
    11.         //simple method;  
    12.         String nodeName = ZKPaths.getNodeFromPath(path);  
    13.         System.out.print(nodeName);  
    14.         //value  
    15.         byte[] bytes = iZkClient.getData(path);  
    16.         System.out.println(new String(bytes, "utf-8"));  
    17.         Thread.sleep(180000 * 2);  
    18.         iZkClient.close();  
    19.     }  
    20. }  

        5. ExhibitorEnsembleProvider需要使用远端的一个REST风格的Url来提供zookeeper服务器列表,如下为Spring方式:

    Java代码  收藏代码
    1. @Controller  
    2. @RequestMapping("/servers")  
    3. public class ServersController {  
    4.   
    5.     @RequestMapping(value = "/zk",headers="Accept=application/x-www-form-urlencoded")  
    6.     public void zk(HttpServletResponse response) throws  Exception{  
    7.         FormHttpMessageConverter converter = new FormHttpMessageConverter();  
    8.         converter.setCharset(Charset.forName("utf-8"));  
    9.         HttpOutputMessage output = new ServletServerHttpResponse(response);  
    10.         converter.write(buildServers(), MediaType.APPLICATION_FORM_URLENCODED,output);  
    11.         //String servers = "count=2&port=2181&server0=127.0.0.1&server1=localhost";  
    12.     }  
    13.   
    14.   
    15.     private MultiValueMap<String,Object> buildServers(){  
    16.         MultiValueMap<String,Object> map = new LinkedMultiValueMap<String, Object>();  
    17.         map.add("count","2");  
    18.         map.add("port","2181");  
    19.         map.add("server0","127.0.0.1");  
    20.         map.add("server1","localhost");  
    21.         return map;  
    22.     }  
    23.   
    24. }  

      

       备注:Curator-client可以帮助我们进行链接重连操作,重连过程中,我们不需要关注太多,不过你仍然可以通过注册Watcher的手段来活的通知.如果在操作过程中,zk的链接有异常,你可以通过RetryLoop的方式实现阻塞. 

  • 相关阅读:
    声明对象指针,调用构造、析构函数的多种情况
    [C++ STL] 常用算法总结
    [C++ STL] map使用详解
    [C++ STL] set使用详解
    [C++ STL] list使用详解
    [C++ STL] deque使用详解
    Servlet课程0424(一) 通过实现Servlet接口来开发Servlet
    CSS盒子模型
    Spring学习之第一个hello world程序
    Java基础面试题
  • 原文地址:https://www.cnblogs.com/xingzc/p/6165564.html
Copyright © 2011-2022 走看看