zoukankan      html  css  js  c++  java
  • ZooKeeper与Curator注册和监控

    Curator提供了对zookeeper客户端的封装,并监控连接状态和会话session,特别是会话session过期后,curator能够重新连接zookeeper,并且创建一个新的session。

    对于zk的使用者来说,session的概念至关重要,如果想了解更多session的说明,请访问:http://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.html

     

    zk客户端和zk服务器间主要可能存在下面几种异常情况:

    1.     短暂失去连接:此时客户端检测到与服务端的连接已经断开,但是服务端维护的客户端session尚未过期,之后客户端和服务端重新建立了连接;当客户端重新连接后,由于session没有过期,zookeeper能够保证连接恢复后保持正常服务。

    2.     失去连接时间很长:此时服务器相对于客户端的session已经过期了,与先前session相关的watcher和ephemeral的路径和数据都会消失;当Curator重新创建了与zk的连接后,会获取到session expired异常,Curator会销毁先前的session,并且会创建一个新的session,需要注意的是,与之前session相关的watcher和ephemeral类型的路径和数据在新的session中也不会存在,需要开发者在CuratorFramework.getConnectionStateListenable().addListener()中添加状态监听事件,对ConnectionState.LOST事件进行监听,当session过期后,使得之前的session状态得以恢复。对于ephemeral类型,在客户端应该保持数据的状态,以便及时恢复。

    3.     客户端重新启动:不论先前的zk session是否已经过期,都需要重新创建临时节点、添加数据和watch事件,先前的session也会在稍后的一段时间内过期。

    4.     Zk服务器重新启动:由于zk将session信息存放到了硬盘上,因此重启后,先前未过期的session仍然存在,在zk服务器启动后,客户端与zk服务器创建新的连接,并使用先前的session,与1相同。

    5.     需要注意的是,当session过期了,在session过期期间另外的客户端修改了zk的值,那么这个修改在客户端重新连接到zk上时,zk客户端不会接收到这个修改的watch事件(尽管添加了watch),如果需要严格的watch逻辑,就需要在curator的状态监控中添加逻辑。

     

    特别提示:watcher仅仅是一次性的,zookeeper通知了watcher事件后,就会将这个watcher从session中删除,因此,如果想继续监控,就要添加新的watcher。

     

    下面提供了对persistent和ephemeral两种类型节点的监控方法,其中get方法说明了persistent节点如何监控,而register方法说明了ephemeral类型的节点如何监控。

    public class CuratorTest {

        private CuratorFramework zkTools;

        private ConcurrentSkipListSet watchers newConcurrentSkipListSet();

        private static Charset charset = Charset.forName("utf-8");

       

       

        public CuratorTest() {     

           zkTools = CuratorFrameworkFactory

                  .builder()

                  .connectString("10.11.21.78:12306")

                  .namespace("zk/test")

                  .retryPolicy(new RetryNTimes(2000,20000))

                  .build();

           zkTools.start();

          

            }  

       

       

        public void addReconnectionWatcher(final String path,final ZookeeperWatcherType watcherType,final CuratorWatcher watcher){

           synchronized (this) {

               if(!watchers.contains(watcher.toString()))//不要添加重复的监听事件

               {

                  watchers.add(watcher.toString());

                  System.out.println("add new watcher " + watcher);

                  zkTools.getConnectionStateListenable().addListener(newConnectionStateListener() {  

                      @Override

                      public void stateChanged(CuratorFramework client, ConnectionState newState) {

                         System.out.println(newState);

                         if(newState == ConnectionState.LOST){//处理session过期

                             try{

                                if(watcherType == ZookeeperWatcherType.EXITS){

                                   zkTools.checkExists().usingWatcher(watcher).forPath(path);

                                }else if(watcherType == ZookeeperWatcherType.GET_CHILDREN){

                                   zkTools.getChildren().usingWatcher(watcher).forPath(path);

                                }else if(watcherType == ZookeeperWatcherType.GET_DATA){

                                    zkTools.getData().usingWatcher(watcher).forPath(path);

                                }else if(watcherType == ZookeeperWatcherType.CREATE_ON_NO_EXITS){

                                    //ephemeral类型的节点session过期了,需要重新创建节点,并且注册监听事件,之后监听事件中,

                                    //会处理create事件,将路径值恢复到先前状态

                                    Stat stat =zkTools.checkExists().usingWatcher(watcher).forPath(path);                             

                                    if(stat == null){

                                       System.err.println("to create");

                                       zkTools.create()

                                       .creatingParentsIfNeeded()

                                       .withMode(CreateMode.EPHEMERAL)

                                       .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)

                                       .forPath(path);                                     

                                    }

                                }

                             }catch (Exception e) {

                                e.printStackTrace();

                             }

                         }

                      }

                  });          

               }

           }

        }

       

     

        public void create() throws Exception{

           zkTools.create()//创建一个路径

           .creatingParentsIfNeeded()//如果指定的节点的父节点不存在,递归创建父节点

           .withMode(CreateMode.PERSISTENT)//存储类型(临时的还是持久的)

           .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)//访问权限

           .forPath("zk/test");//创建的路径

        }

       

        public void put() throws Exception{

           zkTools.//对路径节点赋值

           setData().

           forPath("zk/test","hello world".getBytes(Charset.forName("utf-8")));

        }

       

        public void get() throws Exception{

           String path = "zk/test";

           ZKWatch watch = new ZKWatch(path);

           byte[] buffer = zkTools.

                             getData().

                             usingWatcher(watch).forPath(path);

           System.out.println(new String(buffer,charset));

           //添加session过期的监控

           addReconnectionWatcher(path, ZookeeperWatcherType.GET_DATA, watch);

        }  

       

       

        public void register() throws Exception{

          

           String ip = InetAddress.getLocalHost().getHostAddress();

           String registeNode = "zk/register/"+ip;//节点路径

          

           byte[] data = "disable".getBytes(charset);//节点值

     

           CuratorWatcher watcher = new ZKWatchRegister(registeNode,data);    //创建一个register watcher

          

           Stat stat = zkTools.checkExists().forPath(registeNode);

           if(stat != null){

               zkTools.delete().forPath(registeNode);

           }

           zkTools.create()

           .creatingParentsIfNeeded()          .withMode(CreateMode.EPHEMERAL)

           .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)

           .forPath(registeNode,data);//创建的路径和值

          

           //添加到session过期监控事件中

           addReconnectionWatcher(registeNode, ZookeeperWatcherType.CREATE_ON_NO_EXITS,watcher);               

           data = zkTools.getData().usingWatcher(watcher).forPath(registeNode);

           System.out.println("get path form zk : "+registeNode+":"+new String(data,charset));

        }

       

        public static void main(String[] args) throws Exception {

           CuratorTest test = new CuratorTest();

           test.get();

           test.register();

           Thread.sleep(10000000000L);

     

        }

       

        public class ZKWatch implements CuratorWatcher{

           private final String path;

          

           public String getPath() {

               return path;

           }

           public ZKWatch(String path) {

               this.path = path;

           }

           @Override

           public void process(WatchedEvent event) throws Exception {

               System.out.println(event.getType());

               if(event.getType() == EventType.NodeDataChanged){

                  byte[] data = zkTools.

                         getData().

                         usingWatcher(this).forPath(path);

                  System.out.println(path+":"+new String(data,Charset.forName("utf-8")));

               }

           }

          

        }

       

       

        public class ZKWatchRegister implements CuratorWatcher{

           private final String path;

           private byte[] value;

           public String getPath() {

               return path;

           }

           public ZKWatchRegister(String path,byte[] value) {

               this.path = path;

               this.value = value;

           }

           @Override

           public void process(WatchedEvent event) throws Exception {

               System.out.println(event.getType());

               if(event.getType() == EventType.NodeDataChanged){

                  //节点数据改变了,需要记录下来,以便session过期后,能够恢复到先前的数据状态

                  byte[] data = zkTools.

                         getData().

                         usingWatcher(this).forPath(path);

                  value = data;

                  System.out.println(path+":"+new String(data,charset));

               }else if(event.getType() == EventType.NodeDeleted){

                  //节点被删除了,需要创建新的节点

                  System.out.println(path ":" path +" has been deleted.");

                  Stat stat = zkTools.checkExists().usingWatcher(this).forPath(path);

                  if(stat == null){

                      zkTools.create()

                      .creatingParentsIfNeeded()

                      .withMode(CreateMode.EPHEMERAL)

                      .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)

                      .forPath(path);

                  }

               }else if(event.getType() == EventType.NodeCreated){

                  //节点被创建时,需要添加监听事件(创建可能是由于session过期后,curator的状态监听部分触发的)

                  System.out.println(path ":" +" has been created!" "the current data is " +new String(value));

                  zkTools.setData().forPath(pathvalue);

                  zkTools.getData().usingWatcher(this).forPath(path);

               }

           }     

        }

       

        public enum ZookeeperWatcherType{

           GET_DATA,GET_CHILDREN,EXITS,CREATE_ON_NO_EXITS

        }

    }

  • 相关阅读:
    Debug技巧
    SOA&微服务&服务网格&高可用
    缓存重点要点一览
    Mysql的变量一览
    计算机基本概念
    SpringMvc中获取Request
    空话大话汇集
    slf4j 作用及logback概述
    TensorFlow实战Google深度学习框架1-4章学习笔记
    Deep Learning.ai学习笔记_第五门课_序列模型
  • 原文地址:https://www.cnblogs.com/chenzhao/p/3929381.html
Copyright © 2011-2022 走看看