zoukankan      html  css  js  c++  java
  • Zookeeper-Watcher(事件通知)

    Watcher是什么

    在ZooKeeper中,接口类Watcher用于表示一个标准的事件处理器,其定义了事件通知相关的逻辑,包含KeeperState和EventType两个枚举类,分别代表了通知状态和事件类型,同时定义了事件的回调方法:process(WatchedEvent event)

     

    同一个事件类型在不同的通知状态中代表的含义有所不同,表7-3列举了常见的通知状态和事件类型。

    KeeperState

    EventType

    触发条件

    说明

     

    None
    (-1)

    客户端与服务端成功建立连接

     

    SyncConnected
    (0)

    NodeCreated
    (1)

    Watcher监听的对应数据节点被创建

     

     

    NodeDeleted
    (2)

    Watcher监听的对应数据节点被删除

    此时客户端和服务器处于连接状态

     

    NodeDataChanged
    (3)

    Watcher监听的对应数据节点的数据内容发生变更

     

     

    NodeChildChanged
    (4)

    Wather监听的对应数据节点的子节点列表发生变更

     

    Disconnected
    (0)

    None
    (-1)

    客户端与ZooKeeper服务器断开连接

    此时客户端和服务器处于断开连接状态

    Expired
    (-112)

    Node
    (-1)

    会话超时

    此时客户端会话失效,通常同时也会受到SessionExpiredException异常

    AuthFailed
    (4)

    None
    (-1)

    通常有两种情况,1:使用错误的schema进行权限检查 2:SASL权限检查失败

    通常同时也会收到AuthFailedException异常

    zookeeper是树的结构存储方式,可以通过节点来保存信息,所以可以监听每个节点的变化,来知道程序如何操作zookeeper的,而Watcher的作用就是告诉监听者,zookeeper节点的变化(创建节点,删除节点,修改节点)

    process方法是Watcher接口中的一个回调方法,当ZooKeeper向客户端发送一个Watcher事件通知时,客户端就会对相应的process方法进行回调,从而实现对事件的处理。process方法的定义如下:

    abstract public void process(WatchedEvent event);

    这个回调方法的定义非常简单,我们重点看下方法的参数定义:WatchedEvent。

    WatchedEvent包含了每一个事件的三个基本属性:通知状态(keeperState),事件类型(EventType)和节点路径(path),其数据结构如图7-5所示。ZooKeeper使用WatchedEvent对象来封装服务端事件并传递给Watcher,从而方便回调方法process对服务端事件进行处理。

    提到WatchedEvent,不得不讲下WatcherEvent实体。笼统地讲,两者表示的是同一个事物,都是对一个服务端事件的封装。不同的是,WatchedEvent是一个逻辑事件,用于服务端和客户端程序执行过程中所需的逻辑对象,而WatcherEvent因为实现了序列化接口,因此可以用于网络传输。

    服务端在生成WatchedEvent事件之后,会调用getWrapper方法将自己包装成一个可序列化的WatcherEvent事件,以便通过网络传输到客户端。客户端在接收到服务端的这个事件对象后,首先会将WatcherEvent还原成一个WatchedEvent事件,并传递给process方法处理,回调方法process根据入参就能够解析出完整的服务端事件了。

    需要注意的一点是,无论是WatchedEvent还是WatcherEvent,其对ZooKeeper服务端事件的封装都是机及其简单的。举个例子来说,当/zk-book这个节点的数据发生变更时,服务端会发送给客户端一个“ZNode数据内容变更”事件,客户端只能够接收到如下信

     

    java操作API对Watched事件监听

    package com.zk.watcher;
    
    import java.util.concurrent.CountDownLatch;
    
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.Watcher.Event.EventType;
    import org.apache.zookeeper.Watcher.Event.KeeperState;
    import org.apache.zookeeper.ZooDefs.Ids;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.Stat;
    
    public class ZookeeperWatcher implements Watcher {
        // 集群连接地址
        private static final String CONNECT_ADDRES = "127.0.0.1:2181";
        // 会话超时时间
        private static final int SESSIONTIME = 2000;
        // 信号量,zk连接异步,用于阻塞,保证所有的操作都在zk创建连接成功以后再执行
        private static final CountDownLatch countDownLatch = new CountDownLatch(1);
        private ZooKeeper zk;
    
        public void createConnection(String connectAddres, int sessionTimeOut) {
            try {
                zk = new ZooKeeper(connectAddres, sessionTimeOut, this);
                System.out.println("LOG" + "zk 开始启动连接服务器....");
                countDownLatch.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        public boolean createPath(String path, String data) {
            try {
                this.exists(path, true);
                // 创建持久节点
                this.zk.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                System.out.println("LOG" + "节点创建成功, Path:" + path + ",data:" + data);
            } catch (Exception e) {
                e.printStackTrace();
                return false;
            }
            return true;
        }
    
        /**
         * 判断指定节点是否存在 不存在该节点返回null
         * 
         * @param path
         *            节点路径
         */
        public Stat exists(String path, boolean needWatch) {
            try {
                return this.zk.exists(path, needWatch);
            } catch (Exception e) {
                e.printStackTrace();
                return null;
            }
        }
    
        public boolean updateNode(String path, String data) throws KeeperException, InterruptedException {
            exists(path, true);
            this.zk.setData(path, data.getBytes(), -1);
            return false;
        }
    
        public void process(WatchedEvent watchedEvent) {
    
            // 获取事件状态
            KeeperState keeperState = watchedEvent.getState();
            // 获取事件类型
            EventType eventType = watchedEvent.getType();
            // zk 路径
            String path = watchedEvent.getPath();
            // 判断是否建立连接
            if (KeeperState.SyncConnected == keeperState) {
                // 如果当前状态已经连接上了 SyncConnected:连接,AuthFailed:认证失败,Expired:失效过期,
                // ConnectedReadOnly:连接只读,Disconnected:连接失败
                if (EventType.None == eventType) {
                    // 如果建立建立成功,让后程序往下走
                    System.out.println("LOG" + "zk 建立连接成功!");
                    countDownLatch.countDown();
                } else if (EventType.NodeCreated == eventType) {
                    System.out.println("LOG" + "事件通知,新增node节点" + path);
                } else if (EventType.NodeDataChanged == eventType) {
                    System.out.println("LOG" + "事件通知,当前node节点" + path + "被修改....");
                } else if (EventType.NodeDeleted == eventType) {
                    System.out.println("LOG" + "事件通知,当前node节点" + path + "被删除....");
                }
    
            }
        }
    
        public static void main(String[] args) throws KeeperException, InterruptedException {
            ZookeeperWatcher zkClientWatcher = new ZookeeperWatcher();
            zkClientWatcher.createConnection(CONNECT_ADDRES, SESSIONTIME);
            zkClientWatcher.createPath("/createNode", "pa-644064");
            // 修改createNode节点
            zkClientWatcher.updateNode("/createNode", "7894561");
        }
    
    }
    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/E:/developer/mvnResponsity/ch/qos/logback/logback-classic/1.1.11/logback-classic-1.1.11.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/E:/developer/mvnResponsity/org/slf4j/slf4j-log4j12/1.7.26/slf4j-log4j12-1.7.26.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]
    14:11:26.432 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:zookeeper.version=3.4.14-4c25d480e66aadd371de8bd2fd8da255ac140bcf, built on 03/06/2019 16:18 GMT
    14:11:26.434 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:host.name=NNH1J2ASSQP10BA
    14:11:26.434 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.version=1.8.0_172
    14:11:26.434 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.vendor=Oracle Corporation
    14:11:26.434 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.home=E:Javajdk1.8.0_172jre
    14:11:26.434 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.class.path=E:Javajdk1.8.0_172jrelib
    esources.jar;E:Javajdk1.8.0_172jrelib
    t.jar;E:Javajdk1.8.0_172jrelibjsse.jar;E:Javajdk1.8.0_172jrelibjce.jar;E:Javajdk1.8.0_172jrelibcharsets.jar;E:Javajdk1.8.0_172jrelibjfr.jar;E:Javajdk1.8.0_172jrelibextaccess-bridge-64.jar;E:Javajdk1.8.0_172jrelibextcldrdata.jar;E:Javajdk1.8.0_172jrelibextdnsns.jar;E:Javajdk1.8.0_172jrelibextjaccess.jar;E:Javajdk1.8.0_172jrelibextjfxrt.jar;E:Javajdk1.8.0_172jrelibextlocaledata.jar;E:Javajdk1.8.0_172jrelibext
    ashorn.jar;E:Javajdk1.8.0_172jrelibextsunec.jar;E:Javajdk1.8.0_172jrelibextsunjce_provider.jar;E:Javajdk1.8.0_172jrelibextsunmscapi.jar;E:Javajdk1.8.0_172jrelibextsunpkcs11.jar;E:Javajdk1.8.0_172jrelibextzipfs.jar;E:codeservletjava-zk	argetclasses;E:codeservletjava-zklibzookeeper-3.4.14.jar;E:developermvnResponsityorgspringframeworkootspring-boot-starter1.5.21.RELEASEspring-boot-starter-1.5.21.RELEASE.jar;E:developermvnResponsityorgspringframeworkootspring-boot1.5.21.RELEASEspring-boot-1.5.21.RELEASE.jar;E:developermvnResponsityorgspringframeworkspring-context4.3.24.RELEASEspring-context-4.3.24.RELEASE.jar;E:developermvnResponsityorgspringframeworkspring-aop4.3.24.RELEASEspring-aop-4.3.24.RELEASE.jar;E:developermvnResponsityorgspringframeworkspring-beans4.3.24.RELEASEspring-beans-4.3.24.RELEASE.jar;E:developermvnResponsityorgspringframeworkspring-expression4.3.24.RELEASEspring-expression-4.3.24.RELEASE.jar;E:developermvnResponsityorgspringframeworkootspring-boot-autoconfigure1.5.21.RELEASEspring-boot-autoconfigure-1.5.21.RELEASE.jar;E:developermvnResponsityorgspringframeworkootspring-boot-starter-logging1.5.21.RELEASEspring-boot-starter-logging-1.5.21.RELEASE.jar;E:developermvnResponsitychqoslogbacklogback-classic1.1.11logback-classic-1.1.11.jar;E:developermvnResponsitychqoslogbacklogback-core1.1.11logback-core-1.1.11.jar;E:developermvnResponsityorgslf4jjcl-over-slf4j1.7.26jcl-over-slf4j-1.7.26.jar;E:developermvnResponsityorgslf4jjul-to-slf4j1.7.26jul-to-slf4j-1.7.26.jar;E:developermvnResponsityorgslf4jlog4j-over-slf4j1.7.26log4j-over-slf4j-1.7.26.jar;E:developermvnResponsityorgspringframeworkspring-core4.3.24.RELEASEspring-core-4.3.24.RELEASE.jar;E:developermvnResponsityorgyamlsnakeyaml1.17snakeyaml-1.17.jar;E:developermvnResponsityorgapachezookeeperzookeeper3.4.14zookeeper-3.4.14.jar;E:developermvnResponsityorgslf4jslf4j-api1.7.26slf4j-api-1.7.26.jar;E:developermvnResponsityorgslf4jslf4j-log4j121.7.26slf4j-log4j12-1.7.26.jar;E:developermvnResponsitycomgithubspotbugsspotbugs-annotations3.1.9spotbugs-annotations-3.1.9.jar;E:developermvnResponsitycomgooglecodefindbugsjsr3053.0.2jsr305-3.0.2.jar;E:developermvnResponsitylog4jlog4j1.2.17log4j-1.2.17.jar;E:developermvnResponsityjlinejline.9.94jline-0.9.94.jar;E:developermvnResponsityorgapacheyetusaudience-annotations.5.0audience-annotations-0.5.0.jar;E:developermvnResponsityio
    etty
    etty3.10.6.Final
    etty-3.10.6.Final.jar
    14:11:26.434 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.library.path=E:Javajdk1.8.0_172in;C:WindowsSunJavain;C:Windowssystem32;C:Windows;E:/Java/jdk1.8.0_172/jre/bin/server;E:/Java/jdk1.8.0_172/jre/bin;E:/Java/jdk1.8.0_172/jre/lib/amd64;E:xftp;C:Windowssystem32;C:Windows;C:WindowsSystem32Wbem;C:WindowsSystem32WindowsPowerShellv1.0;E:Javajdk1.8.0_172lib;E:Javajdk1.8.0_172jrein;E:mavenapache-maven-3.5.3in;E:softwaremysqlmysql-8.0.15-winx64;C:UsersAdministratorAppDataLocalMicrosoftWindowsApps;;E:stsspring-tool-suite-3.9.4.RELEASE-e4.7.3a-win32-x86_64sts-bundlests-3.9.4.RELEASE;;.
    14:11:26.434 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.io.tmpdir=C:UsersADMINI~1AppDataLocalTemp
    14:11:26.434 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.compiler=<NA>
    14:11:26.434 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:os.name=Windows 10
    14:11:26.434 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:os.arch=amd64
    14:11:26.434 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:os.version=10.0
    14:11:26.434 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:user.name=Administrator
    14:11:26.434 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:user.home=C:UsersAdministrator
    14:11:26.434 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:user.dir=E:codeservletjava-zk
    14:11:26.435 [main] INFO org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=127.0.0.1:2181 sessionTimeout=2000 watcher=com.zk.watcher.ZookeeperWatcher@2d38eb89
    14:11:26.437 [main] DEBUG org.apache.zookeeper.ClientCnxn - zookeeper.disableAutoWatchReset is false
    LOGzk 开始启动连接服务器....
    14:11:26.550 [main-SendThread(127.0.0.1:2181)] DEBUG org.apache.zookeeper.SaslServerPrincipal - Canonicalized address to 127.0.0.1
    14:11:26.552 [main-SendThread(127.0.0.1:2181)] INFO org.apache.zookeeper.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
    14:11:26.553 [main-SendThread(127.0.0.1:2181)] INFO org.apache.zookeeper.ClientCnxn - Socket connection established to 127.0.0.1/127.0.0.1:2181, initiating session
    14:11:26.555 [main-SendThread(127.0.0.1:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Session establishment request sent on 127.0.0.1/127.0.0.1:2181
    14:11:26.559 [main-SendThread(127.0.0.1:2181)] WARN org.apache.zookeeper.ClientCnxnSocket - Connected to an old server; r-o mode will be unavailable
    14:11:26.559 [main-SendThread(127.0.0.1:2181)] INFO org.apache.zookeeper.ClientCnxn - Session establishment complete on server 127.0.0.1/127.0.0.1:2181, sessionid = 0x16b263e2f4d0003, negotiated timeout = 4000
    LOGzk 建立连接成功!
    14:11:26.564 [main-SendThread(127.0.0.1:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x16b263e2f4d0003, packet:: clientPath:null serverPath:null finished:false header:: 1,3  replyHeader:: 1,188,-101  request:: '/createNode,T  response::  
    14:11:26.571 [main-SendThread(127.0.0.1:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got notification sessionid:0x16b263e2f4d0003
    14:11:26.573 [main-SendThread(127.0.0.1:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got WatchedEvent state:SyncConnected type:NodeCreated path:/createNode for sessionid 0x16b263e2f4d0003
    LOG事件通知,新增node节点/createNode
    14:11:26.574 [main-SendThread(127.0.0.1:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x16b263e2f4d0003, packet:: clientPath:null serverPath:null finished:false header:: 2,1  replyHeader:: 2,189,0  request:: '/createNode,#70612d363434303634,v{s{31,s{'world,'anyone}}},0  response:: '/createNode 
    LOG节点创建成功, Path:/createNode,data:pa-644064
    14:11:26.577 [main-SendThread(127.0.0.1:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x16b263e2f4d0003, packet:: clientPath:null serverPath:null finished:false header:: 3,3  replyHeader:: 3,189,0  request:: '/createNode,T  response:: s{189,189,1559715086567,1559715086567,0,0,0,0,9,0,189} 
    14:11:26.581 [main-SendThread(127.0.0.1:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got notification sessionid:0x16b263e2f4d0003
    14:11:26.581 [main-SendThread(127.0.0.1:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got WatchedEvent state:SyncConnected type:NodeDataChanged path:/createNode for sessionid 0x16b263e2f4d0003
    LOG事件通知,当前node节点/createNode被修改....
    14:11:26.582 [main-SendThread(127.0.0.1:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x16b263e2f4d0003, packet:: clientPath:null serverPath:null finished:false header:: 4,5  replyHeader:: 4,190,0  request:: '/createNode,#37383934353631,-1  response:: s{189,190,1559715086567,1559715086579,1,0,0,0,7,0,189} 

  • 相关阅读:
    apache http server 和tomcat的区别 以及nginx
    2020-2-12 这样提升自己的口才
    两种常用的队列
    栈的实现与应用
    线性表
    Nginx实现虚拟主机
    将apache添加到服务
    apache安装
    最小生成树
    图的深度优先搜索
  • 原文地址:https://www.cnblogs.com/920913cheng/p/10969217.html
Copyright © 2011-2022 走看看