zoukankan      html  css  js  c++  java
  • 【Zookeeper】源码分析之Watcher机制(一)

    一、前言

      前面已经分析了Zookeeper持久话相关的类,下面接着分析Zookeeper中的Watcher机制所涉及到的类。

    二、总体框图

      对于Watcher机制而言,主要涉及的类主要如下。

      

      说明:

      Watcher,接口类型,其定义了process方法,需子类实现。

      Event,接口类型,Watcher的内部类,无任何方法。

      KeeperState,枚举类型,Event的内部类,表示Zookeeper所处的状态。

      EventType,枚举类型,Event的内部类,表示Zookeeper中发生的事件类型。

      WatchedEvent,表示对ZooKeeper上发生变化后的反馈,包含了KeeperState和EventType。

      ClientWatchManager,接口类型,表示客户端的Watcher管理者,其定义了materialized方法,需子类实现。

      ZKWatchManager,Zookeeper的内部类,继承ClientWatchManager。

      MyWatcher,ZooKeeperMain的内部类,继承Watcher。

      ServerCnxn,接口类型,继承Watcher,表示客户端与服务端的一个连接。

      WatchManager,管理Watcher。

    三、Watcher源码分析

      3.1 内部类

      Event,接口类型,表示事件代表的状态,除去其内部类,其源码结构如下  

    public interface Event {}

      说明:可以看到,Event接口并没有定义任何属性和方法,但是其包含了KeeperState和EventType两个内部枚举类。

      3.2 接口方法  

    abstract public void process(WatchedEvent event);

      说明:其代表了实现Watcher接口时必须实现的的方法,即定义进行处理,WatchedEvent表示观察的事件。

    四、Event源码分析

      3.1 内部类

      1. KeeperState  

            public enum KeeperState { // 事件发生时Zookeeper的状态
                /** Unused, this state is never generated by the server */
                @Deprecated
                // 未知状态,不再使用,服务器不会产生此状态
                Unknown (-1), 
    
                /** The client is in the disconnected state - it is not connected
                 * to any server in the ensemble. */
                // 断开
                Disconnected (0),
    
                /** Unused, this state is never generated by the server */
                @Deprecated
                // 未同步连接,不再使用,服务器不会产生此状态
                NoSyncConnected (1),
    
                /** The client is in the connected state - it is connected
                 * to a server in the ensemble (one of the servers specified
                 * in the host connection parameter during ZooKeeper client
                 * creation). */
                // 同步连接状态
                SyncConnected (3),
    
                /**
                 * Auth failed state
                 */
                // 认证失败状态
                AuthFailed (4),
    
                /**
                 * The client is connected to a read-only server, that is the
                 * server which is not currently connected to the majority.
                 * The only operations allowed after receiving this state is
                 * read operations.
                 * This state is generated for read-only clients only since
                 * read/write clients aren't allowed to connect to r/o servers.
                 */
                // 只读连接状态
                ConnectedReadOnly (5),
    
                /**
                  * SaslAuthenticated: used to notify clients that they are SASL-authenticated,
                  * so that they can perform Zookeeper actions with their SASL-authorized permissions.
                  */
                // SASL认证通过状态
                SaslAuthenticated(6),
    
                /** The serving cluster has expired this session. The ZooKeeper
                 * client connection (the session) is no longer valid. You must
                 * create a new client connection (instantiate a new ZooKeeper
                 * instance) if you with to access the ensemble. */
                // 过期状态
                Expired (-112);
    
                // 代表状态的整形值
                private final int intValue;     // Integer representation of value
                                                // for sending over wire
    
                                                
                // 构造函数
                KeeperState(int intValue) {
                    this.intValue = intValue;
                }
    
                // 返回整形值
                public int getIntValue() {
                    return intValue;
                }
    
                // 从整形值构造相应的状态
                public static KeeperState fromInt(int intValue) {
                    switch(intValue) {
                        case   -1: return KeeperState.Unknown;
                        case    0: return KeeperState.Disconnected;
                        case    1: return KeeperState.NoSyncConnected;
                        case    3: return KeeperState.SyncConnected;
                        case    4: return KeeperState.AuthFailed;
                        case    5: return KeeperState.ConnectedReadOnly;
                        case    6: return KeeperState.SaslAuthenticated;
                        case -112: return KeeperState.Expired;
    
                        default:
                            throw new RuntimeException("Invalid integer value for conversion to KeeperState");
                    }
                }
            }

      说明:KeeperState是一个枚举类,其定义了在事件发生时Zookeeper所处的各种状态,其还定义了一个从整形值返回对应状态的方法fromInt。

      2. EventType 

            public enum EventType { // 事件类型
                //
                None (-1),
                // 结点创建
                NodeCreated (1),
                // 结点删除
                NodeDeleted (2),
                // 结点数据变化
                NodeDataChanged (3),
                // 结点子节点变化
                NodeChildrenChanged (4);
    
                // 代表事件类型的整形 
                private final int intValue;     // Integer representation of value
                                                // for sending over wire
    
                // 构造函数
                EventType(int intValue) {
                    this.intValue = intValue;
                }
    
                // 返回整形
                public int getIntValue() {
                    return intValue;
                }
    
                // 从整形构造相应的事件
                public static EventType fromInt(int intValue) {
                    switch(intValue) {
                        case -1: return EventType.None;
                        case  1: return EventType.NodeCreated;
                        case  2: return EventType.NodeDeleted;
                        case  3: return EventType.NodeDataChanged;
                        case  4: return EventType.NodeChildrenChanged;
    
                        default:
                            throw new RuntimeException("Invalid integer value for conversion to EventType");
                    }
                }           
            }
        }

      说明:EventType是一个枚举类,其定义了事件的类型(如创建节点、删除节点等事件),同时,其还定义了一个从整形值返回对应事件类型的方法fromInt。

    五、WatchedEvent

      5.1 类的属性  

    public class WatchedEvent {
        // Zookeeper的状态
        final private KeeperState keeperState;
        // 事件类型
        final private EventType eventType;
        // 事件所涉及节点的路径
        private String path;
    }

      说明:WatchedEvent类包含了三个属性,分别代表事件发生时Zookeeper的状态、事件类型和发生事件所涉及的节点路径。

      5.2 构造函数

      1. public WatchedEvent(EventType eventType, KeeperState keeperState, String path)型构造函数 

        public WatchedEvent(EventType eventType, KeeperState keeperState, String path) {
            // 初始化属性
            this.keeperState = keeperState;
            this.eventType = eventType;
            this.path = path;
        }

      说明:构造函数传入了三个参数,然后分别对属性进行赋值操作。

      2. public WatchedEvent(WatcherEvent eventMessage)型构造函数  

        public WatchedEvent(WatcherEvent eventMessage) {
            // 从eventMessage中取出相应属性进行赋值
            keeperState = KeeperState.fromInt(eventMessage.getState());
            eventType = EventType.fromInt(eventMessage.getType());
            path = eventMessage.getPath();
        }

      说明:构造函数传入了WatcherEvent参数,之后直接从该参数中取出相应属性进行赋值操作。

      对于WatchedEvent类的方法而言,相对简单,包含了几个getXXX方法,用于获取相应的属性值。

    六、ClientWatchManager

      6.1 接口方法 

    public Set<Watcher> materialize(Watcher.Event.KeeperState state,
            Watcher.Event.EventType type, String path);

      说明:该方法表示事件发生时,返回需要被通知的Watcher集合,可能为空集合。

    七、ZKWatchManager

      7.1 类的属性 

        private static class ZKWatchManager implements ClientWatchManager {
            // 数据变化的Watchers
            private final Map<String, Set<Watcher>> dataWatches =
                new HashMap<String, Set<Watcher>>();
            // 节点存在与否的Watchers
            private final Map<String, Set<Watcher>> existWatches =
                new HashMap<String, Set<Watcher>>();
            // 子节点变化的Watchers
            private final Map<String, Set<Watcher>> childWatches =
                new HashMap<String, Set<Watcher>>();
        }

      说明:ZKWatchManager实现了ClientWatchManager,并定义了三个Map键值对,键为节点路径,值为Watcher。分别对应数据变化的Watcher、节点是否存在的Watcher、子节点变化的Watcher。

      7.2 核心方法分析

      1. materialize方法

            public Set<Watcher> materialize(Watcher.Event.KeeperState state,
                                            Watcher.Event.EventType type,
                                            String clientPath)
            {
                // 新生成结果Watcher集合
                Set<Watcher> result = new HashSet<Watcher>();
    
                switch (type) { // 确定事件类型
                case None: // 无类型
                    // 添加默认Watcher
                    result.add(defaultWatcher);
                    // 是否需要清空(提取对zookeeper.disableAutoWatchReset字段进行配置的值、Zookeeper的状态是否为同步连接)
                    boolean clear = ClientCnxn.getDisableAutoResetWatch() &&
                            state != Watcher.Event.KeeperState.SyncConnected;
    
                    synchronized(dataWatches) { // 同步块
                        for(Set<Watcher> ws: dataWatches.values()) {
                            // 添加至结果集合
                            result.addAll(ws);
                        }
                        if (clear) { // 是否需要清空
                            dataWatches.clear();
                        }
                    }
    
                    synchronized(existWatches) { // 同步块 
                        for(Set<Watcher> ws: existWatches.values()) {
                            // 添加至结果集合
                            result.addAll(ws);
                        }
                        if (clear) { // 是否需要清空
                            existWatches.clear();
                        }
                    }
    
                    synchronized(childWatches) { // 同步块
                        for(Set<Watcher> ws: childWatches.values()) {
                            // 添加至结果集合
                            result.addAll(ws);
                        }
                        if (clear) { // 是否需要清空
                            childWatches.clear();
                        }
                    }
                    // 返回结果
                    return result;
                case NodeDataChanged: // 节点数据变化
                case NodeCreated: // 创建节点
                    synchronized (dataWatches) { // 同步块
                        // 移除clientPath对应的Watcher后全部添加至结果集合
                        addTo(dataWatches.remove(clientPath), result);
                    }
                    synchronized (existWatches) { 
                        // 移除clientPath对应的Watcher后全部添加至结果集合
                        addTo(existWatches.remove(clientPath), result);
                    }
                    break;
                case NodeChildrenChanged: // 节点子节点变化
                    synchronized (childWatches) {
                        // 移除clientPath对应的Watcher后全部添加至结果集合
                        addTo(childWatches.remove(clientPath), result);
                    }
                    break;
                case NodeDeleted: // 删除节点
                    synchronized (dataWatches) { 
                        // 移除clientPath对应的Watcher后全部添加至结果集合
                        addTo(dataWatches.remove(clientPath), result);
                    }
                    // XXX This shouldn't be needed, but just in case
                    synchronized (existWatches) {
                        // 移除clientPath对应的Watcher
                        Set<Watcher> list = existWatches.remove(clientPath);
                        if (list != null) {
                            // 移除clientPath对应的Watcher后全部添加至结果集合
                            addTo(existWatches.remove(clientPath), result);
                            LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!");
                        }
                    }
                    synchronized (childWatches) {
                        // 移除clientPath对应的Watcher后全部添加至结果集合
                        addTo(childWatches.remove(clientPath), result);
                    }
                    break;
                default: // 缺省处理
                    String msg = "Unhandled watch event type " + type
                        + " with state " + state + " on path " + clientPath;
                    LOG.error(msg);
                    throw new RuntimeException(msg);
                }
    
                // 返回结果集合
                return result;
            }
        }

      说明:该方法在事件发生后,返回需要被通知的Watcher集合。在该方法中,首先会根据EventType类型确定相应的事件类型,然后根据事件类型的不同做出相应的操作,如针对None类型,即无任何事件,则首先会从三个键值对中删除clientPath对应的Watcher,然后将剩余的Watcher集合添加至结果集合;针对NodeDataChanged和NodeCreated事件而言,其会从dataWatches和existWatches中删除clientPath对应的Watcher,然后将剩余的Watcher集合添加至结果集合。

    八、总结

      针对Watcher机制的第一部分的源码分析就已经完成,可以看到此部分的源码相对简单,之后会分析org.apache.zookeeper.server下的WatchManager和ClientWatchManager所在外部类ZooKeeper,也谢谢各位园友的观看~

  • 相关阅读:
    The Mac Application Environment 不及格的程序员
    Xcode Plugin: Change Code In Running App Without Restart 不及格的程序员
    The property delegate of CALayer cause Crash. 不及格的程序员
    nil localizedTitle in SKProduct 不及格的程序员
    InApp Purchase 不及格的程序员
    Safari Web Content Guide 不及格的程序员
    在Mac OS X Lion 安装 XCode 3.2 不及格的程序员
    illustrate ARC with graphs 不及格的程序员
    Viewing iPhoneOptimized PNGs 不及格的程序员
    What is the dSYM? 不及格的程序员
  • 原文地址:https://www.cnblogs.com/leesf456/p/6286827.html
Copyright © 2011-2022 走看看