zoukankan      html  css  js  c++  java
  • 7.5 zookeeper客户端curator的基本使用 + zkui

    使用zookeeper原生API实现一些复杂的东西比较麻烦。所以,出现了两款比较好的开源客户端,对zookeeper的原生API进行了包装:zkClient和curator。后者是Netflix出版的,必属精品,也是最好用的zk的开源客户端。

    一  curator基本API使用

    引入依赖:

    1         <dependency>
    2             <groupId>org.apache.curator</groupId>
    3             <artifactId>curator-framework</artifactId>
    4             <version>2.12.0</version>
    5         </dependency>

    该依赖引入后,默认引入的zookeeper版本是3.4.8。

    注意:不要引入>=3.0.0的curator-framework,默认引入的zookeeper版本是3.5.x(该版本还不稳定),目前测试起来还是有点问题的。

    完整代码:

     1 package com.hulk.curator;
     2 
     3 import org.apache.curator.framework.CuratorFramework;
     4 import org.apache.curator.framework.CuratorFrameworkFactory;
     5 import org.apache.curator.framework.api.BackgroundCallback;
     6 import org.apache.curator.framework.api.CuratorEvent;
     7 import org.apache.curator.retry.ExponentialBackoffRetry;
     8 import org.apache.zookeeper.CreateMode;
     9 import org.apache.zookeeper.data.Stat;
    10 
    11 import java.util.concurrent.Executors;
    12 
    13 public class CuratorTest {
    14     private static CuratorFramework client = CuratorFrameworkFactory.builder()
    15             .connectString("10.211.55.4:2181")
    16             .sessionTimeoutMs(50000)
    17             .connectionTimeoutMs(30000)
    18             .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
    19 
    20     public static void main(String[] args) throws Exception {
    21         /**
    22          * 创建会话
    23          */
    24         client.start();
    25 
    26         /**
    27          * 创建节点
    28          * 注意:
    29          * 1 除非指明创建节点的类型,默认是持久节点
    30          * 2 ZooKeeper规定:所有非叶子节点都是持久节点,所以递归创建出来的节点,只有最后的数据节点才是指定类型的节点,其父节点是持久节点
    31          */
    32         client.create().forPath("/China");//创建一个初始内容为空的节点
    33         client.create().forPath("/America", "zhangsan".getBytes());
    34         client.create().withMode(CreateMode.EPHEMERAL).forPath("/France");//创建一个初始内容为空的临时节点
    35         client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/Russia/car", "haha".getBytes());//递归创建,/Russia是持久节点
    36 
    37         /**
    38          * 异步创建节点
    39          * 注意:如果自己指定了线程池,那么相应的操作就会在线程池中执行,如果没有指定,那么就会使用Zookeeper的EventThread线程对事件进行串行处理
    40          */
    41         client.create().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
    42             @Override
    43             public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
    44                 System.out.println("当前线程:" + Thread.currentThread().getName() + ",code:" + event.getResultCode()
    45                                    + ",type:" + event.getType());
    46             }
    47         }, Executors.newFixedThreadPool(10)).forPath("/async-curator-my");
    48 
    49         client.create().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
    50             @Override
    51             public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
    52                 System.out.println("当前线程:" + Thread.currentThread().getName() + ",code:" + event.getResultCode()
    53                                    + ",type:" + event.getType());
    54             }
    55         }).forPath("/async-curator-zookeeper");
    56 
    57         /**
    58          * 获取节点内容
    59          */
    60         byte[] data = client.getData().forPath("/America");
    61         System.out.println(new String(data));
    62         byte[] data2 = client.getData().storingStatIn(new Stat()).forPath("/America"); //传入一个旧的stat变量,来存储服务端返回的最新的节点状态信息
    63         System.out.println(new String(data2));
    64         /**
    65          * 更新数据
    66          */
    67         Stat stat = client.setData().forPath("/America");
    68         client.setData().withVersion(4).forPath("/America", "lisi".getBytes());
    69 
    70         /**
    71          * 删除节点
    72          */
    73         client.delete().forPath("/China");//只能删除叶子节点
    74         client.delete().deletingChildrenIfNeeded().forPath("/Russia");//删除一个节点,并递归删除其所有子节点
    75         client.delete().withVersion(5).forPath("/America");//强制指定版本进行删除
    76         client.delete().guaranteed().forPath("/America");//注意:由于一些网络原因,上述的删除操作有可能失败,使用guaranteed(),如果删除失败,会记录下来,只要会话有效,就会不断的重试,直到删除成功为止
    77 
    78         Thread.sleep(Integer.MAX_VALUE);
    79     }
    80 }

    1  创建会话

    curator创建会话有两种方式,推荐流式API。

    1 CuratorFramework client = CuratorFrameworkFactory.builder()
    2             .connectString("10.211.55.4:2181")
    3             .sessionTimeoutMs(50000)
    4             .connectionTimeoutMs(30000)
    5             .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();

    参数:

    • connectString:zk的server地址,多个server之间使用英文逗号分隔开
    • connectionTimeoutMs:连接超时时间,如上是30s,默认是15s
    • sessionTimeoutMs:会话超时时间,如上是50s,默认是60s
    • retryPolicy:失败重试策略
      • ExponentialBackoffRetry:构造器含有三个参数 ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
        • baseSleepTimeMs:初始的sleep时间,用于计算之后的每次重试的sleep时间,
          • 计算公式:当前sleep时间=baseSleepTimeMs*Math.max(1, random.nextInt(1<<(retryCount+1)))
        • maxRetries:最大重试次数
        • maxSleepMs:最大sleep时间,如果上述的当前sleep计算出来比这个大,那么sleep用这个时间
      • 其他,查看org.apache.curator.RetryPolicy接口的实现类

    此时会话还没创建,使用如下代码创建会话:

    1 client.start();

    start()会阻塞到会话创建成功为止。

    2  创建节点

    2.1  同步创建

    1         client.create().forPath("/China");//创建一个初始内容为空的节点
    2         client.create().forPath("/America", "zhangsan".getBytes());
    3         client.create().withMode(CreateMode.EPHEMERAL).forPath("/France");//创建一个初始内容为空的临时节点
    4         client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/Russia/car", "haha".getBytes());//递归创建,/Russia是持久节点

    注意:

    • 除非指明创建节点的类型,默认是持久节点
    • ZooKeeper规定:所有非叶子节点都是持久节点,所以递归创建出来的节点,只有最后的数据节点才是指定类型的节点,其父节点是持久节点
    • creatingParentsIfNeeded():可以实现递归创建

    2.2  异步创建

     1         client.create().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
     2             @Override
     3             public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
     4                 System.out.println("当前线程:" + Thread.currentThread().getName() + ",code:" + event.getResultCode()
     5                                    + ",type:" + event.getType());
     6             }
     7         }, Executors.newFixedThreadPool(10)).forPath("/async-curator-my");
     8 
     9         client.create().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
    10             @Override
    11             public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
    12                 System.out.println("当前线程:" + Thread.currentThread().getName() + ",code:" + event.getResultCode()
    13                                    + ",type:" + event.getType());
    14             }
    15         }).forPath("/async-curator-zookeeper");

    注意:

    • 在curator中所有异步操作,都使用org.apache.curator.framework.api.BackgroundCallback接口的实现类完成
    • 如果在BackgroundCallback中自己指定了线程池,那么相应的操作就会在线程池中执行,如果没有指定,那么就会使用Zookeeper的EventThread线程对事件进行串行处理,所以上述的两个输出分别如下:
      当前线程:pool-3-thread-1,code:0,type:CREATE
      当前线程:main-EventThread,code:0,type:CREATE

    3  获取节点内容

    1         byte[] data = client.getData().forPath("/America");
    2         System.out.println(new String(data));
    3         byte[] data2 = client.getData().storingStatIn(new Stat()).forPath("/America"); //传入一个旧的stat变量,来存储服务端返回的最新的节点状态信息
    4         System.out.println(new String(data2));

    4  获取节点子节点列表

    1 List<String> children = client.getChildren().forPath("/Russia");

    5  更新数据

    1         Stat stat = client.setData().forPath("/America");
    2         client.setData().withVersion(4).forPath("/America", "lisi".getBytes());

    注意:

    • version版本号还是为了实现CAS并发处理,也会强制某个线程必须更新相应的版本的数据

    6  删除节点

    1         client.delete().forPath("/China");//只能删除叶子节点
    2         client.delete().deletingChildrenIfNeeded().forPath("/Russia");//删除一个节点,并递归删除其所有子节点
    3         client.delete().withVersion(5).forPath("/America");//强制指定版本进行删除
    4         client.delete().guaranteed().forPath("/America");

    注意:

    • deletingChildrenIfNeeded()实现级联删除
    • guaranteed()由于一些网络原因,上述的删除操作有可能失败,使用guaranteed(),如果删除失败,会记录下来,只要会话有效,就会不断的重试,直到删除成功为止

    二  curator实现事件监听

    引入两个依赖:

     1         <dependency>
     2             <groupId>org.apache.curator</groupId>
     3             <artifactId>curator-framework</artifactId>
     4             <version>2.12.0</version>
     5         </dependency>
     6         <dependency>
     7             <groupId>org.apache.curator</groupId>
     8             <artifactId>curator-recipes</artifactId>
     9             <version>2.12.0</version>
    10         </dependency>

    给出全部代码:

     1 package com.hulk.curator;
     2 
     3 import org.apache.curator.framework.CuratorFramework;
     4 import org.apache.curator.framework.CuratorFrameworkFactory;
     5 import org.apache.curator.framework.recipes.cache.NodeCache;
     6 import org.apache.curator.framework.recipes.cache.NodeCacheListener;
     7 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
     8 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
     9 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
    10 import org.apache.curator.retry.ExponentialBackoffRetry;
    11 
    12 /**
    13  * 事件监听器
    14  */
    15 public class CuratorWatcherTest {
    16     private static CuratorFramework client = CuratorFrameworkFactory.builder()
    17             .connectString("10.211.55.4:2181")
    18             .sessionTimeoutMs(50000)
    19             .connectionTimeoutMs(30000)
    20             .retryPolicy(new ExponentialBackoffRetry(1000, 3))
    21             .build();
    22 
    23     public static void main(String[] args) throws Exception {
    24         /**
    25          * 创建会话
    26          */
    27         client.start();
    28         client.create().creatingParentsIfNeeded().forPath("/book/computer","java".getBytes());
    29         /**
    30          * 监听指定节点本身的变化,包括节点本身的创建和节点本身数据的变化
    31          */
    32         NodeCache nodeCache = new NodeCache(client,"/book/computer");
    33         nodeCache.getListenable().addListener(new NodeCacheListener() {
    34             @Override
    35             public void nodeChanged() throws Exception {
    36                 System.out.println("新的节点数据:" + new String(nodeCache.getCurrentData().getData()));
    37             }
    38         });
    39         nodeCache.start(true);
    40 
    41         client.setData().forPath("/book/computer","c++".getBytes());
    42         /**
    43          * 监听子节点变化情况
    44          * 1 新增子节点
    45          * 2 删除子节点
    46          * 3 子节点数据变更
    47          */
    48         PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/book13",true);
    49         pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
    50             @Override
    51             public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
    52                 switch (event.getType()){
    53                     case CHILD_ADDED:
    54                         System.out.println("新增子节点:" + event.getData().getPath());
    55                         break;
    56                     case CHILD_UPDATED:
    57                         System.out.println("子节点数据变化:" + event.getData().getPath());
    58                         break;
    59                     case CHILD_REMOVED:
    60                         System.out.println("删除子节点:" + event.getData().getPath());
    61                         break;
    62                     default:
    63                         break;
    64                 }
    65             }
    66         });
    67         pathChildrenCache.start();
    68 
    69         client.create().forPath("/book13");
    70 
    71         client.create().forPath("/book13/car", "bmw".getBytes());
    72 
    73         client.setData().forPath("/book13/car", "audi".getBytes());
    74 
    75         client.delete().forPath("/book13/car");
    76     }
    77 }

    curator的事件监听分为:

    • NodeCache:对节点本身的监听
      • 监听节点本身的创建
      • 监听节点本身的数据的变化
    • PathChildrenCache:对节点的子节点的监听
      • 监听新增子节点
      • 监听删除子节点
      • 监听子节点数据变化

    注意

    • PathChildrenCache只会监听指定节点的一级子节点,不会监听节点本身(例如:“/book13”),也不会监听子节点的子节点(例如,“/book13/car/color”)

    三  zkui

    zk的操作我们一般可以登上zk所在的机器,然后执行“sh zkCli.sh”,之后执行一些命令,但是由于这样始终效率低下,这里推荐一款比较好用的zk的ui界面:zkui。

    假设我们要在10.211.55.5机器上安装该程序。

    1  下载打包

    1 git clone https://github.com/DeemOpen/zkui.git
    2 cd zkui/
    3 mvn clean install

    通过上述的操作,在zkui/target目录下我们会生成一个fatjar:zkui-2.0-SNAPSHOT-jar-with-dependencies.jar,在启动这个jar之前先要进行相关配置。

    2  配置zkui

    1 cp config.cfg target/
    2 vi config.cfg
    3 修改内容如下,其他不变:
    4 zkServer=10.211.55.5:2181

    注意:需要将配置文件config.cfg与fatjar放在同一个目录下。

    3  启动zkui

    之后进入target/目录下,执行:

    1 nohup java -jar zkui-2.0-SNAPSHOT-jar-with-dependencies.jar &

    4  浏览器访问

    浏览器访问“http://10.211.55.5:9090”,之后在登录页面输入用户名密码:admin/manager进行登录。(可以去config.cfg进行配置)

  • 相关阅读:
    Android——点击对话框上按钮不关闭对话框
    超酷的Android 侧滑(双向滑动菜单)效果
    Android快速开发不可或缺的11个工具类(下载)
    Android ——真机调试
    Android程序完全退出的三种方法
    android 添加桌面快捷方式
    最全Android开发常用工具类
    成为Java GC专家(5)—Java性能调优原则
    JVM调优总结 + jstat 分析
    mysql中max_allowed_packet参数的配置方法(避免大数据写入或者更新失败)
  • 原文地址:https://www.cnblogs.com/java-zhao/p/7350945.html
Copyright © 2011-2022 走看看