zoukankan      html  css  js  c++  java
  • 10、zookeeper客户端curator

    curator介绍

    https://blog.csdn.net/wo541075754/article/details/68067872 关于第三方客户端的小介绍

    zkClient有对dubbo的一些操作支持,但是zkClient几乎没有文档,下面是curator

    curator简介

    curatorNetflix公司开源的一个 zookeeper客户端,后捐献给 apache,,curator框架在zookeeper原生API接口上进行了包装,解决了很多zooKeeper客户端非常底层的细节开发。提供zooKeeper各种应用场景(比如:分布式锁服务、集群领导选举、共享计数器、缓存机制、分布式队列等的抽象封装,实现了Fluent风格的APl接口,是最好用,最流行的zookeeper的客户端

    原生zookeeperAPI的不足

    • 连接对象异步创建,需要开发人员自行编码等待

    • 连接没有自动重连超时机制

    • watcher一次注册生效一次

    • 不支持递归创建树形节点

    curator特点

    • 解决session会话超时重连

    • watcher反复注册

    • 简化开发api

    • 遵循Fluent风格API

        <!-- Zookeeper -->
       <dependency>
           <groupId>org.apache.zookeeper</groupId>
           <artifactId>zookeeper</artifactId>
           <version>3.4.10</version>
       </dependency>
       <dependency>
           <groupId>org.apache.curator</groupId>
           <artifactId>curator-framework</artifactId>
           <version>2.6.0</version>
           <exclustions>
               <exclustion>
                  <groupId>org.apache.zookeeper</groupId>
                  <artifactId>zookeeper</artifactId>
               </exclustion>
           </exclustions>
       </dependency>
       <dependency>
           <groupId>org.apache.curator</groupId>
           <artifactId>curator-recipes</artifactId>
           <version>2.6.0</version>
       </dependency>

     

    基础用法

        public static void main(String[] args) {
           // 工厂创建,fluent风格
           CuratorFramework client = CuratorFrameworkFactory.builder()
                   // ip端口号
                  .connectString("192.168.133.133:2181,192.168.133.133:2182,192.168.133.133:2183")
                   // 会话超时
                  .sessionTimeoutMs(5000)
                   // 重试机制,这里是超时后1000毫秒重试一次
                  .retryPolicy(new RetryOneTime(1000))
                   // 名称空间,在操作节点的时候,会以这个为父节点
                  .namespace("create")
                  .build();
           client.start();
           
           System.out.println(client.getState());
           client.close();

      }
    • session重连策略

      • RetryPolicy retry Policy = new RetryOneTime(3000);

        • 说明:三秒后重连一次,只重连一次

      • RetryPolicy retryPolicy = new RetryNTimes(3,3000);

        • 说明:每三秒重连一次,重连三次

      • RetryPolicy retryPolicy = new RetryUntilElapsed(1000,3000);

        • 说明:每三秒重连一次,总等待时间超过个10秒后停止重连

      • RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3)

        • 说明:这个策略的重试间隔会越来越长

          • 公式:baseSleepTImeMs * Math.max(1,random.nextInt(1 << (retryCount + 1)))

            • baseSleepTimeMs = 1000 例子中的值

            • maxRetries = 3 例子中的值

    创建

    public class curatorGettingStart {
       public static CuratorFramework client;

       // ids权限
       public static void create1() throws Exception {
           // 新增节点
           client.create()
                   // 节点的类型
                  .withMode(CreateMode.EPHEMERAL)
                   // 节点的acl权限列表
                  .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                   // arg1:节点路径,arg2:节点数据
                  .forPath("/node1",new byte[0]);
      }
       // 自定义权限
       public static void create2() throws Exception {
           ArrayList<ACL> acls = new ArrayList<>();
           Id id = new Id("world", "anyone");
           acls.add(new ACL(ZooDefs.Perms.READ,id));
           // 新增节点
           client.create()
                   // 节点的类型
                  .withMode(CreateMode.EPHEMERAL)
                   // 节点的acl权限列表
                  .withACL(acls)
                   // arg1:节点路径,arg2:节点数据
                  .forPath("/node2",new byte[0]);
      }
       // 递归创建
       public static void create3() throws Exception {
           // 新增节点
           client.create()
                   // 递归创建
                  .creatingParentsIfNeeded()
                   // 节点的类型
                  .withMode(CreateMode.EPHEMERAL)
                   // 节点的acl权限列表
                  .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                   // arg1:节点路径,arg2:节点数据
                  .forPath("/node2/nodex",new byte[0]);
      }
       // 递归创建
       public static void create4() throws Exception {
           // 新增节点
           System.out.println(1);
           client.create()

                  .creatingParentsIfNeeded()
                   // 节点的类型
                  .withMode(CreateMode.EPHEMERAL)
                   // 节点的acl权限列表
                  .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                   // 异步
                  .inBackground(new BackgroundCallback() {
                       @Override
                       public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                           System.out.println("异步创建成功");
                      }
                  })
                   // arg1:节点路径,arg2:节点数据
                  .forPath("/node2/nodex",new byte[0]);
           System.out.println(2);
      }
       public static void main(String[] args) throws Exception {
           // 工厂创建,fluent风格
           CuratorFramework client = CuratorFrameworkFactory.builder()
                   // ip端口号
                  .connectString("192.168.133.133:2181,192.168.133.133:2182,192.168.133.133:2183")
                   // 会话超时
                  .sessionTimeoutMs(5000)
                   // 重试机制,这里是超时后1000毫秒重试一次
                  .retryPolicy(new RetryOneTime(1000))
                   // 名称空间,在操作节点的时候,会以这个为父节点
                  .namespace("create")
                  .build();
           client.start();
    //       create1();
    //       create2();
    //       create3();
           create4();

           System.out.println(client.getState() + "操作完成");
           TimeUnit.SECONDS.sleep(20);
           client.close();
      }
    }

    修改

    public class curatorGettingStart {
       public static CuratorFramework client;

       public static void set1() throws Exception {
           // 修改节点
           client.setData()
                   // 版本
                  .withVersion(-1)
                  .forPath("/hadoop","hadoop1".getBytes());
      }
       public static void set2() throws Exception {
           // 修改节点
           client.setData()
                  .withVersion(1)
                  .forPath("/hadoop","hadoop2".getBytes());
      }
       public static void set3() throws Exception {
           // 修改节点
           client.setData()
                  .withVersion(1)
                   // 异步
                  .inBackground(new BackgroundCallback() {
                       @Override
                       public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                           if(curatorEvent.getType() == CuratorEventType.SET_DATA)
                               System.out.println(curatorEvent.getPath()+ "   " +curatorEvent.getType());
                      }
                  })
                  .forPath("/hadoop","hadoop3".getBytes());

      }
       public static void main(String[] args) throws Exception {
           // 工厂创建,fluent风格
           client = CuratorFrameworkFactory.builder()
                   // ip端口号
                  .connectString("192.168.133.133:2181,192.168.133.133:2182,192.168.133.133:2183")
                   // 会话超时
                  .sessionTimeoutMs(5000)
                   // 重试机制,这里是超时后1000毫秒重试一次
                  .retryPolicy(new RetryOneTime(1000))
                   // 名称空间,在操作节点的时候,会以这个为父节点,可选操作
                  .namespace("update")
                  .build();
           client.start();
    //       set1();
           set2();
    //       set3();
           System.out.println(client.getState() + "操作完成");
           TimeUnit.SECONDS.sleep(20);
           client.close();
      }
    }

    删除

    public class curatorGettingStart {
       public static CuratorFramework client;
       public static void delete1() throws Exception {
           // 删除节点
           client.delete()
                  .forPath("node1");
      }

       public static void delete2() throws Exception {
           // 删除节点
           client.delete()
                   // 版本
                  .withVersion(1)
                  .forPath("node2");
      }

       public static void delete3() throws Exception {
           // 删除节点
           client.delete()
                   // 递归删除
                  .deletingChildrenIfNeeded()
                  .withVersion(-1)
                  .forPath("node3");
      }

       public static void delete4() throws Exception {
           // 删除节点
           client.delete()
                  .withVersion(-1)
                   // 异步
                  .inBackground(new BackgroundCallback() {
                       @Override
                       public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                           if (curatorEvent.getType() == CuratorEventType.DELETE)
                               System.out.println(curatorEvent.getPath() + "   " + curatorEvent.getType());
                      }
                  })
                  .forPath("node3");

      }


       public static void main(String[] args) throws Exception {
           // 工厂创建,fluent风格
           client = CuratorFrameworkFactory.builder()
                   // ip端口号
                  .connectString("192.168.133.133:2181,192.168.133.133:2182,192.168.133.133:2183")
                   // 会话超时
                  .sessionTimeoutMs(5000)
                   // 重试机制,这里是超时后1000毫秒重试一次
                  .retryPolicy(new RetryOneTime(1000))
                   // 名称空间,在操作节点的时候,会以这个为父节点,可选操作
                  .namespace("delete")
                  .build();
           client.start();
           //       delete1();
           //       delete2();
           //       delete3();
           // delete4();
           System.out.println(client.getState() + "操作完成");
           TimeUnit.SECONDS.sleep(20);
           client.close();
      }
    }

    读取节点

    public class curatorGettingStart {
       public static CuratorFramework client;
       public static void get1() throws  Exception {
           // 获取数据
           byte[] bytes = client.getData()
                  .forPath("/node");
           System.out.println(new String((bytes)));
      }
       public static void get2() throws  Exception {
           Stat stat = new Stat();
           // 获取数据
           byte[] bytes = client.getData()
                  .storingStatIn(stat)
                  .forPath("/node");;
           System.out.println(new String((bytes)));
           System.out.println(stat.getVersion());
           System.out.println(stat.getCzxid());
      }
       public static void get3() throws  Exception {
           System.out.println(1);
           // 获取数据
           client.getData()
                  .inBackground((CuratorFramework curatorFramework, CuratorEvent curatorEvent) -> {
                       System.out.println(curatorEvent.getPath() + " " + curatorEvent.getType());
                  })
                  .forPath("/node");;
           System.out.println(2);
      }


       public static void main(String[] args) throws Exception {
           // 工厂创建,fluent风格
           client = CuratorFrameworkFactory.builder()
                   // ip端口号
                  .connectString("192.168.133.133:2181,192.168.133.133:2182,192.168.133.133:2183")
                   // 会话超时
                  .sessionTimeoutMs(5000)
                   // 重试机制,这里是超时后1000毫秒重试一次
                  .retryPolicy(new RetryOneTime(1000))
                   // 名称空间,在操作节点的时候,会以这个为父节点,可选操作
                  .namespace("get")
                  .build();
           client.start();
           get1();
           get2();
           get3();

           System.out.println(client.getState() + "操作完成");
           TimeUnit.SECONDS.sleep(20);
           client.close();
      }
    }

     

    读取子节点

     public class curatorGettingStart {
       public static CuratorFramework client;  
       public static void getChildren1() throws  Exception {
           // 获取数据
           List<String> strings = client.getChildren()
                  .forPath("/get");
           strings.forEach(System.out::println);
           System.out.println("------------");
      }
       public static void getChildren2() throws  Exception {
           System.out.println(1);
           // 获取数据
           client.getChildren()
                  .inBackground((curatorFramework, curatorEvent) -> {
                       curatorEvent.getChildren().forEach(System.out::println);
                       System.out.println("------------");
                  })
                  .forPath("/get");
           System.out.println(2);
           System.out.println("------------");
      }


       public static void main(String[] args) throws Exception {
           // 工厂创建,fluent风格
           client = CuratorFrameworkFactory.builder()
                   // ip端口号
                  .connectString("192.168.133.133:2181,192.168.133.133:2182,192.168.133.133:2183")
                   // 会话超时
                  .sessionTimeoutMs(5000)
                   // 重试机制,这里是超时后1000毫秒重试一次
                  .retryPolicy(new RetryOneTime(1000))
                   // 名称空间,在操作节点的时候,会以这个为父节点,可选操作
    //               .namespace("get")
                  .build();
           client.start();

           getChildren1();
           getChildren2();

           System.out.println(client.getState() + "操作完成");
           TimeUnit.SECONDS.sleep(20);
           client.close();
      }
    }

     

    watcher

    public class WatcherTest {
       static CuratorFramework client;

       public static void watcher1() throws Exception {
           // arg1 curator的客户端
           // arg2 监视的路径
           NodeCache nodeCache = new NodeCache(client, "/watcher");
           // 启动
           nodeCache.start();
           nodeCache.getListenable().addListener(new NodeCacheListener() {
               @Override
               // 节点变化时的回调方法
               public void nodeChanged() throws Exception {
                   // 路径
                   System.out.println(nodeCache.getCurrentData().getPath() + " " + nodeCache.getCurrentData().getStat());
                   // 输出节点内容
                   System.out.println(new String(nodeCache.getCurrentData().getData()));
              }
          });
           System.out.println("注册完成");
           // 时间窗内可以一直监听
           //       TimeUnit.SECONDS.sleep(1000);
           //关 闭
           nodeCache.close();
      }

       public static void watcher2() throws Exception {
           // arg1 客户端
           // arg2 路径
           // arg3 事件钟是否可以获取节点数据
           PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/watcher", true);
           // 启动
           pathChildrenCache.start();
           pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
               @Override
               // 节点变化时的回调方法
               public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                   if (pathChildrenCacheEvent != null) {
                       // 获取子节点数据
                       System.out.println(new String(pathChildrenCacheEvent.getData().getData()));
                       // 路径
                       System.out.println(pathChildrenCacheEvent.getData().getPath());
                       // 事件类型
                       System.out.println(pathChildrenCacheEvent.getType());
                  }
              }
          });
           // 时间窗内可以一直监听
           TimeUnit.SECONDS.sleep(1000);
           //关 闭
           pathChildrenCache.close();

      }


       public static void main(String[] args) throws Exception {
           // 工厂创建,fluent风格
           client = CuratorFrameworkFactory.builder()
                   // ip端口号
                  .connectString("192.168.133.133:2181,192.168.133.133:2182,192.168.133.133:2183")
                   // 会话超时
                  .sessionTimeoutMs(5000)
                   // 重试机制,这里是超时后1000毫秒重试一次
                  .retryPolicy(new RetryOneTime(1000))
                   // 名称空间,在操作节点的时候,会以这个为父节点,可选操作
                   //               .namespace("get")
                  .build();
           client.start();

    //       watcher1();
           watcher2();

           System.out.println(client.getState() + "操作完成");
           TimeUnit.SECONDS.sleep(20);
           client.close();
      }
    }

     

    事务

    public class CuratorTransaction {
       static CuratorFramework client;


       public static void transaction() throws Exception{
           /*client.inTransaction()
                   .create()
                       .withMode(CreateMode.PERSISTENT)
                       .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                       .forPath("/transaction",new byte[0])
                   .and()
                   .setData()
                       .forPath("/setData/transaction",new byte[0])
                   .and()
                   .commit();*/
           client.create()
                  .withMode(CreateMode.PERSISTENT)
                  .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                  .forPath("/transaction",new byte[0]);
           client.setData()
                  .forPath("/setData/transaction",new byte[0]);
      }

       public static void main(String[] args) throws Exception {
           // 工厂创建,fluent风格
           client = CuratorFrameworkFactory.builder()
                   // ip端口号
                  .connectString("192.168.133.133:2181,192.168.133.133:2182,192.168.133.133:2183")
                   // 会话超时
                  .sessionTimeoutMs(5000)
                   // 重试机制,这里是超时后1000毫秒重试一次
                  .retryPolicy(new RetryOneTime(1000))
                   // 名称空间,在操作节点的时候,会以这个为父节点,可选操作
                   //               .namespace("get")
                  .build();
           client.start();
           transaction();

           System.out.println(client.getState() + "操作完成");
           TimeUnit.SECONDS.sleep(20);
           client.close();
      }
    }

     

    分布式锁

    • InterProcessMutex:分布式可重入排它锁

    • InterProcessReadWriteLock:分布式读写锁

    public class CuratorDistributeLock {
       public static CuratorFramework client;

       public static void interProcessMutex() throws Exception {
           System.out.println("排他锁");
           // 获取一个分布式排他锁
           InterProcessMutex lock = new InterProcessMutex(client, "/lock1");
           // 开启两个进程测试,会发现:如果一个分布式排它锁获取了锁,那么直到锁释放为止数据都不会被侵扰
           System.out.println("获取锁中");
           lock.acquire();
           System.out.println("操作中");
           for (int i = 0; i < 10; i++) {
               TimeUnit.SECONDS.sleep(1);
               System.out.println(i);
          }
           lock.release();
           System.out.println("释放锁");
      }

       public static void interProcessReadWriteLock1() throws Exception {
           System.out.println("写锁");
           // 分布式读写锁
           InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock1");
           // 开启两个进程测试,观察到写写互斥,特性同排它锁
           System.out.println("获取锁中");
           lock.writeLock().acquire();
           System.out.println("操作中");
           for (int i = 0; i < 10; i++) {
               TimeUnit.SECONDS.sleep(1);
               System.out.println(i);
          }
           lock.writeLock().release();
           System.out.println("释放锁");
      }

       public static void interProcessReadWriteLock2() throws Exception {
           System.out.println("读锁");
           // 分布式读写锁
           InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock1");
           // 开启两个进程测试,观察得到读读共享,两个进程并发进行,注意并发和并行是两个概念,(并发是线程启动时间段不一定一致,并行是时间轴一致的)
           // 再测试两个进程,一个读,一个写,也会出现互斥现象
           System.out.println("获取锁中");
           lock.readLock().acquire();
           System.out.println("操作中");
           for (int i = 0; i < 10; i++) {
               TimeUnit.SECONDS.sleep(1);
               System.out.println(i);
          }
           lock.readLock().release();
           System.out.println("释放锁");
      }


       public static void main(String[] args) throws Exception {
           // 工厂创建,fluent风格
           client = CuratorFrameworkFactory.builder()
                   // ip端口号
                  .connectString("192.168.133.133:2181,192.168.133.133:2182,192.168.133.133:2183")
                   // 会话超时
                  .sessionTimeoutMs(5000)
                   // 重试机制,这里是超时后1000毫秒重试一次
                  .retryPolicy(new RetryOneTime(1000))
                   // 名称空间,在操作节点的时候,会以这个为父节点,可选操作
                   //               .namespace("get")
                  .build();
           client.start();
           //       interProcessMutex();
    //               interProcessReadWriteLock1();
           interProcessReadWriteLock2();


           System.out.println(client.getState() + "操作完成");
           TimeUnit.SECONDS.sleep(20);
           client.close();
      }
    }

     

  • 相关阅读:
    LD_PRELOAD的偷梁换柱之能
    ATOM & Sublime Text 下MarkDown插件功能比较
    存在
    2017年执行计划
    2015年总结以及2016年计划
    2014年总结以及2015年计划
    asp.net MVC中form提交和控制器接受form提交过来的数据
    2013年回顾及2014年计划
    c# XML序列化与反序列化
    Namenode HA原理详解(脑裂)
  • 原文地址:https://www.cnblogs.com/lemon-flm/p/14606677.html
Copyright © 2011-2022 走看看