ClientCnxnSocketNIO.doIO
服务端处理完成以后,会通过 NIOServerCnxn.sendResponse 发送返回的响应信息,客户端会在 ClientCnxnSocketNIO.doIO 接收服务端的返回,注意一下 SendThread.readResponse,接收服务端的信息进行读取。
void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn) throws InterruptedException, IOException {
SocketChannel sock = (SocketChannel) sockKey.channel();
if (sock == null) {
throw new IOException("Socket is null!");
}
if (sockKey.isReadable()) {
int rc = sock.read(incomingBuffer);
if (rc < 0) {
throw new EndOfStreamException(
"Unable to read additional data from server ses
sionid 0x"
+ Long.toHexString(sessionId)
+ ", likely server has closed socket");
}
if (!incomingBuffer.hasRemaining()) {
incomingBuffer.flip();
if (incomingBuffer == lenBuffer) {
recvCount++;
readLength();
} else if (!initialized) {
readConnectResult();
enableRead();
if (findSendablePacket(outgoingQueue,
cnxn.sendThread.clientTunneledAuthenticatio
nInProgress()) != null) {
// Since SASL authentication has completed (if
client is configured to do so),
// outgoing packets waiting in the outgoingQueu
e can now be sent.
enableWrite();
}
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
initialized = true;
} else {
sendThread.readResponse(incomingBuffer);
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
}
}
}
}
SendThread.readResponse
这个方法里面主要的流程如下
- 首先读取 header,如果其 xid == -2,表明是一个 ping 的 response,return
- 如果 xid 是 -4 ,表明是一个 AuthPacket 的 response return
- 如果 xid 是 -1,表明是一个 notification,此时要继续读取并构造一个 event,通过 EventThread.queueEvent 发送,return
其它情况下:
从 pendingQueue 拿出一个 Packet,校验后更新 packet 信息
/**
 * Decodes one server reply from {@code incomingBuffer} and routes it
 * according to the xid carried in the reply header:
 * <ul>
 *   <li>{@code -2}: ping response, only logged;</li>
 *   <li>{@code -4}: AuthPacket response, queues an AuthFailed event on failure;</li>
 *   <li>{@code -1}: notification, deserialized into a WatchedEvent and queued
 *       on the event thread;</li>
 *   <li>anything else: matched against the head of {@code pendingQueue},
 *       whose packet is updated with the reply and then finished.</li>
 * </ul>
 *
 * @param incomingBuffer buffer holding the serialized reply (header followed by body)
 * @throws IOException if a reply arrives while {@code pendingQueue} is empty, if the
 *                     reply xid does not match the oldest pending request, or if
 *                     deserialization fails
 */
void readResponse(ByteBuffer incomingBuffer) throws IOException {
    ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
    BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
    ReplyHeader replyHdr = new ReplyHeader();
    replyHdr.deserialize(bbia, "header"); // deserialize the reply header first
    if (replyHdr.getXid() == -2) {
        // -2 is the xid for pings
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got ping response for sessionid: 0x" + Long.toHexString(sessionId) + " after " + ((System.nanoTime() - lastPingSentNs) / 1000000) + "ms");
        }
        return;
    }
    if (replyHdr.getXid() == -4) {
        // -4 is the xid for AuthPacket
        if (replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
            state = States.AUTH_FAILED;
            eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,
                    Watcher.Event.KeeperState.AuthFailed, null));
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got auth sessionid:0x" + Long.toHexString(sessionId));
        }
        return;
    }
    if (replyHdr.getXid() == -1) {
        // -1 means notification, i.e. an event pushed by the server
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got notification sessionid:0x" + Long.toHexString(sessionId));
        }
        WatcherEvent event = new WatcherEvent();
        event.deserialize(bbia, "response"); // deserialize the event body
        // convert from a server path to a client path
        if (chrootPath != null) {
            String serverPath = event.getPath();
            if (serverPath.compareTo(chrootPath) == 0) {
                event.setPath("/");
            } else if (serverPath.length() > chrootPath.length()) {
                event.setPath(serverPath.substring(chrootPath.length()));
            } else {
                LOG.warn("Got server path " + event.getPath() + " which is too short for chroot path " + chrootPath);
            }
        }
        WatchedEvent we = new WatchedEvent(event);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got " + we + " for sessionid 0x" + Long.toHexString(sessionId));
        }
        eventThread.queueEvent(we);
        return;
    }
    // If SASL authentication is currently in progress, construct and
    // send a response packet immediately, rather than queuing a
    // response as with other packets.
    if (tunnelAuthInProgress()) {
        GetSASLRequest request = new GetSASLRequest();
        request.deserialize(bbia, "token");
        zooKeeperSaslClient.respondToServer(request.getToken(), ClientCnxn.this);
        return;
    }
    Packet packet;
    synchronized (pendingQueue) {
        if (pendingQueue.size() == 0) {
            throw new IOException("Nothing in the queue, but got " + replyHdr.getXid());
        }
        // This reply answers the oldest pending request, so take it off pendingQueue.
        packet = pendingQueue.remove();
    }
    /*
     * Since requests are processed in order, we better get a response
     * to the first request!
     */
    try {
        // Validate the reply against the request, then copy the server's
        // header values (and body, on success) into the packet.
        if (packet.requestHeader.getXid() != replyHdr.getXid()) {
            packet.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());
            throw new IOException("Xid out of order. Got Xid "
                    + replyHdr.getXid() + " with err "
                    + replyHdr.getErr()
                    + " expected Xid "
                    + packet.requestHeader.getXid()
                    + " for a packet with details: "
                    + packet);
        }
        packet.replyHeader.setXid(replyHdr.getXid());
        packet.replyHeader.setErr(replyHdr.getErr());
        packet.replyHeader.setZxid(replyHdr.getZxid());
        if (replyHdr.getZxid() > 0) {
            lastZxid = replyHdr.getZxid();
        }
        if (packet.response != null && replyHdr.getErr() == 0) {
            // Deserialize the server's answer into packet.response so the
            // caller that issued the request can read the result from it.
            packet.response.deserialize(bbia, "response");
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Reading reply sessionid:0x" + Long.toHexString(sessionId) + ", packet:: " + packet);
        }
    } finally {
        // Runs on both the success and the xid-mismatch path.
        finishPacket(packet);
    }
}
finishPacket 方法
主要功能是从 Packet 中取出对应的 Watcher,并将其注册到 ZKWatchManager 中去。
private void finishPacket(Packet p) {
int err = p.replyHeader.getErr();
if (p.watchRegistration != null) {
//将事件注册到 zkwatchemanager 中 watchRegistration,熟悉吗?
//在组装请求的时候,我们初始化了这个对象 把 watchRegistration 子类里面的 Watcher 实例放到 ZKWatchManager 的 exists Watches 中存储起来。
p.watchRegistration.register(err);
}
//将所有移除的监视事件添加到事件队列, 这样客户端能收到 “data/child 事件被移除”的事件类型
if (p.watchDeregistration != null) {
Map<EventType, Set<Watcher>> materializedWatchers = null;
try {
materializedWatchers = p.watchDeregistration.unregister(err);
for (Entry<EventType, Set<Watcher>> entry : materialize
dWatchers.entrySet()) {
Set<Watcher> watchers = entry.getValue();
if (watchers.size() > 0) {
queueEvent(p.watchDeregistration.getClientPath(), err, watchers, entry.getKey());
// ignore connectionloss when removing from local session
p.replyHeader.setErr(Code.OK.intValue());
}
}
} catch (KeeperException.NoWatcherException nwe) {
p.replyHeader.setErr(nwe.code().intValue());
} catch (KeeperException ke) {
p.replyHeader.setErr(ke.code().intValue());
}
}
//cb 就是 AsnycCallback,如果为 null,表明是同步调用的接口,不需要异步回掉,因此,直接 notifyAll 即可。
if (p.cb == null) {
synchronized (p) {
p.finished = true;
p.notifyAll();
}
} else {
p.finished = true;
eventThread.queuePacket(p);
}
}
/**
 * Stores this registration's watcher under its client path, provided the
 * reply code qualifies for adding a watch.
 *
 * @param rc the result code of the reply that carried this registration
 */
public void register(int rc) {
    if (!shouldAddWatch(rc)) {
        return;
    }
    // The target map comes from the subclass via getWatches(rc).
    Map<String, Set<Watcher>> watchesByPath = getWatches(rc);
    synchronized (watchesByPath) {
        Set<Watcher> registered = watchesByPath.get(clientPath);
        if (registered == null) {
            // First watcher for this path: create its set.
            registered = new HashSet<Watcher>();
            watchesByPath.put(clientPath, registered);
        }
        registered.add(watcher);
    }
}
下面这段代码是客户端存储 watcher 的几个 map 集合,分别对应三种注册监听事件:
/**
 * Client-side holder for registered watchers. Only the three watcher maps
 * are shown in this excerpt; each maps a String key to the set of watchers
 * registered under it.
 * NOTE(review): the keys look like client paths (register(int) puts
 * clientPath into the map returned by getWatches) -- confirm against the full class.
 */
static class ZKWatchManager implements ClientWatchManager {
// Watchers for data watches -- presumably those registered via getData; confirm.
private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>();
// Watchers for exists watches -- presumably those registered via exists; confirm.
private final Map<String, Set<Watcher>> existWatches = new HashMap<String, Set<Watcher>>();
// Watchers for child watches -- presumably those registered via getChildren; confirm.
private final Map<String, Set<Watcher>> childWatches = new HashMap<String, Set<Watcher>>();
}
总的来说,当使用 ZooKeeper 构造方法或者使用 getData、exists 和getChildren 三个接口来向 ZooKeeper 服务器注册 Watcher 的时候,首先将此消息传递给服务端,传递成功后,服务端会通知客户端,然后客户端将该路径和Watcher 对应关系存储起来备用。
EventThread.queuePacket()
当 Packet 带有异步回调(cb != null)时,finishPacket 方法最终会调用 eventThread.queuePacket,将当前的数据包添加到等待事件通知的队列中:
/**
 * Hands a finished packet to the event thread. Normally the packet is just
 * appended to {@code waitingEvents}; once the thread has been killed, the
 * packet is either appended (if the thread is still running) or processed
 * inline, under the {@code waitingEvents} lock.
 *
 * @param packet the packet to queue or process
 */
public void queuePacket(Packet packet) {
    if (!wasKilled) {
        waitingEvents.add(packet);
        return;
    }
    synchronized (waitingEvents) {
        if (isRunning) {
            waitingEvents.add(packet);
        } else {
            processEvent(packet);
        }
    }
}