zoukankan      html  css  js  c++  java
  • 深入理解 ZooKeeper客户端与服务端的watcher回调

    2020-02-08 补充本篇博文所描述的watcher回调的流程图
    回调的流程图

    watcher存在的必要性

    举个特容易懂的例子: 假如我的项目是基于dubbo+zookeeper搭建的分布式项目, 我有三个功能相同的服务提供者,用zookeeper当成注册中心,我的三个项目得注册进zookeeper才能对外暴露服务,但是问题来了,写的java代码怎么才能注册进zookeeper呢?当然加入依赖,写好配置文件再启动就成了,这时,这三个服务体提供者就是zookeeper的客户端了,zookeeper的客户端不止一个,我选择了哪个依赖,就是哪个客户端,光有服务提供者不成啊,对外提供服务,我得需要服务消费者啊,于是用同样的方式,把消费者也注册进zookeeper,zookeeper中就存在了4个node,也就是4个客户端,服务消费者订阅zookeeper,向它拉取服务提供者的address,然后把地址缓存在本地, 进而可以远程调用服务消费者,那么问题又来了,万一哪一台服务提供者挂了,怎么办呢?zookeeper是不是得通知消费者呢? 万一哪一天服务提供者的address变了,是不是也得通知消费者? 这就是watcher存在的意义,它解决了这件事

    watcher的类型

    keeperState EventType 触发条件 说明
    SyncConnected None(-1) 客户端与服务端建立连接 客户端与服务端处于连接状态
    SyncConnected NodeCreate(1) watcher监听的数据节点被创建 客户端与服务端处于连接状态
    SyncConnected NodeDeleted(2) Watcher监听的数据节点被删除 客户端与服务端处于连接状态
    SyncConnected NodeDataChanged(3) watcher监听的node数据内容发生改变 客户端与服务端处于连接状态
    SyncConnected NodeChildrenChange(4) 被监听的数据节点的节点列表发生变更 客户端与服务端处于连接状态
    Disconnect None(-1) 客户端与服务端断开连接 客户端与服务端断开连接
    Expired (-112) None(-1) 会话超时 session过期,收到异常SessionExpiredException
    AuthFailed None(-1) 1.使用了错误的scheme 2,SALS权限验证失败了 收到异常AuthFailedException

    实验场景:

    假设我们已经成功启动了zookeeper的服务端和客户端,并且预先添加了watcher,然后使用控制台动态的修改下node的data,我们会发现watcher回调的现象

    添加的钩子函数代码如下:

    public class ZookepperClientTest {
        public static void main(String[] args) throws Exception {
            ZooKeeper client = new ZooKeeper("localhost", 5000, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    System.err.println("连接,触发");
                }
            });
    
        Stat stat = new Stat();
        
         //   todo 下面添加的事件监听器可是实现事件的消费订阅
          String content = new String(client.getData("/node1", new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    // todo 任何连接上这个节点的客户端修改了这个节点的 data数据,都会引起process函数的回调
    
                    // todo 特点1:  watch只能使用1次
                    if (event.getType().equals(Event.EventType.NodeDataChanged)){
                        System.err.println("当前节点数据发生了改变");
                    }
                }
            }, stat));
    

    看如上的代码, 添加了一个自己的watcher也就是client.getData("/node1", new Watcher() {} 这是个回调的钩子函数,执行时不会运行,当满足的某个条件时才会执行, 比如: node1被删除了, node1的data被修改了

    getData做了哪些事情?

    源码如下: getdata,顾名思义,返回服务端的node的data+stat, 当然是当服务端的node发生了变化后调用的

    主要主流如下几个工作

    • 创建WatchRegistration wcb= new DataWatchRegistration(watcher, clientPath);
      • 其实就是一个简单的内部类,将path 和 watch 封装进了一个对象
    • 创建一个request,并且初始化这个request.head=getData=4
    • 调用ClientCnxn.submitRequest(...) , 将现存的这些信息进一步封装
    • request.setWatch(watcher != null);说明他并没有将watcher封装进去,而是仅仅做了个有没有watcher的标记
     public byte[] getData(final String path, Watcher watcher, Stat stat)
            throws KeeperException, InterruptedException
         {
             // todo 校验path
            final String clientPath = path;
            PathUtils.validatePath(clientPath);
    
            // the watch contains the un-chroot path
            WatchRegistration wcb = null;
            if (watcher != null) {
                // todo DataWatchRegistration 继承了 WatchRegistration
                // todo DataWatchRegistration 其实就是一个简单的内部类,将path 和 watch 封装进了一个对象
                wcb = new DataWatchRegistration(watcher, clientPath);
            }
    
            final String serverPath = prependChroot(clientPath);
            // todo 创建一个请求头
            RequestHeader h = new RequestHeader();
            h.setType(ZooDefs.OpCode.getData);
    
            // todo 创建了一个GetDataRequest
            GetDataRequest request = new GetDataRequest();
            // todo 给这个请求初始化,path 是传递进来的path,但是 watcher不是!!! 如果我们给定了watcher , 这里面的条件就是  true
            request.setPath(serverPath);
            request.setWatch(watcher != null); // todo 可看看看服务端接收到请求是怎么办的
    
            GetDataResponse response = new GetDataResponse();
    
            // todo 同样由 clientCnxn 上下文进行提交请求, 这个操作应该同样是阻塞的
             // todo EventThread 和 SendThread 同时使用一份 clientCnxn的 submitRequest()
            ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
    
            if (r.getErr() != 0) {
                throw KeeperException.create(KeeperException.Code.get(r.getErr()),
                        clientPath);
            }
            if (stat != null) {
                DataTree.copyStat(response.getStat(), stat);
            }
            return response.getData();
        }
    

    ReplyHeader r = cnxn.submitRequest(h, request, response, wcb); 的源码我写在下面, 这里来到这个方法中,一眼能看到,它依然是阻塞的式的,并且requet被进一步封装进packet

    更重要的是 queuePacket()方法的最后一个参数,存在我们刚刚创建的path+watcher的封装类

    public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration)
            throws InterruptedException {
        ReplyHeader r = new ReplyHeader();
        // todo 来到这个 queuePacket() 方法在下面, 这个方法就是将  用户输入-> string ->>> request ->>> packet 的过程
        Packet packet = queuePacket(h, r, request, response, null, null, null,
                null, watchRegistration);
    
    
        // todo 使用同步代码块,在下面的进行    同步阻塞等待, 直到有了Response响应才会跳出这个循环, 这个finished状态就是在客户端接受到服务端的
        // todo 的响应后, 将服务端的响应解析出来,然后放置到 pendingqueue里时,设置上去的
        synchronized (packet) {
            while (!packet.finished) {
                // todo 这个等待是需要唤醒的
                packet.wait();
            }
        }
        // todo 直到上面的代码块被唤醒,才会这个方法才会返回
        return r;
    }
    

    同样,在queuePacket()方法中将packet提交到outgoingQueue中,最终被seadThread消费发送到服务端

    服务端如何处理watchRegistration不为空的packet

    后续我准备用一整篇博客详解单机模式下服务端处理请求的流程,所以这篇博客只说结论

    在服务端,用户的请求最终会按顺序流向三个Processor,分别是

    • PrepRequestProcessor
      • 负责进行一些状态的修改
    • SyncRequestProcessor
      • 将事务日志同步到磁盘
    • FinalRequestProcessor
      • 相应用户的请求

    我们直接去看FinalRequestProcessor public void processRequest(Request request) {}方法,看他针对getData()方式的请求做出了哪些动作.下面来了个小高潮,zks.getZKDatabase().getData(getDataRequest.getPath(), stat, getDataRequest.getWatch() ? cnxn : null);跟进watcher的有无给服务端添加不同的Watcher

    真的得划重点了,当我发现这一点时,我的心情是超级激动的,就像发现了新大陆一样

    case OpCode.getData: {
            lastOp = "GETD";
            GetDataRequest getDataRequest = new GetDataRequest();
            ByteBufferInputStream.byteBuffer2Record(request.request,
                    getDataRequest);
            DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
            if (n == null) {
                throw new KeeperException.NoNodeException();
            }
            PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n),
                    ZooDefs.Perms.READ,
                    request.authInfo);
            Stat stat = new Stat();
            // todo 这里的操作    getDataRequest.getWatch() ? cnxn : null 对应可客户端的  跟进watcher有没有而决定往服务端传递 true 还是false 相关
            // todo 跟进去 getData()
            byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
                    getDataRequest.getWatch() ? cnxn : null);
            //todo  cnxn的Processor()被回调, 往客户端发送数据 , 什么时候触发呢? 就是上面的  处理事务时的回调 第127行
    
            // todo 构建了一个 rsp ,在本类的最后面将rsp 响应给client
            rsp = new GetDataResponse(b, stat);
            break;
        }
    
    

    继续跟进这个getData()在服务端维护了一份path+watcher的map

    public byte[] getData(String path, Stat stat, Watcher watcher)
            throws KeeperException.NoNodeException {
        DataNode n = nodes.get(path);
        if (n == null) {
            throw new KeeperException.NoNodeException();
        }
        synchronized (n) {
            n.copyStat(stat);
            if (watcher != null) {
                // todo 将path 和 watcher 绑定在一起
                dataWatches.addWatch(path, watcher);
            }
            return n.data;
        }
    }
    
    

    客户端打开命令行,修改服务端node的状态

    书接上回,当客户单的代码去创建ClientCnxn时,有下面的逻辑 , 它开启了两条守护线程, sendThread负责向服务端发送心跳,已经和服务端进行用户相关的IO交流, EventThread就负责和txn事务相关的处理逻辑,级别上升到针对node

        // todo start就是启动了在构造方法中创建的线程
        public void start() {
            sendThread.start();
            eventThread.start();
        }
    

    到目前为止,客户端就有如下三条线程了

    • 负责处理用户在控制台输入命令的主线程
    • 守护线程1: seadThread
    • 守护线程2: eventThread

    跟进主线程的处理用户输入部分的逻辑代码如下:

    下面的代码的主要逻辑就是处理用户输入的命令,当通过if-else选择分支判断用户到底输入的啥命令

    按照我们的假定的场景,用户输入的命令是这样的 set /path newValue 所以,毫无疑问,经过解析后代码会去执行下面的stat = zk.setData(path, args[2].getBytes(),部分

      // todo zookeeper客户端, 处理用户输入命令的具体逻辑
        // todo  用大白话讲,下面其实就是把 从控制台获取的用户的输入信息转换成指定的字符, 然后发送到服务端
        // todo MyCommandOptions 是处理命令行选项和shell脚本的工具类
        protected boolean processZKCmd(MyCommandOptions co) throws KeeperException, IOException, InterruptedException {
            // todo 在这个方法中可以看到很多的命令行所支持的命令
            Stat stat = new Stat();
            // todo 获取命令行输入中 0 1 2 3 ... 位置的内容, 比如 0 位置是命令  1 2 3 位置可能就是不同的参数
            String[] args = co.getArgArray();
            String cmd = co.getCommand();
            if (args.length < 1) {
                usage();
                return false;
            }
    
            if (!commandMap.containsKey(cmd)) {
                usage();
                return false;
            }
    
            boolean watch = args.length > 2;
            String path = null;
            List<ACL> acl = Ids.OPEN_ACL_UNSAFE;
            LOG.debug("Processing " + cmd);
    
            if (cmd.equals("quit")) {
                System.out.println("Quitting...");
                zk.close();
                System.exit(0);
            } else if (cmd.equals("set") && args.length >= 3) {
                path = args[1];
                stat = zk.setData(path, args[2].getBytes(),
                        args.length > 3 ? Integer.parseInt(args[3]) : -1);
                printStat(stat);
    

    继续跟进stat = zk.setData(path, args[2].getBytes(), 下面的逻辑也很简单,就是将用户的输入封装进来request中,通过ClientCnxn类的submit方法提交到一个队列中,等待着sendThread去消费

    这次有目的的看一下submitRequest的最后一个参数为null, 这个参数是WatchRegistration的位置,一开始置为null

     public Stat setData(final String path, byte data[], int version)
            throws KeeperException, InterruptedException
        {
            final String clientPath = path;
            PathUtils.validatePath(clientPath);
    
            final String serverPath = prependChroot(clientPath);
    
            RequestHeader h = new RequestHeader();
            h.setType(ZooDefs.OpCode.setData);
            SetDataRequest request = new SetDataRequest();
            request.setPath(serverPath);
            request.setData(data);
            request.setVersion(version);
            
            SetDataResponse response = new SetDataResponse();
            ReplyHeader r = cnxn.submitRequest(h, request, response, null);
            if (r.getErr() != 0) {
                throw KeeperException.create(KeeperException.Code.get(r.getErr()),
                        clientPath);
            }
            return response.getStat();
        }
    

    跟进这个submitRequest()方法, 源码如下,不处所料的是,它同样被阻塞住了,直到服务端给了它响应

    当前代码的主要逻辑就是将request封装进packet,然后将packet添加到ClintCnxn维护的outgoingQueue队列中等待sendThread的消费

    这次来到这个方法是因为我们在控制台输入的set 命令而触发的,比较重要的是本次packet携带的WatchRegistration==null, 毫无疑问,这次服务端在FinalRequestProcessor中再处理时取出的watcher==null, 也就不会将path+watcher保存进maptable中

    重要:发送事务消息

    FinalRequestProcessorpublic void processRequest(Request request) {}方法中,有如下代码

    //todo 请求头不为空
        if (request.hdr != null) {
            // 获取请求头
           TxnHeader hdr = request.hdr;
           // 获取事务
           Record txn = request.txn;
            // todo 跟进这个方法-----<--!!!!!!-----处理事务的逻辑,在这里面有向客户端发送事件的逻辑, 回调客户端的watcher----!!!!!!-->
           rc = zks.processTxn(hdr, txn);
        }
    

    继续跟进去

    // todo 处理事物日志
    public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
        ProcessTxnResult rc;
        int opCode = hdr.getType();
        long sessionId = hdr.getClientId();
        // todo 继续跟进去!!!!!!!!!
        // todo 跟进 processTxn(hdr, txn)
        rc = getZKDatabase().processTxn(hdr, txn);
    

    跟进ZkDatabase.java中的processTxn(hdr, txn)方法

    public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
        // todo 跟进 processTxn
        return dataTree.processTxn(hdr, txn);
    }
    

    跟进到DataTree.java

      public ProcessTxnResult processTxn(TxnHeader header, Record txn)
        {
            ProcessTxnResult rc = new ProcessTxnResult();
    
            try {
                rc.clientId = header.getClientId();
                rc.cxid = header.getCxid();
                rc.zxid = header.getZxid();
                rc.type = header.getType();
                rc.err = 0;
                rc.multiResult = null;
                switch (header.getType()) { // todo 根据客客户端发送过来的type进行switch,
                    case OpCode.create:
                        CreateTxn createTxn = (CreateTxn) txn;
                        rc.path = createTxn.getPath();
                        // todo  跟进这个创建节点的方法
                        createNode(
                                createTxn.getPath(),
    

    根据请求头的值,进而判断出走到那个switch的分支,当前我们在控制台触发,进入到setData分支如下:跟进这个方法中可以看到它主要做了如下几件事

    • 使用传递进来的新值替代旧data
    • dataWatches.triggerWatch(path, EventType.NodeDataChanged);**触发指定的事件watch,什么事件呢? NodeDataChange, 触发了哪个watcher呢? 跟进去查看 **
    
    
        //todo  setData
        public Stat setData(String path, byte data[], int version, long zxid,
                long time) throws KeeperException.NoNodeException {
            Stat s = new Stat();
            DataNode n = nodes.get(path);
            if (n == null) {
                throw new KeeperException.NoNodeException();
            }
            byte lastdata[] = null;
            synchronized (n) {
                // todo 修改内存的数据
                lastdata = n.data;
                n.data = data;
                n.stat.setMtime(time);
                n.stat.setMzxid(zxid);
                n.stat.setVersion(version);
                n.copyStat(s);
            }
            // now update if the path is in a quota subtree.
            String lastPrefix;
            if((lastPrefix = getMaxPrefixWithQuota(path)) != null) {
              this.updateBytes(lastPrefix, (data == null ? 0 : data.length)
                  - (lastdata == null ? 0 : lastdata.length));
            }
            // todo 终于 看到了   服务端 关于触发NodeDataChanged的事件
            dataWatches.triggerWatch(path, EventType.NodeDataChanged);
            return s;
        }
    

    补充Watch & EventType 类图

    watcherAndEventType

    跟进去 dataWatches.triggerWatch(path, EventType.NodeDataChanged);,源码如下, 主要的逻辑就是取出存放在服务端的watch,然后逐个回调他们的processor函数,问题来了,到底是哪些watcher呢? 其实就是我们在客户端启动时添加getData()时存进去的wather,也就是ServerCnxn

    
     // todo 跟进去服务端的 触发事件,  但是吧, 很纳闷. 就是没有往客户端发送数据的逻辑
        public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
            WatchedEvent e = new WatchedEvent(type,
                    KeeperState.SyncConnected, path);
            HashSet<Watcher> watchers;
            synchronized (this) {
                watchers = watchTable.remove(path);
                if (watchers == null || watchers.isEmpty()) {
                    if (LOG.isTraceEnabled()) {
                        ZooTrace.logTraceMessage(LOG,
                                ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                                "No watchers for " + path);
                    }
                    return null;
                }
                for (Watcher w : watchers) {
                    HashSet<String> paths = watch2Paths.get(w);
                    if (paths != null) {
                        paths.remove(path);
                    }
                }
            }
            for (Watcher w : watchers) {
                if (supress != null && supress.contains(w)) {
                    continue;
                }
                // todo 继续跟进去, 看它如何回调的
                w.process(e);
            }
            return watchers;
        }
    

    怀着激动的心情去看看ServerCnxn的process()方法做了什么事?

    来到ServerCnxn的实现类NIOServerCnxn, 确实很激动,看到了服务端在往客户端发送事务型消息, 并且new ReplyHeader(-1, -1L, 0)第一个位置上的参数是-1, 这一点很重要,因为客户端在接受到这个xid=-1的标记后,就会将这条响应交给EventThread处理

        @Override
        synchronized public void process(WatchedEvent event) {
            ReplyHeader h = new ReplyHeader(-1, -1L, 0);
            if (LOG.isTraceEnabled()) {
                ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                                         "Deliver event " + event + " to 0x"
                                         + Long.toHexString(this.sessionId)
                                         + " through " + this);
            }
    
            // Convert WatchedEvent to a type that can be sent over the wire
            WatcherEvent e = event.getWrapper();
            // todo  往服务端发送了 e event类型消息
            sendResponse(h, e, "notification");
        }
    

    处理回调回调watch使用的响应

    进入到SendThread的读就绪源码部分,如下: 它根据header.xid=-1就知道了这是事务类型的响应

    // todo 服务端抛出来的事件, 客户端将把他存在EventThread的 watingEvents 队列中
    // todo 它的实现逻辑也是这样, 会有另外一个线程不断的消费这个队列
    if (replyHdr.getXid() == -1) {
        // -1 means notification
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got notification sessionid:0x"
                    + Long.toHexString(sessionId));
        }
        // todo 创建watcherEvent 并将服务端发送回来的数据,反序列化进这个对象中
        WatcherEvent event = new WatcherEvent();
        event.deserialize(bbia, "response");
    
        // convert from a server path to a client path
        // todo 将server path 反转成 client path
        if (chrootPath != null) {
            String serverPath = event.getPath();
            if (serverPath.compareTo(chrootPath) == 0)
                event.setPath("/");
            else if (serverPath.length() > chrootPath.length())
                event.setPath(serverPath.substring(chrootPath.length()));
            else {
                LOG.warn("Got server path " + event.getPath()
                        + " which is too short for chroot path "
                        + chrootPath);
            }
                  WatchedEvent we = new WatchedEvent(event);
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Got " + we + " for sessionid 0x"
                                + Long.toHexString(sessionId));
                    }
                    //todo 跟进去
                    eventThread.queueEvent(we);
                    return;
                }
        }
    
    

    在这个方法的最后,将这个相应添加进EventThread消费的队列中,跟进 eventThread.queueEvent(we);

    // todo
    public void queueEvent(WatchedEvent event) {
        // todo 如果事件的类型是 none, 或者sessionState =  直接返回
        /**
         *   todo 事件的类型被设计成 watcher 接口的枚举
         *   None (-1),
         *   NodeCreated (1),
         *   NodeDeleted (2),
         *   NodeDataChanged (3),
         *   NodeChildrenChanged (4);
         */
        if (event.getType() == EventType.None
                && sessionState == event.getState()) {
            return;
        }
        sessionState = event.getState();
    
        // materialize the watchers based on the event
        // todo 根据事件的具体类型,将观察者具体化, 跟进去
        // todo 这个类是ClientCnxn的辅助类,作用就是将watcher 和它观察的事件封装在一起
        WatcherSetEventPair pair = new WatcherSetEventPair(
                //todo 跟进这个 materialize方法. 其实就是从map中取出了和当前client关联的全部 watcher set
    
                watcher.materialize(event.getState(), event.getType(),
                        event.getPath()),
                event);
        // queue the pair (watch set & event) for later processing
        // todo 将watch集合 和 event 进行排队(按顺序添加到队列里了), 以便后续处理 , 怎么处理呢?  就在EventThread的run循环中消费
        // todo watingEvent ==>  LinkedBlockingQueue<Object>
        waitingEvents.add(pair);
    }
    

    上面的代码主要做了如下几件事:

    • 从map中取出和当前事件相关的全部watcher
    • 将watcher set 添加进 waitingEvents队列中,等待EventThead的消费

    跟进 watcher.materialize(event.getState(), event.getType(), 会追到下面的代码

    case NodeDataChanged: // todo node中的data改变和 nodeCreate 都会来到下面的分支
            case NodeCreated:
                synchronized (dataWatches) {
                    // todo dataWatches 就是刚才存放  path : watcher 的map
                    // todo dataWatches.remove(clientPath) 移除并返回clientPath对应的watcher , 放入 result 中
                    addTo(dataWatches.remove(clientPath), result);
                }
    

    上面的dataWatches 就是保存path+watcher set的map, 上面的操作是移除并返回指定的watcher,这也说明了,为什么zk原生客户端添加的watcher仅仅会回调一次

    EventThread是如何消费waitingEvents的

    EventThread是一条守护线程, 因此它拥有自己的不断在运行的run方法,它就是在这个run方法中对这个队列进行消费的

    
            @Override
            public void run() {
                try {
                    isRunning = true;
                    // todo 同样是无限的循环
                    while (true) {
                        // todo 从watingEvnets 中取出一个 WatcherSetEventPair
                        Object event = waitingEvents.take();
                        if (event == eventOfDeath) {
                            wasKilled = true;
                        } else {
                            // todo 本类方法,处理这个事件,继续进入,方法就在下面
                            processEvent(event);
                        }
                        if (wasKilled)
                            synchronized (waitingEvents) {
                                if (waitingEvents.isEmpty()) {
                                    isRunning = false;
                                    break;
                                }
                            }
                    }
                } catc
            
    

    继续跟进它的processEvent(event),最终会在这个方法中调用下面的代码,这里的watcher就是我在本篇博客的开始位置添加进去的watcher,至此打完收工

     watcher.process(pair.event);
    

    总结:

    当客户端启动时添加watcher对某一个特定path上的node进行监听时 , 客户端的watcher被封装进WatcherRegistion中再进一步发送的服务端

    watcher不为空的packet达到服务端后会被巧妙的处理,将ServerCnxn当成watcher注册添加到服务端维护的那份watcher map table中

    当watcher关联的node发生了NodeCreate,NodeDeleted ,NodeDataChannged,NodeChildrenChannged时,在最后一个处理器就会触发发送事务类型事件的动作,其实就是回调ServerCnxn的process()方法

    事务类型的响应返回到客户端,跟进xid区分出到底是哪种响应,如-1是NodeDataChanged,最终会把这个事务事件提交到EventThread消费的waitingEvents等待EventThread消费它,回调客户端的watcher的process()方法

    如果觉得对您有帮助,欢迎点个推荐, 如果有错误,欢迎指出

  • 相关阅读:
    串口RS232和485通信的波形分析
    Ubuntu添加中文输入法
    虚拟机桥接模式联网方法,Xshell的连接与使用
    waitpid 函数详解
    linux for循环 fork() 产生子进程
    【LeetCode解题总结】动态规划篇
    【LeetCode解题总结】递归篇
    【LeetCode解题总结】排序篇
    【LeetCode解题总结】树/图篇
    【LeetCode解题总结】栈/队列篇
  • 原文地址:https://www.cnblogs.com/ZhuChangwu/p/11593642.html
Copyright © 2011-2022 走看看