zoukankan      html  css  js  c++  java
  • 【Zookeeper】源码分析之Watcher机制(一)

    一、前言

      前面已经分析了Zookeeper持久话相关的类,下面接着分析Zookeeper中的Watcher机制所涉及到的类。

    二、总体框图

      对于Watcher机制而言,主要涉及的类主要如下。

      

      说明:

      Watcher,接口类型,其定义了process方法,需子类实现。

      Event,接口类型,Watcher的内部类,无任何方法。

      KeeperState,枚举类型,Event的内部类,表示Zookeeper所处的状态。

      EventType,枚举类型,Event的内部类,表示Zookeeper中发生的事件类型。

      WatchedEvent,表示对ZooKeeper上发生变化后的反馈,包含了KeeperState和EventType。

      ClientWatchManager,接口类型,表示客户端的Watcher管理者,其定义了materialized方法,需子类实现。

      ZKWatchManager,Zookeeper的内部类,继承ClientWatchManager。

      MyWatcher,ZooKeeperMain的内部类,继承Watcher。

      ServerCnxn,接口类型,继承Watcher,表示客户端与服务端的一个连接。

      WatchManager,管理Watcher。

    三、Watcher源码分析

      3.1 内部类

      Event,接口类型,表示事件代表的状态,除去其内部类,其源码结构如下  

    public interface Event {}

      说明:可以看到,Event接口并没有定义任何属性和方法,但是其包含了KeeperState和EventType两个内部枚举类。

      3.2 接口方法  

    abstract public void process(WatchedEvent event);

      说明:其代表了实现Watcher接口时必须实现的的方法,即定义进行处理,WatchedEvent表示观察的事件。

    四、Event源码分析

      3.1 内部类

      1. KeeperState  

            public enum KeeperState { // 事件发生时Zookeeper的状态
                /** Unused, this state is never generated by the server */
                @Deprecated
                // 未知状态,不再使用,服务器不会产生此状态
                Unknown (-1), 
    
                /** The client is in the disconnected state - it is not connected
                 * to any server in the ensemble. */
                // 断开
                Disconnected (0),
    
                /** Unused, this state is never generated by the server */
                @Deprecated
                // 未同步连接,不再使用,服务器不会产生此状态
                NoSyncConnected (1),
    
                /** The client is in the connected state - it is connected
                 * to a server in the ensemble (one of the servers specified
                 * in the host connection parameter during ZooKeeper client
                 * creation). */
                // 同步连接状态
                SyncConnected (3),
    
                /**
                 * Auth failed state
                 */
                // 认证失败状态
                AuthFailed (4),
    
                /**
                 * The client is connected to a read-only server, that is the
                 * server which is not currently connected to the majority.
                 * The only operations allowed after receiving this state is
                 * read operations.
                 * This state is generated for read-only clients only since
                 * read/write clients aren't allowed to connect to r/o servers.
                 */
                // 只读连接状态
                ConnectedReadOnly (5),
    
                /**
                  * SaslAuthenticated: used to notify clients that they are SASL-authenticated,
                  * so that they can perform Zookeeper actions with their SASL-authorized permissions.
                  */
                // SASL认证通过状态
                SaslAuthenticated(6),
    
                /** The serving cluster has expired this session. The ZooKeeper
                 * client connection (the session) is no longer valid. You must
                 * create a new client connection (instantiate a new ZooKeeper
                 * instance) if you with to access the ensemble. */
                // 过期状态
                Expired (-112);
    
                // 代表状态的整形值
                private final int intValue;     // Integer representation of value
                                                // for sending over wire
    
                                                
                // 构造函数
                KeeperState(int intValue) {
                    this.intValue = intValue;
                }
    
                // 返回整形值
                public int getIntValue() {
                    return intValue;
                }
    
                // 从整形值构造相应的状态
                public static KeeperState fromInt(int intValue) {
                    switch(intValue) {
                        case   -1: return KeeperState.Unknown;
                        case    0: return KeeperState.Disconnected;
                        case    1: return KeeperState.NoSyncConnected;
                        case    3: return KeeperState.SyncConnected;
                        case    4: return KeeperState.AuthFailed;
                        case    5: return KeeperState.ConnectedReadOnly;
                        case    6: return KeeperState.SaslAuthenticated;
                        case -112: return KeeperState.Expired;
    
                        default:
                            throw new RuntimeException("Invalid integer value for conversion to KeeperState");
                    }
                }
            }

      说明:KeeperState是一个枚举类,其定义了在事件发生时Zookeeper所处的各种状态,其还定义了一个从整形值返回对应状态的方法fromInt。

      2. EventType 

            public enum EventType { // 事件类型
                //
                None (-1),
                // 结点创建
                NodeCreated (1),
                // 结点删除
                NodeDeleted (2),
                // 结点数据变化
                NodeDataChanged (3),
                // 结点子节点变化
                NodeChildrenChanged (4);
    
                // 代表事件类型的整形 
                private final int intValue;     // Integer representation of value
                                                // for sending over wire
    
                // 构造函数
                EventType(int intValue) {
                    this.intValue = intValue;
                }
    
                // 返回整形
                public int getIntValue() {
                    return intValue;
                }
    
                // 从整形构造相应的事件
                public static EventType fromInt(int intValue) {
                    switch(intValue) {
                        case -1: return EventType.None;
                        case  1: return EventType.NodeCreated;
                        case  2: return EventType.NodeDeleted;
                        case  3: return EventType.NodeDataChanged;
                        case  4: return EventType.NodeChildrenChanged;
    
                        default:
                            throw new RuntimeException("Invalid integer value for conversion to EventType");
                    }
                }           
            }
        }

      说明:EventType是一个枚举类,其定义了事件的类型(如创建节点、删除节点等事件),同时,其还定义了一个从整形值返回对应事件类型的方法fromInt。

    五、WatchedEvent

      5.1 类的属性  

    public class WatchedEvent {
        // Zookeeper的状态
        final private KeeperState keeperState;
        // 事件类型
        final private EventType eventType;
        // 事件所涉及节点的路径
        private String path;
    }

      说明:WatchedEvent类包含了三个属性,分别代表事件发生时Zookeeper的状态、事件类型和发生事件所涉及的节点路径。

      5.2 构造函数

      1. public WatchedEvent(EventType eventType, KeeperState keeperState, String path)型构造函数 

        public WatchedEvent(EventType eventType, KeeperState keeperState, String path) {
            // 初始化属性
            this.keeperState = keeperState;
            this.eventType = eventType;
            this.path = path;
        }

      说明:构造函数传入了三个参数,然后分别对属性进行赋值操作。

      2. public WatchedEvent(WatcherEvent eventMessage)型构造函数  

        public WatchedEvent(WatcherEvent eventMessage) {
            // 从eventMessage中取出相应属性进行赋值
            keeperState = KeeperState.fromInt(eventMessage.getState());
            eventType = EventType.fromInt(eventMessage.getType());
            path = eventMessage.getPath();
        }

      说明:构造函数传入了WatcherEvent参数,之后直接从该参数中取出相应属性进行赋值操作。

      对于WatchedEvent类的方法而言,相对简单,包含了几个getXXX方法,用于获取相应的属性值。

    六、ClientWatchManager

      6.1 接口方法 

    public Set<Watcher> materialize(Watcher.Event.KeeperState state,
            Watcher.Event.EventType type, String path);

      说明:该方法表示事件发生时,返回需要被通知的Watcher集合,可能为空集合。

    七、ZKWatchManager

      7.1 类的属性 

        private static class ZKWatchManager implements ClientWatchManager {
            // 数据变化的Watchers
            private final Map<String, Set<Watcher>> dataWatches =
                new HashMap<String, Set<Watcher>>();
            // 节点存在与否的Watchers
            private final Map<String, Set<Watcher>> existWatches =
                new HashMap<String, Set<Watcher>>();
            // 子节点变化的Watchers
            private final Map<String, Set<Watcher>> childWatches =
                new HashMap<String, Set<Watcher>>();
        }

      说明:ZKWatchManager实现了ClientWatchManager,并定义了三个Map键值对,键为节点路径,值为Watcher。分别对应数据变化的Watcher、节点是否存在的Watcher、子节点变化的Watcher。

      7.2 核心方法分析

      1. materialize方法

            public Set<Watcher> materialize(Watcher.Event.KeeperState state,
                                            Watcher.Event.EventType type,
                                            String clientPath)
            {
                // 新生成结果Watcher集合
                Set<Watcher> result = new HashSet<Watcher>();
    
                switch (type) { // 确定事件类型
                case None: // 无类型
                    // 添加默认Watcher
                    result.add(defaultWatcher);
                    // 是否需要清空(提取对zookeeper.disableAutoWatchReset字段进行配置的值、Zookeeper的状态是否为同步连接)
                    boolean clear = ClientCnxn.getDisableAutoResetWatch() &&
                            state != Watcher.Event.KeeperState.SyncConnected;
    
                    synchronized(dataWatches) { // 同步块
                        for(Set<Watcher> ws: dataWatches.values()) {
                            // 添加至结果集合
                            result.addAll(ws);
                        }
                        if (clear) { // 是否需要清空
                            dataWatches.clear();
                        }
                    }
    
                    synchronized(existWatches) { // 同步块 
                        for(Set<Watcher> ws: existWatches.values()) {
                            // 添加至结果集合
                            result.addAll(ws);
                        }
                        if (clear) { // 是否需要清空
                            existWatches.clear();
                        }
                    }
    
                    synchronized(childWatches) { // 同步块
                        for(Set<Watcher> ws: childWatches.values()) {
                            // 添加至结果集合
                            result.addAll(ws);
                        }
                        if (clear) { // 是否需要清空
                            childWatches.clear();
                        }
                    }
                    // 返回结果
                    return result;
                case NodeDataChanged: // 节点数据变化
                case NodeCreated: // 创建节点
                    synchronized (dataWatches) { // 同步块
                        // 移除clientPath对应的Watcher后全部添加至结果集合
                        addTo(dataWatches.remove(clientPath), result);
                    }
                    synchronized (existWatches) { 
                        // 移除clientPath对应的Watcher后全部添加至结果集合
                        addTo(existWatches.remove(clientPath), result);
                    }
                    break;
                case NodeChildrenChanged: // 节点子节点变化
                    synchronized (childWatches) {
                        // 移除clientPath对应的Watcher后全部添加至结果集合
                        addTo(childWatches.remove(clientPath), result);
                    }
                    break;
                case NodeDeleted: // 删除节点
                    synchronized (dataWatches) { 
                        // 移除clientPath对应的Watcher后全部添加至结果集合
                        addTo(dataWatches.remove(clientPath), result);
                    }
                    // XXX This shouldn't be needed, but just in case
                    synchronized (existWatches) {
                        // 移除clientPath对应的Watcher
                        Set<Watcher> list = existWatches.remove(clientPath);
                        if (list != null) {
                            // 移除clientPath对应的Watcher后全部添加至结果集合
                            addTo(existWatches.remove(clientPath), result);
                            LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!");
                        }
                    }
                    synchronized (childWatches) {
                        // 移除clientPath对应的Watcher后全部添加至结果集合
                        addTo(childWatches.remove(clientPath), result);
                    }
                    break;
                default: // 缺省处理
                    String msg = "Unhandled watch event type " + type
                        + " with state " + state + " on path " + clientPath;
                    LOG.error(msg);
                    throw new RuntimeException(msg);
                }
    
                // 返回结果集合
                return result;
            }
        }

      说明:该方法在事件发生后,返回需要被通知的Watcher集合。在该方法中,首先会根据EventType类型确定相应的事件类型,然后根据事件类型的不同做出相应的操作,如针对None类型,即无任何事件,则首先会从三个键值对中删除clientPath对应的Watcher,然后将剩余的Watcher集合添加至结果集合;针对NodeDataChanged和NodeCreated事件而言,其会从dataWatches和existWatches中删除clientPath对应的Watcher,然后将剩余的Watcher集合添加至结果集合。

    八、总结

      针对Watcher机制的第一部分的源码分析就已经完成,可以看到此部分的源码相对简单,之后会分析org.apache.zookeeper.server下的WatchManager和ClientWatchManager所在外部类ZooKeeper,也谢谢各位园友的观看~

  • 相关阅读:
    02.02.03第3章 餐饮项目案例(Power BI商业智能分析)
    02.02.02 第2章 制作power bi图表(Power BI商业智能分析)
    MySQL 目录结构信息
    Commons-FileUpload 文件上传(模板)
    Commons-FileUpload 常用API
    Java DOM方式解析XML(模板)
    常用的节点类型
    MySQL权限及登陆、退出方法
    Java 锁
    线程的状态
  • 原文地址:https://www.cnblogs.com/leesf456/p/6286827.html
Copyright © 2011-2022 走看看