ClientCnxnSocketNIO.doIO
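After the server finishes processing a request, it sends the response through NIOServerCnxn.sendResponse. The client receives it in ClientCnxnSocketNIO.doIO. Pay attention to the call to SendThread.readResponse, which reads and handles the data returned by the server.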
void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
        throws InterruptedException, IOException {
    SocketChannel sock = (SocketChannel) sockKey.channel();
    if (sock == null) {
        throw new IOException("Socket is null!");
    }
    if (sockKey.isReadable()) {
        int rc = sock.read(incomingBuffer);
        if (rc < 0) {
            throw new EndOfStreamException(
                    "Unable to read additional data from server sessionid 0x"
                            + Long.toHexString(sessionId)
                            + ", likely server has closed socket");
        }
        if (!incomingBuffer.hasRemaining()) {
            incomingBuffer.flip();
            if (incomingBuffer == lenBuffer) {
                // The 4-byte length prefix is complete: size the body buffer.
                recvCount++;
                readLength();
            } else if (!initialized) {
                // First body after connecting: the connect result.
                readConnectResult();
                enableRead();
                if (findSendablePacket(outgoingQueue,
                        cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
                    // Since SASL authentication has completed (if client is configured to do so),
                    // outgoing packets waiting in the outgoingQueue can now be sent.
                    enableWrite();
                }
                lenBuffer.clear();
                incomingBuffer = lenBuffer;
                updateLastHeard();
                initialized = true;
            } else {
                // A regular response body: hand it to SendThread.readResponse.
                sendThread.readResponse(incomingBuffer);
                lenBuffer.clear();
                incomingBuffer = lenBuffer;
                updateLastHeard();
            }
        }
    }
}
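The way doIO alternates between lenBuffer and a body buffer is length-prefixed framing: fill a 4-byte length first, allocate a buffer of that size, read the body, then switch back to the length buffer. The following is a minimal sketch of that idea only. FrameDecoder and feed are hypothetical names, and the sketch consumes in-memory byte arrays instead of a SocketChannel.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of length-prefixed framing; not ZooKeeper code. */
final class FrameDecoder {
    private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
    // Points at lenBuffer while reading a length, at a body buffer otherwise.
    private ByteBuffer incomingBuffer = lenBuffer;

    /** Feeds one chunk of bytes and returns every frame body it completes. */
    List<byte[]> feed(byte[] chunk) {
        List<byte[]> frames = new ArrayList<>();
        ByteBuffer in = ByteBuffer.wrap(chunk);
        while (true) {
            // Copy bytes until the input is drained or the target is full.
            while (in.hasRemaining() && incomingBuffer.hasRemaining()) {
                incomingBuffer.put(in.get());
            }
            if (incomingBuffer.hasRemaining()) {
                return frames; // partial length or body: wait for more data
            }
            incomingBuffer.flip();
            if (incomingBuffer == lenBuffer) {
                int len = lenBuffer.getInt();
                if (len < 0) {
                    throw new IllegalArgumentException("negative frame length " + len);
                }
                incomingBuffer = ByteBuffer.allocate(len);
            } else {
                byte[] body = new byte[incomingBuffer.remaining()];
                incomingBuffer.get(body);
                frames.add(body);
                // Body done: go back to reading the next length prefix.
                lenBuffer.clear();
                incomingBuffer = lenBuffer;
            }
        }
    }
}
```

Feeding the decoder arbitrary chunk boundaries yields the same frames, which is why doIO can be called whenever the selector reports the socket as readable.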
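SendThread.readResponse
The main flow of this method is as follows:
- First read the header. If its xid == -2, the packet is a ping response: return.
- If the xid is -4, the packet is the response to an AuthPacket: return.
- If the xid is -1, the packet is a notification: keep reading, construct an event, hand it to EventThread.queueEvent, and return.
In all other cases:
Take a Packet from the head of pendingQueue, validate it, and update the packet with the information in the reply.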
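The branching on xid described above can be sketched as a small classifier. The class, enum, and constant names here are hypothetical and chosen for illustration; only the numeric values -1, -2, and -4 come from the method being discussed.

```java
/** Hypothetical sketch of the xid dispatch in readResponse; not ZooKeeper code. */
final class XidDispatch {
    static final int NOTIFICATION_XID = -1; // watch event pushed by the server
    static final int PING_XID = -2;         // heartbeat reply
    static final int AUTH_XID = -4;         // reply to an AuthPacket

    enum Kind { PING, AUTH, NOTIFICATION, REPLY }

    /** Maps the xid of a reply header to the branch that would handle it. */
    static Kind classify(int xid) {
        switch (xid) {
            case PING_XID:
                return Kind.PING;
            case AUTH_XID:
                return Kind.AUTH;
            case NOTIFICATION_XID:
                return Kind.NOTIFICATION;
            default:
                // Any other xid belongs to a request waiting in pendingQueue.
                return Kind.REPLY;
        }
    }
}
```

Reserving a few negative xids for these special packets lets the client tell them apart from ordinary replies by reading the header alone.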
void readResponse(ByteBuffer incomingBuffer) throws IOException {
    ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
    BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
    ReplyHeader replyHdr = new ReplyHeader();
    replyHdr.deserialize(bbia, "header"); // deserialize the header
    if (replyHdr.getXid() == -2) {
        // -2 is the xid for pings
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got ping response for sessionid: 0x"
                    + Long.toHexString(sessionId)
                    + " after "
                    + ((System.nanoTime() - lastPingSentNs) / 1000000)
                    + "ms");
        }
        return;
    }
    if (replyHdr.getXid() == -4) {
        // -4 is the xid for AuthPacket
        if (replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
            state = States.AUTH_FAILED;
            eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,
                    Watcher.Event.KeeperState.AuthFailed, null));
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got auth sessionid:0x" + Long.toHexString(sessionId));
        }
        return;
    }
    if (replyHdr.getXid() == -1) {
        // -1 means notification: the message is an event pushed by the server
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got notification sessionid:0x" + Long.toHexString(sessionId));
        }
        WatcherEvent event = new WatcherEvent();
        event.deserialize(bbia, "response"); // deserialize the event payload
        // convert from a server path to a client path
        if (chrootPath != null) {
            String serverPath = event.getPath();
            if (serverPath.compareTo(chrootPath) == 0)
                event.setPath("/");
            else if (serverPath.length() > chrootPath.length())
                event.setPath(serverPath.substring(chrootPath.length()));
            else {
                LOG.warn("Got server path " + event.getPath()
                        + " which is too short for chroot path " + chrootPath);
            }
        }
        WatchedEvent we = new WatchedEvent(event);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got " + we + " for sessionid 0x" + Long.toHexString(sessionId));
        }
        eventThread.queueEvent(we);
        return;
    }
    // If SASL authentication is currently in progress, construct and
    // send a response packet immediately, rather than queuing a
    // response as with other packets.
    if (tunnelAuthInProgress()) {
        GetSASLRequest request = new GetSASLRequest();
        request.deserialize(bbia, "token");
        zooKeeperSaslClient.respondToServer(request.getToken(), ClientCnxn.this);
        return;
    }
    Packet packet;
    synchronized (pendingQueue) {
        if (pendingQueue.size() == 0) {
            throw new IOException("Nothing in the queue, but got " + replyHdr.getXid());
        }
        // This packet has now received its response, so remove it from pendingQueue.
        packet = pendingQueue.remove();
    }
    /*
     * Since requests are processed in order, we better get a response
     * to the first request!
     */
    try {
        // Validate the packet; on success, update it with the values sent by the server.
        if (packet.requestHeader.getXid() != replyHdr.getXid()) {
            packet.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());
            throw new IOException("Xid out of order. Got Xid " + replyHdr.getXid()
                    + " with err " + replyHdr.getErr()
                    + " expected Xid " + packet.requestHeader.getXid()
                    + " for a packet with details: " + packet);
        }
        packet.replyHeader.setXid(replyHdr.getXid());
        packet.replyHeader.setErr(replyHdr.getErr());
        packet.replyHeader.setZxid(replyHdr.getZxid());
        if (replyHdr.getZxid() > 0) {
            lastZxid = replyHdr.getZxid();
        }
        if (packet.response != null && replyHdr.getErr() == 0) {
            // Deserialize the server's response into packet.response. This is why the
            // last line of the exists method can read the result from packet.response.
            packet.response.deserialize(bbia, "response");
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Reading reply sessionid:0x" + Long.toHexString(sessionId)
                    + ", packet:: " + packet);
        }
    } finally {
        finishPacket(packet); // finally, call finishPacket to complete the handling
    }
}
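The last part of readResponse relies on replies arriving in the same order as the requests were sent, so the head of pendingQueue must carry the xid found in the reply header. A minimal sketch of that check follows. PendingMatcher, sent, and onReply are hypothetical names, the queue holds bare xids instead of Packet objects, and the sketch throws an unchecked IllegalStateException where the method above throws IOException.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical sketch of in-order reply matching; not ZooKeeper code. */
final class PendingMatcher {
    private final Deque<Integer> pendingXids = new ArrayDeque<>();

    /** Records that a request with this xid was written to the socket. */
    void sent(int xid) {
        synchronized (pendingXids) {
            pendingXids.addLast(xid);
        }
    }

    /** Matches a reply against the oldest pending request and returns its xid. */
    int onReply(int replyXid) {
        int expected;
        synchronized (pendingXids) {
            if (pendingXids.isEmpty()) {
                throw new IllegalStateException("Nothing in the queue, but got " + replyXid);
            }
            // The request is removed even if the check below fails.
            expected = pendingXids.removeFirst();
        }
        if (expected != replyXid) {
            throw new IllegalStateException(
                    "Xid out of order. Got Xid " + replyXid + " expected Xid " + expected);
        }
        return expected;
    }
}
```

Because the queue is first-in, first-out, no lookup by xid is needed; a mismatch is treated as a broken connection instead of being searched for elsewhere in the queue.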
The finishPacket method
Its main job is to take the Watcher associated with the Packet and register it with ZKWatchManager.
private void finishPacket(Packet p) {
    int err = p.replyHeader.getErr();
    if (p.watchRegistration != null) {
        // Register the watch with ZKWatchManager. watchRegistration should look familiar:
        // it was initialized when the request was assembled. This call stores the Watcher
        // held by the WatchRegistration subclass in a ZKWatchManager map, for example
        // existWatches.
        p.watchRegistration.register(err);
    }
    // Queue an event for every removed watch so that the client receives the
    // "data/child watch removed" event types.
    if (p.watchDeregistration != null) {
        Map<EventType, Set<Watcher>> materializedWatchers = null;
        try {
            materializedWatchers = p.watchDeregistration.unregister(err);
            for (Entry<EventType, Set<Watcher>> entry : materializedWatchers.entrySet()) {
                Set<Watcher> watchers = entry.getValue();
                if (watchers.size() > 0) {
                    queueEvent(p.watchDeregistration.getClientPath(), err, watchers,
                            entry.getKey());
                    // ignore connectionloss when removing from local session
                    p.replyHeader.setErr(Code.OK.intValue());
                }
            }
        } catch (KeeperException.NoWatcherException nwe) {
            p.replyHeader.setErr(nwe.code().intValue());
        } catch (KeeperException ke) {
            p.replyHeader.setErr(ke.code().intValue());
        }
    }
    // cb is the AsyncCallback. If it is null, this is a synchronous call that needs
    // no asynchronous callback, so calling notifyAll is enough.
    if (p.cb == null) {
        synchronized (p) {
            p.finished = true;
            p.notifyAll();
        }
    } else {
        p.finished = true;
        eventThread.queuePacket(p);
    }
}

public void register(int rc) {
    if (shouldAddWatch(rc)) {
        // The subclass decides which ZKWatchManager map to use, for example existWatches.
        Map<String, Set<Watcher>> watches = getWatches(rc);
        synchronized (watches) {
            Set<Watcher> watchers = watches.get(clientPath);
            if (watchers == null) {
                watchers = new HashSet<Watcher>();
                watches.put(clientPath, watchers);
            }
            // Store the Watcher in the selected ZKWatchManager map.
            watchers.add(watcher);
        }
    }
}
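The cb == null branch only makes sense together with a caller that is blocked on the packet. That caller is not shown in this section, so the sketch below assumes it waits in a loop on the packet's monitor until finished becomes true. SyncPacket, finish, and awaitResponse are hypothetical names, and the response is reduced to a String.

```java
/** Hypothetical sketch of the synchronous wait/notify handshake; not ZooKeeper code. */
final class SyncPacket {
    private boolean finished;
    private String response;

    /** I/O thread side: mirrors the cb == null branch of finishPacket. */
    void finish(String response) {
        synchronized (this) {
            this.response = response;
            finished = true;
            notifyAll(); // wake every thread blocked in awaitResponse
        }
    }

    /** API thread side: blocks until finish has been called. */
    String awaitResponse() {
        synchronized (this) {
            try {
                // Loop guards against spurious wakeups and early notifications.
                while (!finished) {
                    wait();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
            return response;
        }
    }
}
```

Checking the finished flag inside the loop means the handshake works regardless of whether finish runs before or after the caller starts waiting.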
The code below shows the maps in which the client stores watchers, one for each of the three kinds of watch registration:
static class ZKWatchManager implements ClientWatchManager {
    private final Map<String, Set<Watcher>> dataWatches =
            new HashMap<String, Set<Watcher>>();
    private final Map<String, Set<Watcher>> existWatches =
            new HashMap<String, Set<Watcher>>();
    private final Map<String, Set<Watcher>> childWatches =
            new HashMap<String, Set<Watcher>>();
}
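In summary, when a Watcher is registered with the ZooKeeper server through the ZooKeeper constructor or through the getData, exists, and getChildren interfaces, the request is first sent to the server. Once the server replies that the request succeeded, the client stores the mapping from the path to the Watcher for later use.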
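The client-side bookkeeping summarized above can be sketched as a map from path to a set of watchers that is only updated after a successful reply. WatchTable and its methods are hypothetical names, and the sketch assumes the simplest rule, registering only when the reply code is 0; individual registration types may apply a different rule through shouldAddWatch.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of a path-to-watchers table; not ZooKeeper code. */
final class WatchTable<W> {
    private final Map<String, Set<W>> watches = new HashMap<>();

    /**
     * Stores the watcher for the path if the server reply code is 0.
     * Returns true only when a new (path, watcher) pair was added.
     */
    boolean register(String clientPath, W watcher, int rc) {
        if (rc != 0) {
            return false; // assumption: failed requests leave no watch behind
        }
        synchronized (watches) {
            return watches.computeIfAbsent(clientPath, k -> new HashSet<>()).add(watcher);
        }
    }

    /** Returns a snapshot of the watchers currently stored for the path. */
    Set<W> watchersFor(String clientPath) {
        synchronized (watches) {
            Set<W> found = watches.get(clientPath);
            return found == null ? new HashSet<>() : new HashSet<>(found);
        }
    }
}
```

Using a Set per path means registering the same watcher twice on one path keeps a single entry.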
EventThread.queuePacket()
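For asynchronous calls (p.cb != null), finishPacket ends by calling eventThread.queuePacket, which adds the current packet to the queue of events waiting to be delivered: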
public void queuePacket(Packet packet) {
    if (wasKilled) {
        synchronized (waitingEvents) {
            if (isRunning)
                waitingEvents.add(packet);
            else
                processEvent(packet);
        }
    } else {
        waitingEvents.add(packet);
    }
}
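queuePacket is the producer half of a producer/consumer queue: the I/O thread enqueues and a separate event thread dequeues and runs the callbacks, so slow user callbacks never block socket reads. The sketch below illustrates that split with simplified shutdown handling. MiniEventThread and its methods are hypothetical names, and the "processing" step just records each item.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical sketch of an event thread draining a queue; not ZooKeeper code. */
final class MiniEventThread extends Thread {
    private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<>();
    private final Object eventOfDeath = new Object(); // sentinel that stops the loop
    private final List<Object> processed = Collections.synchronizedList(new ArrayList<>());

    /** Producer side: called by the I/O thread. */
    void queuePacket(Object packet) {
        waitingEvents.add(packet);
    }

    /** Consumer side: handles items one at a time, in arrival order. */
    @Override
    public void run() {
        try {
            while (true) {
                Object event = waitingEvents.take(); // blocks while the queue is empty
                if (event == eventOfDeath) {
                    return;
                }
                processed.add(event); // stand-in for invoking the user callback
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Enqueues the sentinel and waits for the thread to drain earlier items. */
    void shutdownAndWait() {
        waitingEvents.add(eventOfDeath);
        try {
            join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    List<Object> processed() {
        return new ArrayList<>(processed);
    }
}
```

Because the sentinel goes through the same queue, everything enqueued before shutdownAndWait is processed before the thread exits.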