服务端接收数据请求
服务端收到的数据包应该在哪里呢?在上节课分析过了,zookeeper 启动的时候,通过下面的代码构建了一个ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
NIOServerCnxnFactory,它实现了 Thread,所以在启动的时候,会在 run 方法中不断循环接收客户端的请求进行分发。
NIOServerCnxnFactory.run
public void run() { while (!ss.socket().isClosed()) { try { for (SelectionKey k : selectedList) { // 获取 client 的连接请求 if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) { } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) { // 处理客户端的读/写请求 NIOServerCnxn c = (NIOServerCnxn) k.attachment(); c.doIO(k);// 处理 IO 操作 } else { if (LOG.isDebugEnabled()) { LOG.debug("Unexpected ops in select " + k.readyOps()); } } } selected.clear(); } catch (RuntimeException e) { LOG.warn("Ignoring unexpected runtime exception", e); } catch (Exception e) { LOG.warn("Ignoring exception", e); } } closeAll(); LOG.info("NIOServerCnxn factory exited run method"); }
NIOServerCnxn.doIO
void doIO(SelectionKey k) { try { //省略部分代码.. if (k.isReadable()) {//处理读请求,表示接收 //中间这部分逻辑用来处理报文以及粘包问题 if (isPayload) { // not the case for 4letterword readPayload();//处理报文 } else { // four letter words take care // need not do anything else return; } } } }
NIOServerCnxn.readRequest
读取客户端的请求,进行具体的处理
private void readRequest() throws IOException { zkServer.processPacket(this, incomingBuffer); }
ZookeeperServer.processPacket
这个方法根据数据包的类型来处理不同的数据包,对于读写请求,我们主要关注下面这块代码即可:
Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo()); si.setOwner(ServerCnxn.me); submitRequest(si);
后续的流程,在前面的源码分析中有些,就不做重复粘贴了。
集群模式下的处理流程
集群模式下,涉及到 zab 协议,所以处理流程比较复杂,大家可以基于这个图来定位代码的流程: