服务端接收数据请求
服务端收到的数据包应该在哪里呢?在上节课分析过了,zookeeper 启动的时候,通过下面的代码构建了一个ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
NIOServerCnxnFactory,它实现了 Thread,所以在启动的时候,会在 run 方法中不断循环接收客户端的请求进行分发。
NIOServerCnxnFactory.run
public void run() {
while (!ss.socket().isClosed()) {
try {
for (SelectionKey k : selectedList) {
// 获取 client 的连接请求
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
// 处理客户端的读/写请求
NIOServerCnxn c = (NIOServerCnxn) k.attachment();
c.doIO(k);// 处理 IO 操作
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Unexpected ops in select " + k.readyOps());
}
}
}
selected.clear();
} catch (RuntimeException e) {
LOG.warn("Ignoring unexpected runtime exception", e);
} catch (Exception e) {
LOG.warn("Ignoring exception", e);
}
}
closeAll();
LOG.info("NIOServerCnxn factory exited run method");
}
NIOServerCnxn.doIO
void doIO(SelectionKey k) {
try {
//省略部分代码..
if (k.isReadable()) {//处理读请求,表示接收
//中间这部分逻辑用来处理报文以及粘包问题
if (isPayload) { // not the case for 4letterword
readPayload();//处理报文
}
else {
// four letter words take care
// need not do anything else
return;
}
}
}
}
NIOServerCnxn.readRequest
读取客户端的请求,进行具体的处理
private void readRequest() throws IOException {
zkServer.processPacket(this, incomingBuffer);
}
ZookeeperServer.processPacket
这个方法根据数据包的类型来处理不同的数据包,对于读写请求,我们主要关注下面这块代码即可:
Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo()); si.setOwner(ServerCnxn.me); submitRequest(si);
后续的流程,在前面的源码分析中有些,就不做重复粘贴了。
集群模式下的处理流程
集群模式下,涉及到 zab 协议,所以处理流程比较复杂,大家可以基于这个图来定位代码的流程: