zoukankan      html  css  js  c++  java
  • zookeeper之客户端接收服务端处理完成的响应

    ClientCnxnSocketNIO.doIO
    服务端处理完成以后,会通过 NIOServerCnxn.sendResponse 发送返回的响应信息,客户端会在 ClientCnxnSocketNIO.doIO 接收服务端的返回,注意一下 SendThread.readResponse,接收服务端的信息进行读取。
    void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn) throws InterruptedException, IOException {
            SocketChannel sock = (SocketChannel) sockKey.channel();
            if (sock == null) {
                throw new IOException("Socket is null!");
            }
            if (sockKey.isReadable()) {
                int rc = sock.read(incomingBuffer);
                if (rc < 0) {
                    throw new EndOfStreamException(
                            "Unable to read additional data from server ses
                            sionid 0x"
                            + Long.toHexString(sessionId)
                            + ", likely server has closed socket");
                }
                if (!incomingBuffer.hasRemaining()) {
                    incomingBuffer.flip();
                    if (incomingBuffer == lenBuffer) {
                        recvCount++;
                        readLength();
                    } else if (!initialized) {
                        readConnectResult();
                        enableRead();
                        if (findSendablePacket(outgoingQueue,
                                cnxn.sendThread.clientTunneledAuthenticatio
                                nInProgress()) != null) {
                            // Since SASL authentication has completed (if
                            client is configured to do so),
                            // outgoing packets waiting in the outgoingQueu
                            e can now be sent.
                                    enableWrite();
                        }
                        lenBuffer.clear();
                        incomingBuffer = lenBuffer;
                        updateLastHeard();
                        initialized = true;
                    } else {
                        sendThread.readResponse(incomingBuffer);
                        lenBuffer.clear();
                        incomingBuffer = lenBuffer;
                        updateLastHeard();
                    }
                }
            }
        }
     
    SendThread. readResponse
    这个方法里面主要的流程如下
    • 首先读取 header,如果其 xid == -2,表明是一个 ping 的 response,return
    • 如果 xid 是 -4 ,表明是一个 AuthPacket 的 response return
    • 如果 xid 是 -1,表明是一个 notification,此时要继续读取并构造一个 enent,通过
    • EventThread.queueEvent 发送,return
    其它情况下:
    从 pendingQueue 拿出一个 Packet,校验后更新 packet 信息
    void readResponse(ByteBuffer incomingBuffer) throws IOException {
            ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
            BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
            ReplyHeader replyHdr = new ReplyHeader();
            replyHdr.deserialize(bbia, "header"); //反序列化 header
            if (replyHdr.getXid() == -2) { //?
                // -2 is the xid for pings
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got ping response for sessionid: 0x" + Long.toHexString(sessionId) + " after " + ((System.nanoTime() - lastPingSentNs) / 1000000) + "ms");
                }
                return;
            }
            if (replyHdr.getXid() == -4) {
                // -4 is the xid for AuthPacket
                if (replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
                    state = States.AUTH_FAILED;
                    eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,
                            Watcher.Event.KeeperState.AuthFailed, null)
                    );
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got auth sessionid:0x" + Long.toHexString(sessionId));
                }
                return;
            }
            if (replyHdr.getXid() == -1) { //表示当前的消息类型为一个 notification(意味着是服务端的一个响应事件)
                // -1 means notification
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got notification sessionid:0x" + Long.toHexString(sessionId));
                }
                WatcherEvent event = new WatcherEvent();//?
                event.deserialize(bbia, "response"); //反序列化响应信息
                // convert from a server path to a client path
                if (chrootPath != null) {
                    String serverPath = event.getPath();
                    if (serverPath.compareTo(chrootPath) == 0)
                        event.setPath("/");
                    else if (serverPath.length() > chrootPath.length())
                        event.setPath(serverPath.substring(chrootPath.length()));
                    else {
                        LOG.warn("Got server path " + event.getPath() + " which is too short for chroot path " + chrootPath);
                    }
                }
                WatchedEvent we = new WatchedEvent(event);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got " + we + " for sessionid 0x" + Long.toHexString(sessionId));
                }
                eventThread.queueEvent(we);
                return;
            }
            // If SASL authentication is currently in progress, constru ct and
            // send a response packet immediately, rather than queuing a
            // response as with other packets.
            if (tunnelAuthInProgress()) {
                GetSASLRequest request = new GetSASLRequest();
                request.deserialize(bbia, "token");
                zooKeeperSaslClient.respondToServer(request.getToken(), ClientCnxn.this);
                return;
            }
            Packet packet;
            synchronized (pendingQueue) {
                if (pendingQueue.size() == 0) {
                    throw new IOException("Nothing in the queue, but got " + replyHdr.getXid());
                }
                packet = pendingQueue.remove(); //因为当前这个数据包已经收到了响应,所以讲它从 pendingQueued 中移除
            }
            /*
             * Since requests are processed in order, we better get a response
             * to the first request!
             */
            try {//校验数据包信息,校验成功后讲数据包信息进行更新(替换为服务端的信息)
                if (packet.requestHeader.getXid() != replyHdr.getXid()) {
                    packet.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());
                    throw new IOException("Xid out of order. Got Xid "
                            + replyHdr.getXid() + " with err " +
                            +replyHdr.getErr() +
                            " expected Xid "
                            + packet.requestHeader.getXid()
                            + " for a packet with details: "
                            + packet);
                }
                packet.replyHeader.setXid(replyHdr.getXid());
                packet.replyHeader.setErr(replyHdr.getErr());
                packet.replyHeader.setZxid(replyHdr.getZxid());
                if (replyHdr.getZxid() > 0) {
                    lastZxid = replyHdr.getZxid();
                }
                if (packet.response != null && replyHdr.getErr() == 0) {
                    //获得服务端的响应,反序列化以后设置到 packet.response 属性中。所以我们可以在 exists 方法的最后一行通过 packet.response 拿到改请求的返回结果
                    packet.response.deserialize(bbia, "response");
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Reading reply sessionid:0x" + Long.toHexString(sessionId) + ", packet:: " + packet);
                }
            } finally {
                finishPacket(packet); //最后调用 finishPacket 方法完成处理
            }
        }
     
    finishPacket 方法
    主要功能是把从 Packet 中取出对应的 Watcher 并注册到 ZKWatchManager 中去。
    private void finishPacket(Packet p) {
            int err = p.replyHeader.getErr();
            if (p.watchRegistration != null) {
                //将事件注册到 zkwatchemanager 中 watchRegistration,熟悉吗?
                //在组装请求的时候,我们初始化了这个对象 把 watchRegistration 子类里面的 Watcher 实例放到 ZKWatchManager 的 exists Watches 中存储起来。
                p.watchRegistration.register(err);
    
            }
            //将所有移除的监视事件添加到事件队列, 这样客户端能收到 “data/child 事件被移除”的事件类型
            if (p.watchDeregistration != null) {
                Map<EventType, Set<Watcher>> materializedWatchers = null;
                try {
                    materializedWatchers = p.watchDeregistration.unregister(err);
                    for (Entry<EventType, Set<Watcher>> entry : materialize
                    dWatchers.entrySet()) {
                        Set<Watcher> watchers = entry.getValue();
                        if (watchers.size() > 0) {
                            queueEvent(p.watchDeregistration.getClientPath(), err, watchers, entry.getKey());
                            // ignore connectionloss when removing from local session
                            p.replyHeader.setErr(Code.OK.intValue());
                        }
                    }
                } catch (KeeperException.NoWatcherException nwe) {
                    p.replyHeader.setErr(nwe.code().intValue());
                } catch (KeeperException ke) {
                    p.replyHeader.setErr(ke.code().intValue());
                }
            }
            //cb 就是 AsnycCallback,如果为 null,表明是同步调用的接口,不需要异步回掉,因此,直接 notifyAll 即可。
            if (p.cb == null) {
                synchronized (p) {
                    p.finished = true;
                    p.notifyAll();
                }
            } else {
                p.finished = true;
                eventThread.queuePacket(p);
            }
        }
        
        
        public void register(int rc) {
            if (shouldAddWatch(rc)) {
                Map<String, Set<Watcher>> watches = getWatches(rc);
                //通过子类的实现取得 ZKWatchManager 中的 existsWatches
                synchronized(watches) {
                    Set<Watcher> watchers = watches.get(clientPath);
                    if (watchers == null) {
                        watchers = new HashSet<Watcher>();
                        watches.put(clientPath, watchers);
                    }
                    watchers.add(watcher); //将 Watcher 对象放到 ZKWatch Manager 中的 existsWatches 里面
                }
            }
        }
     
    下面这段代码是客户端存储 watcher 的几个 map 集合,分别对应三种注册监听事件:
    static class ZKWatchManager implements ClientWatchManager {
            private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>();
            private final Map<String, Set<Watcher>> existWatches = new HashMap<String, Set<Watcher>>();
            private final Map<String, Set<Watcher>> childWatches = new HashMap<String, Set<Watcher>>();
        }
     
    总的来说,当使用 ZooKeeper 构造方法或者使用 getData、exists 和getChildren 三个接口来向 ZooKeeper 服务器注册 Watcher 的时候,首先将此消息传递给服务端,传递成功后,服务端会通知客户端,然后客户端将该路径和Watcher 对应关系存储起来备用。
     
    EventThread.queuePacket()
    finishPacket 方法最终会调用 eventThread.queuePacket, 讲当前的数据包添加到等待事件通知的队列中:
    public void queuePacket(Packet packet) {
            if (wasKilled) {
                synchronized (waitingEvents) {
                    if (isRunning) waitingEvents.add(packet);
                    else processEvent(packet);
                }
            } else {
                waitingEvents.add(packet);
            }
        }
    
  • 相关阅读:
    HDU 4814 Golden Radio Base
    我对Swift的几点疑问
    【UTR #1】ydc的大树
    jsp中的隐含9对象
    动作元素
    指令元素
    JSP语法
    设计模式六大原则(6):开闭原则
    设计模式六大原则(5):迪米特法则
    设计模式六大原则(4):接口隔离原则
  • 原文地址:https://www.cnblogs.com/47Gamer/p/13571425.html
Copyright © 2011-2022 走看看