zoukankan      html  css  js  c++  java
  • zookeeper开源客户端curator

      zookeeper的原生api相对来说比较繁琐,比如:对节点添加监听事件,当监听触发后,我们需要再次手动添加监听,否则监听只生效一次;再比如,断线重连也需要我们手动代码来判断处理等等。对于curator的介绍,从网上百度了一段:Curator是Netflix开源的一套zookeeper客户端框架,用它来操作zookeeper更加方便,按Curator官方所比喻的,guava to JAVA,curator to zookeeper,Curator采用了fluent风格的代码,非常简洁

    ---------------正-----文-----------------------------------------------------------------------------------------------------------------------------------------------

      Curator包含6部分,均可提供单独jar包,每个包简单介绍如下:

      client:zk-client的替代品,提供一些底层处理跟工具类;

      framework:高级封装,大大简化了zk的客户端编程,包含对zk的连接管理,重试机制等;

      repices:提供了一些常用的操作,比如持续监听,锁,选举等;

      utilities:各种工具类;

      errors:curator对异常跟错误的处理;

      extendsion:扩展包;

      基本api代码解释:

      1、建立连接

      建立连接需要指定zk地址以及重试策略等,先上代码再解释:

     RetryPolicy retry = new ExponentialBackoffRetry(1000, 5);//重试5次,每次间隔时间指数增长(有具体增长公式)
     RetryPolicy retry1 = new RetryNTimes(5, 5000);//重试5次,每次间隔5秒
     RetryPolicy retry2 = new RetryUntilElapsed(60000 * 2, 5000);//重试2分钟,每次间隔5秒
    //普通创建 CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 5000, retry); //fluent风格创建 CuratorFramework client1 = CuratorFrameworkFactory.builder() .connectString("localhost:2181") .connectionTimeoutMs(5000) //连接超时时间 .sessionTimeoutMs(3000) //会话超时时间 .retryPolicy(retry) .build(); //建立连接 client.start();

      如注释,创建客户端连接我们通常需要指定重试策略,curator提供了3种重试机制,分别如上;对于fluent风格,就是每个操作都返回了一个对象,我们可以一直通过[.方法名]的方式书写代码;client创建了之后,需要调用start方法才能真正去建立连接。会话超时时间是指当连接发生故障时,由于zk的心跳机制检测,服务端认为会话超时的时间,会清除session;

      2、创建、删除、更新节点

      连接建立之后,我们可以在服务器上进行创建节点的操作,代码如下:

     //创建节点 
     String path = client.create()
              .creatingParentsIfNeeded()        //对节点路径上没有的节点进行创建
              .withMode(CreateMode.EPHEMERAL)   //临时节点
              .forPath("/curator/test", "123".getBytes());  //节点路径,节点的值
    
     //删除节点
     client.delete()
            .guaranteed()      //删除失败,则客户端持续删除,直到节点删除为止
            .deletingChildrenIfNeeded()   //删除相关子节点
            .withVersion(-1)    //无视版本,直接删除
            .forPath("/curator/mytest");
     //更新节点信息
     Stat stat2 = new Stat();
     byte[] theValue2 = client.getData().storingStatIn(stat).forPath("/curator/test");
     client.setData()
            .withVersion(stat2.getVersion())  //版本校验,与当前版本不一致则更新失败,-1则无视版本信息进行更新
            .forPath("/curator/test", "456".getBytes());
     //判断节点是否存在(存在返回节点信息,不存在则返回null)
     Stat s = client.checkExists().forPath("/curator/test");

      持久节点是persistent,我们创建的节点可能有好几层,如果服务器不存在父节点则会报错并创建失败,createingParentsIfNodeed()的作用是在父节点不存在的时候进行创建。删除操作可能由于网络抖动等情况导致删除失败,由于节点数据操作一般对业务影响较大,故多数都会带持续删除的动作来确保正确删除;节点更新删除等操作若考虑版本校验,则采用代码所示方式,在获取节点数据的时候对节点状态进行赋值,然后通过节点状态可以获得版本信息。判断节点是否存在,一般通过节点信息判断,若不存在,则节点信息为null。

      3、获取字节点列表

      只有一行代码,返回string类型的list

     //获取子节点列表
     List<String> paths = client.getChildren().forPath("/curator");
    

      4、异步操作

      异步操作不会阻塞代码执行,对于操作完成后的业务处理,需要设定回调函数来完成。以判断节点是否存在为例:

     ExecutorService es = Executors.newFixedThreadPool(5);//异步操作线程池,
     //异步判断操作
     Stat s1 = client.checkExists().inBackground().forPath("/curator/test"); //无回调
     client.checkExists().inBackground(new BackgroundCallback() {  //有回调
         @Override
         public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
             CuratorEventType c = curatorEvent.getType();//事件类型,可在CuratorEventType看到具体种类
             int r = curatorEvent.getResultCode();//0,执行成功,其它,执行失败
             Object o = curatorEvent.getContext();//事件上下文,一般是由调用方法传入,供回调函数使用的参数
             String p = curatorEvent.getPath();//节点路径
             List<String> li = curatorEvent.getChildren();//子节点列表
             byte[] datas = curatorEvent.getData();//节点数据
             //一些其它操作
         }
     },es).forPath("/curator/test");
    

      异步操作实际是在后台另起一个线程来完成该操作,若线程较多势必会影响服务器性能,所以要用线程池来尽量降低对服务器的消耗。需要考虑线程池的关闭操作,较繁琐,不作赘述。

      5、节点、子节点监听

      节点监听需要用repices包中的NodeCache来完成,代码如下:

     //节点监听
     final NodeCache cache = new NodeCache(client,"/curator/test");
     cache.start();
     cache.getListenable().addListener(new NodeCacheListener() {//监听对象
         @Override
         public void nodeChanged() throws Exception {//重写监听方法
             byte[] ret = cache.getCurrentData().getData();
             System.out.println("当前节点内容是:"+ new String(ret));
         }
     });
    

      子节点的监听需要用PathChildrenCache来完成,跟节点本身不一样,代码如下:

     //子节点监听
     final PathChildrenCache pccache = new PathChildrenCache(client,"/curator",true);//true指当子节点变化时,获取子节点内容
     pccache.start();
     pccache.getListenable().addListener(new PathChildrenCacheListener() {
         @Override
         public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {//重写监听方法
             switch (pathChildrenCacheEvent.getType()){//子节点的事件类型
                 case CHILD_ADDED:
                     System.out.println(pathChildrenCacheEvent.getData());//通过pathChildrenCacheEvent,可以获取到节点相关的数据
                     break;
                 case CHILD_REMOVED:
                     System.out.println(pathChildrenCacheEvent.getData().getPath());
                     break;
                 case CHILD_UPDATED:
                     break;
                 default:
                     break;
             }
         }
     });
    

      6、权限控制部分,略。

      只是简单罗列了curator的一些基本用法,距离具体应用还是有很大差距,稍后将动手写一下选主程序来练手试试,希望顺利。

       

  • 相关阅读:
    vim常用命令总结
    深度学习之 GAN 进行 mnist 图片的生成
    javascript 中的类型
    架构设计小思
    [前端]如何让图片等比例缩放,同时撑满父级容器的长或宽
    深度学习之 seq2seq 进行 英文到法文的翻译
    深度学习之 cnn 进行 CIFAR10 分类
    深度学习之 rnn 台词生成
    深度学习之 mnist 手写数字识别
    前端页面,使用 dom 鼠标拖拽画一个矩形和监听键盘
  • 原文地址:https://www.cnblogs.com/nevermorewang/p/5815602.html
Copyright © 2011-2022 走看看