zoukankan      html  css  js  c++  java
  • Watcher 实现机制之client注冊

    Zookeeper 提供的了分布式数据的公布/订阅功能,通过 Watch 机制来实现这样的分布式的通知功能。

    Zookeeper 同意client向server注冊一个Watch监听。当服务端的一些指定的事件触发了这个Watch 。就会向指定的client发送一个事件通知来实现分布式通知。
    整个Watch的注冊和通知过程如图:
    Zookeeper的 Watcher 机制主要包含client现成、client WatchManager、Zookeeperserver三部分,详细流程是:client 向Zookeeper注冊的同一时候。把Watcher对象存储在client的WatchManager中,当Zookeeperserver端触发Watcher事件后,会向client发送通知。client线程从WatchManager中取出相应的Watcher对象运行回掉逻辑。

    client注冊Watcher
    可用在创建一个client实例时,向构造方法中传入一个默认的Watcher对象,也能够在调用 getData() , getChildren() , exist() 三个接口向 Zookeeper server注冊Watcher。不管使用哪种方式注冊。原理都是一致的。 这里已 getData接口说明:

    public byte[] getData(final String path, Watcher watcher , Stat stat) 

    在向 getData 接口注冊 Watcher 后。client首先会对当前client请求 request 进行标记。将其置为“使用 Watcher监听”,同一时候会封装一个 Watcher 的注冊对象 WatcherRegistration 对象。用于临时保存数据节点路径和Watcher的相应关系。详细逻辑代码:

    public Stat getData(final String path, Watcher watcher , Stat stat){
         ..........
         WatcherRegistration wrt = null ;
         if(watcher !=null){
              wrt = new DataWatchRegistration(watcher,path);
         }
         ......
         request.setWatche(watcher!=null) ;//------注意此处兴许会用到
         ReplyHeader r = cnxn.submitRequest(h,request,response,wrt);

    在Zookeeper中,Packet 能够被看做是一个最小的通信协议单元。用于client和server端之间进行网络传输,不论什么一个须要传输的对象都要被包装成 Packet 对象进行传输。由于 ClientCnxn 中的 WatcherRegistration 又会被封装到 Packet 中去,然后放入发送队列中等待client发送。

    Packet queuePacket(RequestHeader h , ReplyHeader r , Record reqest , Record response, AsyncCallback cb ,String clientPath , String serverPath , Object ctx , WatcherRegistration wrt){
         .......
         synchronized(outgoingqueue){
              packet = new Packet(h,r,request,response,wrt) ;
              .........
              outgoingqueue.add(packet);
         }
    ....
    }

    随后,Zookeeper client会向服务端发送这个请求,同一时候等待服务端的返回。完毕请求发送后,会由client SendThread 线程的 readResponse 方法接受来自服务端的响应。finishPacket 方法会从 Packet 中取出相应的 Watcher 并注冊到ZkManager中去,代码逻辑:
         
    private void finishPacket(Packet p){
         if(p.watcherRegistration != null){
              p.watcherRegistration.register(p.replyHeader.getErr());
         }
    .......
    }

    如今把 Watcher 对象从 WatchRegistration 对象中取出来:
         
    protected Map<String,HashSet<Watcher>> getWatchers(int rc){
         
         return watchManager.dataWatches ;

    }

    public void register(int rc){
         if(shouldAddWatch(rc)){
              Map<String,HashSet<Watcher>> watches = getWatchers(rc) ;
      synchronized(watches){
         
         Set<Watcher> watchers = watches.get(clientPath) ;
         if(watchers ==null){
                watchers  =new HashSet<Watcher>();
                watches.put(clientPath,watchers);
       }
                 watchers.add(watcher);
         .........
    }
    在上面的register 方法中会吧临时存放的 watcher 对象转交给 ZKWatcherManager , 并终于保存到dataWatchers 中 ,dataWatchers , ZKWatcherManager 都是 Map<String,Set<Watcher>> 类型的数据结构,用于把数据节点的路径和Watcher对象一一映射后保存起来。

       

    在上面的流程中,WatcherRegistration 在底层的网络序列化究竟层的字节数组中。
















  • 相关阅读:
    beego框架学习(一)安装
    专题 :JSON处理
    Java中getClassLoader().getResource()和getResource()的区别
    加载WebApplicationContext的方式
    Web.xml配置详解之context-param
    “Could not open ServletContext resource [/WEB-INF/applicationContext.xml]”解决方案
    如何解决 Eclipse中出现-访问限制由于对必需的库XX具有一定限制,因此无法访问类型
    JDK各个JAR包的作用
    eclipse汉化
    模板专题(一)函数模板
  • 原文地址:https://www.cnblogs.com/llguanli/p/8487102.html
Copyright © 2011-2022 走看看