zoukankan      html  css  js  c++  java
  • ZooKeeper个人笔记客户端watcher和AsycCallback回调

    每一个Watcher具有如下属性:

     1.KeeperState

     2.EventType

     3.path

     4.process(WatchedEvent evnet)回掉方法

    Watcher干嘛的?用户监听session的状态,数据节点的状态等。

    watcher种类:defaultWatcher,非defaultWatcher

    dafaultWatcher是在创建ZooKeeper对象时传递的watcher参数。非defaultWatcher只得是,在调用getData,getChildren,exists函数时传递的watcher对象。

    二者的相同点在于:都可以监听数据节点的状态,比如在getData中使用了defaultWatcher,那么当监听的节点内容发生改变时,defaultWatcher就会收到通知。如果没有使用了非defaultWatcher,也是同样的。

    而这的不同点在于:defaultWatcher会监听session的生命周期,比如session创建成功了,失效了等,而非defaultWatcher不具有这个职责。其次defaultWatcher并不与某一个节点路径相互关联。

     notification event,非notification event

    户端需要接受服务器发送过来的消息,第一种消息是类似于Watcher回掉这种的,我们叫做notification,他的特点是服务器主动发送消息给客户端的,比如客户端a在数据节点a上设置了getData监听,当客户端b修改了节点a后,服务器主动发送NodeDataChanged消息给客户端a。第二中消息是类似于create,getData这种,他们向服务器发送对应的请求后,然后将请求放进到pendingQueue中,然后等待服务器的响应,当接受到服务器的响应后,再从pendingQueue中取出请求,然后进行回掉。对于第二中,只有两类请求是不需要服务器响应的,ping,autu。

    Watcher和AsyncCallback区别

    Watcher用于监听节点的,比如getData对数据节点a设置了watcher,那么当a的数据内容发生改变时,客户端会收到NodeDataChanged通知,然后进行watcher的回掉。

    AsyncCallback是在以异步方式使用ZooKeeper APi时,用户获取api的处理结果的。而这具有本质的不同,不要混淆。

    Watcher和AsyncCallback回调思路

    为什么将Watcher和AsyncCallback放在一起说,这是因为从客户端的角度来看,他们都是异步的,客户端有个后台线程负责处理Watcher和AsyncCallback回调,这里我们好奇的是,客户端如何处理回调的。

       一个最简单的方法就是生产者-消费者模式,生产者向push一个event到waitingEvents事件队列中,消费者线程从waitingEvents中取出event执行回调。这里的event可以是Watcher也可以是AsyncCallback。现在有几个问题需要解决:

      1.生产者线程什么时候push event:对于Watcher回调来说,如果客户端在调用getData API时向节点A注册了一个Watcher,那么当节点A的数据发生改变时服务器会主动向客户端发出响应,客户端收到这个响应后,反序列化出WatchedEvent,然后找到所有在节点A注册的Watcher,将他们打包成一个event并push到waitingEvents。

        对于AsyncCallback来说,如果客户端在调用getData API来获取节点A的数据并且传递了AsyncCallback,那么会将AsyncCallback保存到客户端的pendingQueue并向服务器获取节点A的数据内容,服务器在获取完节点A的数据内容后会向服务器发送响应,客户端拿到这个响应后取出pendingQueue中的AsyncCallback,然后打包成event 并push到waitingEvents。

    Watcher和AsyncCallback回调实现

    1.接受服务器的响应,并反序列化出ReplyHeader: 有一个单独的线程SendThread,负责接收服务器端的响应。假设他接受到的服务器传递过来的字节流失incomingBuffer。那么他做的第一步就应该将这个incomingBuffer反序列化出ReplyHeader。

    2.判断响应类型:判断ReplyHeader是Wacher响应还是AsyncCallback响应:ReplyHeader.getXid()存储了响应类型。

       2.1 如果是Wacher类型响应:从ReplyHeader中创建WatchedEvent,WatchedEvent里面存储了节点的路径,然后去WatcherManager中找到和这个节点相关联的所有Wacher,将他们写入到EventWaiting的waitingEvents中。

       2.2 如果是AsyncCallback类型响应:从ReplyHeader中读取response,这个response描述了是Exists,setData,getData,getChildren,create.....中的哪一个异步回调。从pendingQueue中拿到Package,Package中的cb存储了AsyncCallback,也就是异步API的结果回调。最后将Package写入到EventThreadwaitingEvents中。

    3.回调:EventThread从waitingEvents中取出

    同步getData源码走读:

     /**
         * Return the data and the stat of the node of the given path.
         * <p>
         * If the watch is non-null and the call is successful (no exception is
         * thrown), a watch will be left on the node with the given path. The watch
         * will be triggered by a successful operation that sets data on the node, or
         * deletes the node.
         * <p>
         * A KeeperException with error code KeeperException.NoNode will be thrown
         * if no node with the given path exists.
         *
         * @param path the given path
         * @param watcher explicit watcher
         * @param stat the stat of the node
         * @return the data of the node
         * @throws KeeperException If the server signals an error with a non-zero error code
         * @throws InterruptedException If the server transaction is interrupted.
         * @throws IllegalArgumentException if an invalid path is specified
         */
        public byte[] getData(final String path, Watcher watcher, Stat stat)
            throws KeeperException, InterruptedException
         {
            final String clientPath = path;
            PathUtils.validatePath(clientPath);
    
            // the watch contains the un-chroot path
            WatchRegistration wcb = null;
            if (watcher != null) {
                wcb = new DataWatchRegistration(watcher, clientPath);
            }
    
            final String serverPath = prependChroot(clientPath);
    
            RequestHeader h = new RequestHeader();
            h.setType(ZooDefs.OpCode.getData);
            GetDataRequest request = new GetDataRequest();
            request.setPath(serverPath);
            request.setWatch(watcher != null);
            GetDataResponse response = new GetDataResponse();
    //提交请求,等待请求处理完毕 ReplyHeader r
    = cnxn.submitRequest(h, request, response, wcb); if (r.getErr() != 0) { throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath); } if (stat != null) { DataTree.copyStat(response.getStat(), stat); } return response.getData(); }
        public ReplyHeader submitRequest(RequestHeader h, Record request,
                Record response, WatchRegistration watchRegistration)
                throws InterruptedException {
            ReplyHeader r = new ReplyHeader();
    //提交请求到outgoingQueue,outgoingQueue里面的请求是将要发送到服务器的请求,ClientCnxnSocket的doIO()会从outgoingQueue中取出队列发送到服务器 Packet packet
    = queuePacket(h, r, request, response, null, null, null, null, watchRegistration);
    //阻塞等待,直到这个请求处理完成
    synchronized (packet) { while (!packet.finished) { packet.wait(); } } return r; }

    1.将请求头,请求体,响应等送入队列,返回一个packet,然后等待packet完成。

     Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
                Record response, AsyncCallback cb, String clientPath,
                String serverPath, Object ctx, WatchRegistration watchRegistration)
        {
            Packet packet = null;
    
            // Note that we do not generate the Xid for the packet yet. It is
            // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
            // where the packet is actually sent.
    //这里不会为这个packet生成xid,当ClientCnxnSocket::doIO从outgoingQueue中取出packet才生成xid synchronized (outgoingQueue) { packet = new Packet(h, r, request, response, watchRegistration); packet.cb = cb; packet.ctx = ctx; packet.clientPath = clientPath; packet.serverPath = serverPath;
    //如果会话已经关闭
    if (!state.isAlive() || closing) { conLossPacket(packet); } else { // If the client is asking to close the session then // mark as closing
    //如果客户端请求关闭会话
    if (h.getType() == OpCode.closeSession) { closing = true; } outgoingQueue.add(packet); } } sendThread.getClientCnxnSocket().wakeupCnxn(); return packet; }

    由于outgoingQueue并不是阻塞队列并且需要向其添加packet对象,所以需要对其synchronized。最后调用sendThread.getClientCnxnSocket().wakeupCnxn.

      /**
         * @return true if a packet was received
         * @throws InterruptedException
         * @throws IOException
         */
        void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
          throws InterruptedException, IOException {
            SocketChannel sock = (SocketChannel) sockKey.channel();
            if (sock == null) {
                throw new IOException("Socket is null!");
            }
    //优先处理读响应
    if (sockKey.isReadable()) { int rc = sock.read(incomingBuffer); if (rc < 0) { throw new EndOfStreamException( "Unable to read additional data from server sessionid 0x" + Long.toHexString(sessionId) + ", likely server has closed socket"); } if (!incomingBuffer.hasRemaining()) { incomingBuffer.flip(); if (incomingBuffer == lenBuffer) { recvCount++; readLength(); } else if (!initialized) { readConnectResult(); enableRead(); if (findSendablePacket(outgoingQueue, cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) { // Since SASL authentication has completed (if client is configured to do so), // outgoing packets waiting in the outgoingQueue can now be sent. enableWrite(); } lenBuffer.clear(); incomingBuffer = lenBuffer; updateLastHeard(); initialized = true; } else { sendThread.readResponse(incomingBuffer); lenBuffer.clear(); incomingBuffer = lenBuffer; updateLastHeard(); } } } if (sockKey.isWritable()) { synchronized(outgoingQueue) { Packet p = findSendablePacket(outgoingQueue, cnxn.sendThread.clientTunneledAuthenticationInProgress()); if (p != null) { updateLastSend(); // If we already started writing p, p.bb will already exist if (p.bb == null) { if ((p.requestHeader != null) && (p.requestHeader.getType() != OpCode.ping) && (p.requestHeader.getType() != OpCode.auth)) { p.requestHeader.setXid(cnxn.getXid()); } p.createBB(); } sock.write(p.bb); if (!p.bb.hasRemaining()) { sentCount++; outgoingQueue.removeFirstOccurrence(p); if (p.requestHeader != null && p.requestHeader.getType() != OpCode.ping && p.requestHeader.getType() != OpCode.auth) { synchronized (pendingQueue) { pendingQueue.add(p); } } } } if (outgoingQueue.isEmpty()) { // No more packets to send: turn off write interest flag. // Will be turned on later by a later call to enableWrite(), // from within ZooKeeperSaslClient (if client is configured // to attempt SASL authentication), or in either doIO() or // in doTransport() if not. disableWrite(); } else { // Just in case enableWrite(); } } } }

    锁定outgoingQueue,获取一个可以发送的packet,如果则个packet还没有被发送,则p.bb!=null,然后生成xid。

    xid初始值为1,每次发送一个packet时,都会递增xid,将其设置到请求头中。

    最后,将packet写入到pendingQueue中。

       void readResponse(ByteBuffer incomingBuffer) throws IOException {
                ByteBufferInputStream bbis = new ByteBufferInputStream(
                        incomingBuffer);
                BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
                ReplyHeader replyHdr = new ReplyHeader();
    
                replyHdr.deserialize(bbia, "header");
    //虽然客户端会发送ping或者auth消息给服务器,但是客户端并不需要等待服务器的响应,也就是说他并没有将请求写入到pendingQueue中
    if (replyHdr.getXid() == -2) { // -2 is the xid for pings if (LOG.isDebugEnabled()) { LOG.debug("Got ping response for sessionid: 0x" + Long.toHexString(sessionId) + " after " + ((System.nanoTime() - lastPingSentNs) / 1000000) + "ms"); } return; } if (replyHdr.getXid() == -4) { // -4 is the xid for AuthPacket if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) { state = States.AUTH_FAILED; eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.AuthFailed, null) ); } if (LOG.isDebugEnabled()) { LOG.debug("Got auth sessionid:0x" + Long.toHexString(sessionId)); } return; }

    //notification类型的通知,watcher回掉相关逻辑。
    if (replyHdr.getXid() == -1) { // -1 means notification if (LOG.isDebugEnabled()) { LOG.debug("Got notification sessionid:0x" + Long.toHexString(sessionId)); } WatcherEvent event = new WatcherEvent(); event.deserialize(bbia, "response"); // convert from a server path to a client path if (chrootPath != null) { String serverPath = event.getPath(); if(serverPath.compareTo(chrootPath)==0) event.setPath("/"); else if (serverPath.length() > chrootPath.length()) event.setPath(serverPath.substring(chrootPath.length())); else { LOG.warn("Got server path " + event.getPath() + " which is too short for chroot path " + chrootPath); } } WatchedEvent we = new WatchedEvent(event); if (LOG.isDebugEnabled()) { LOG.debug("Got " + we + " for sessionid 0x" + Long.toHexString(sessionId)); } //根据<clientPath,EventType>从ZKWatcherManager中取出相关的wacher,然后封装成:
    //<EventType(NodeDataChanged),Path(/abc),watche1 for getData,WatchedEvent>
                    //<EventType(NodeDataChanged),Path(/abc),watche1 for exists,WatchedEvent>
                    eventThread.queueEvent( we );
                    return;
                }
    
                // If SASL authentication is currently in progress, construct and
                // send a response packet immediately, rather than queuing a
                // response as with other packets.
                if (clientTunneledAuthenticationInProgress()) {
                    GetSASLRequest request = new GetSASLRequest();
                    request.deserialize(bbia,"token");
                    zooKeeperSaslClient.respondToServer(request.getToken(),
                      ClientCnxn.this);
                    return;
                }
    
                Packet packet;
                synchronized (pendingQueue) {
                    if (pendingQueue.size() == 0) {
                        throw new IOException("Nothing in the queue, but got "
                                + replyHdr.getXid());
                    }
                    packet = pendingQueue.remove();
                }
                /*
                 * Since requests are processed in order, we better get a response
                 * to the first request!
                 */
                try {
                    if (packet.requestHeader.getXid() != replyHdr.getXid()) {
                        packet.replyHeader.setErr(
                                KeeperException.Code.CONNECTIONLOSS.intValue());
                        throw new IOException("Xid out of order. Got Xid "
                                + replyHdr.getXid() + " with err " +
                                + replyHdr.getErr() +
                                " expected Xid "
                                + packet.requestHeader.getXid()
                                + " for a packet with details: "
                                + packet );
                    }
    
                    packet.replyHeader.setXid(replyHdr.getXid());
                    packet.replyHeader.setErr(replyHdr.getErr());
                    packet.replyHeader.setZxid(replyHdr.getZxid());
                    if (replyHdr.getZxid() > 0) {
                        lastZxid = replyHdr.getZxid();
                    }
                    if (packet.response != null && replyHdr.getErr() == 0) {
                        packet.response.deserialize(bbia, "response");
                    }
    
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Reading reply sessionid:0x"
                                + Long.toHexString(sessionId) + ", packet:: " + packet);
                    }
                } finally {
    //客户端发送的非ping,auth请求,比如getData,setData等,注意事项,如果客户端注册了新的wather,需要将这个watcher保存到ZKWatcherManager中。如果是同步调用,直接通知客户端完成即可,如果是异步调用,就需要进行AsyncCallback回掉。 finishPacket(packet); } }
        private void finishPacket(Packet p) {
    //如果用户显示的注册了watcher,比如在getData中注册了非默认watcher,那么就将watcher添加到ZKWatcherManager中去,很重要的一步哦
    //如果没有这一步,也就没有后面的watcher回掉了。
    if (p.watchRegistration != null) { p.watchRegistration.register(p.replyHeader.getErr()); }
    //cb就是AsnycCallback,如果为null,表明是同步调用的接口,不需要异步回掉,因此,直接notifyAll即可。
    if (p.cb == null) { synchronized (p) { p.finished = true; p.notifyAll(); } } else { p.finished = true; eventThread.queuePacket(p); } }

    如果watchRegistration!=null,

    如果p.cb==null,说明这个请求已经处理完毕了,因此通知packet完成即可。

    最后,将packet放进waitingEvents中,然后EventThread就可以从waitingEvents中取出packets,然后执行客户端逻辑。

     private void processEvent(Object event) {
              try {
    //watcher回掉逻辑
    if (event instanceof WatcherSetEventPair) { // each watcher will process the event WatcherSetEventPair pair = (WatcherSetEventPair) event; for (Watcher watcher : pair.watchers) { try { watcher.process(pair.event); } catch (Throwable t) { LOG.error("Error while calling watcher ", t); } } } else {
    //create,getData等需要异步回掉的 Packet p
    = (Packet) event; int rc = 0; String clientPath = p.clientPath; if (p.replyHeader.getErr() != 0) { rc = p.replyHeader.getErr(); } if (p.cb == null) { LOG.warn("Somehow a null cb got to EventThread!"); } else if (p.response instanceof ExistsResponse || p.response instanceof SetDataResponse || p.response instanceof SetACLResponse) { StatCallback cb = (StatCallback) p.cb; if (rc == 0) { if (p.response instanceof ExistsResponse) { cb.processResult(rc, clientPath, p.ctx, ((ExistsResponse) p.response) .getStat()); } else if (p.response instanceof SetDataResponse) { cb.processResult(rc, clientPath, p.ctx, ((SetDataResponse) p.response) .getStat()); } else if (p.response instanceof SetACLResponse) { cb.processResult(rc, clientPath, p.ctx, ((SetACLResponse) p.response) .getStat()); } } else { cb.processResult(rc, clientPath, p.ctx, null); } } else if (p.response instanceof GetDataResponse) { DataCallback cb = (DataCallback) p.cb; GetDataResponse rsp = (GetDataResponse) p.response; if (rc == 0) { cb.processResult(rc, clientPath, p.ctx, rsp .getData(), rsp.getStat()); } else { cb.processResult(rc, clientPath, p.ctx, null, null); } } else if (p.response instanceof GetACLResponse) { ACLCallback cb = (ACLCallback) p.cb; GetACLResponse rsp = (GetACLResponse) p.response; if (rc == 0) { cb.processResult(rc, clientPath, p.ctx, rsp .getAcl(), rsp.getStat()); } else { cb.processResult(rc, clientPath, p.ctx, null, null); } } else if (p.response instanceof GetChildrenResponse) { ChildrenCallback cb = (ChildrenCallback) p.cb; GetChildrenResponse rsp = (GetChildrenResponse) p.response; if (rc == 0) { cb.processResult(rc, clientPath, p.ctx, rsp .getChildren()); } else { cb.processResult(rc, clientPath, p.ctx, null); } } else if (p.response instanceof GetChildren2Response) { Children2Callback cb = (Children2Callback) p.cb; GetChildren2Response rsp = (GetChildren2Response) p.response; if (rc == 0) { cb.processResult(rc, clientPath, p.ctx, rsp .getChildren(), rsp.getStat()); } else { cb.processResult(rc, clientPath, p.ctx, null, null); } } else if (p.response instanceof CreateResponse) { StringCallback cb = (StringCallback) p.cb; CreateResponse rsp = (CreateResponse) p.response; if (rc == 0) { cb.processResult(rc, clientPath, p.ctx, (chrootPath == null ? rsp.getPath() : rsp.getPath() .substring(chrootPath.length()))); } else { cb.processResult(rc, clientPath, p.ctx, null); } } else if (p.cb instanceof VoidCallback) { VoidCallback cb = (VoidCallback) p.cb; cb.processResult(rc, clientPath, p.ctx); } } } catch (Throwable t) { LOG.error("Caught unexpected throwable", t); } } }

    客户端在节点a注册了如下watcher:

    getData(a,watcher1)

    exists(a,watcher2)

    getChildren(a,watcher3)

    因此与a关联的watcher有watcher1和watcher2。如果节点a被修改了,那么客户端会收到notification 类型的通知,这里应是NodeDataChanged事件类型,此时,客户端需要回掉watcher1和watcher2.也就是说,他需要根据<EventType,clientPath>来找到与节点a对应的所有watcher。

    作者:FrancisWang 

    邮箱:franciswbs@163.com
    出处:http://www.cnblogs.com/francisYoung/
    本文地址:http://www.cnblogs.com/francisYoung/p/5225703.html

    本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

  • 相关阅读:
    pytest之fixture的详细使用
    pytest之自定义标记mark
    解决pytest.mark自定义标签导致的warning
    pytest之参数化parametrize的使用
    Jenkins上allure报告清空上一次运行记录
    Jenkins配置从节点并生成allure测试报告
    《编程珠玑》笔记:数组循环左移
    精确覆盖 DLX
    海量数据的插入和查找 bloom filter
    ORACLE数据库的一些限制
  • 原文地址:https://www.cnblogs.com/francisYoung/p/5225703.html
Copyright © 2011-2022 走看看