A ZooKeeper node runs as a QuorumPeer. Once leader election finishes, the leader and the followers each run their own logic:
org.apache.zookeeper.server.quorum.QuorumPeer#run
org.apache.zookeeper.server.quorum.QuorumPeer#setLeader
org.apache.zookeeper.server.quorum.Leader#lead
org.apache.zookeeper.server.quorum.QuorumPeer#setFollower
org.apache.zookeeper.server.quorum.Follower#followLeader
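The dispatch above can be pictured as a loop that switches on the peer's state. The following is a simplified sketch, not ZooKeeper's code: the LOOKING/LEADING/FOLLOWING values mirror the real server states, but `PeerLoopSketch`, `step`, and the trace strings are hypothetical stand-ins for leader election, Leader#lead and Follower#followLeader. It also shows that when lead() or followLeader() returns, the peer goes back to LOOKING.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplified model of the state dispatch in QuorumPeer#run.
// Real ZooKeeper also has an OBSERVING state and much more error handling.
enum ServerState { LOOKING, LEADING, FOLLOWING }

class PeerLoopSketch {
    ServerState state = ServerState.LOOKING;
    final List<String> trace = new ArrayList<>();

    // One iteration of the main loop; electionResult is only used while LOOKING.
    void step(ServerState electionResult) {
        switch (state) {
            case LOOKING:
                // stands in for leader election: the result decides our role
                trace.add("election -> " + electionResult);
                state = electionResult;
                break;
            case LEADING:
                // stands in for setLeader(makeLeader(...)); leader.lead()
                trace.add("lead");
                state = ServerState.LOOKING; // lead() returned: back to election
                break;
            case FOLLOWING:
                // stands in for setFollower(makeFollower(...)); follower.followLeader()
                trace.add("followLeader");
                state = ServerState.LOOKING; // followLeader() returned: back to election
                break;
        }
    }
}
```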
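Whether a node is the leader or a follower, it wraps a QuorumPeer object; the ServerCnxnFactory held by the QuorumPeer listens on the client port and handles client I/O events.
Entry point for request processing: the request is handed to firstProcessor, the head of the processor chain: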
org.apache.zookeeper.server.ZooKeeperServer#processPacket
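A processor chain is a chain of responsibility: each stage does its own work and then calls the next stage's processRequest. A minimal sketch, assuming a simplified one-method interface (the real org.apache.zookeeper.server.RequestProcessor takes a Request object and also has shutdown()); `Processor`, `NamedProcessor` and `ChainDemo` are hypothetical names. The sketch is synchronous, whereas several real processors (PrepRequestProcessor, SyncRequestProcessor, CommitProcessor) queue requests and work on their own thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for ZooKeeper's RequestProcessor interface.
interface Processor {
    void processRequest(String request);
}

// A stage that records its name and then hands the request to the next stage.
class NamedProcessor implements Processor {
    private final String name;
    private final Processor next; // null for the last stage in the chain
    private final List<String> log;

    NamedProcessor(String name, Processor next, List<String> log) {
        this.name = name;
        this.next = next;
        this.log = log;
    }

    @Override
    public void processRequest(String request) {
        log.add(name + ":" + request);
        if (next != null) {
            next.processRequest(request);
        }
    }
}

class ChainDemo {
    // Builds the chain back to front, like setupRequestProcessors does,
    // and returns its head (the equivalent of firstProcessor).
    static Processor build(List<String> log) {
        Processor last = new NamedProcessor("Final", null, log);
        Processor middle = new NamedProcessor("Middle", last, log);
        return new NamedProcessor("Prep", middle, log);
    }
}
```

Building back to front is why setupRequestProcessors creates FinalRequestProcessor first and PrepRequestProcessor last.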
leader
Building the processor chain
// org.apache.zookeeper.server.quorum.LeaderZooKeeperServer#setupRequestProcessors
protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(
            finalProcessor, getLeader().toBeApplied);
    commitProcessor = new CommitProcessor(toBeAppliedProcessor,
            Long.toString(getServerId()), false,
            getZooKeeperServerListener());
    commitProcessor.start();
    ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
            commitProcessor);
    proposalProcessor.initialize();
    firstProcessor = new PrepRequestProcessor(this, proposalProcessor);
    ((PrepRequestProcessor)firstProcessor).start();
}
Request processing chain: PrepRequestProcessor, ProposalRequestProcessor, CommitProcessor, ToBeAppliedRequestProcessor, FinalRequestProcessor
As the ProposalRequestProcessor constructor shows, there is also a second chain: SyncRequestProcessor, AckRequestProcessor
public ProposalRequestProcessor(LeaderZooKeeperServer zks,
        RequestProcessor nextProcessor) {
    this.zks = zks;
    this.nextProcessor = nextProcessor;
    AckRequestProcessor ackProcessor = new AckRequestProcessor(zks.getLeader());
    syncProcessor = new SyncRequestProcessor(zks, ackProcessor);
}
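The two leader chains meet inside ProposalRequestProcessor#processRequest: every request continues down the main chain toward CommitProcessor, and write (transaction) requests are additionally proposed to the followers and passed to the SyncRequestProcessor chain. A simplified sketch of that fork, based on the 3.4-era source; `ForkSketch`, the `isWrite` flag and the event strings are hypothetical (the real code tests whether the request carries a transaction header).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplified model of the fork in ProposalRequestProcessor#processRequest.
class ForkSketch {
    final List<String> events = new ArrayList<>();

    void processRequest(String request, boolean isWrite) {
        events.add("commitChain:" + request);   // nextProcessor.processRequest(request)
        if (isWrite) {
            events.add("propose:" + request);   // zks.getLeader().propose(request)
            events.add("syncChain:" + request); // syncProcessor.processRequest(request)
        }
    }
}
```

Reads therefore only travel the main chain, while writes also fan out to the followers and to the leader's own transaction log.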
follower
Building the processor chain
org.apache.zookeeper.server.quorum.FollowerZooKeeperServer#setupRequestProcessors
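Chain 1: FollowerRequestProcessor, CommitProcessor, FinalRequestProcessor
Chain 2: SyncRequestProcessor, SendAckRequestProcessor
Two-phase write
The leader receives the write request and deserializes the request body (PrepRequestProcessor also turns it into a transaction header and transaction)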
org.apache.zookeeper.server.PrepRequestProcessor#processRequest
The leader sends the proposal to the followers
// org.apache.zookeeper.server.quorum.ProposalRequestProcessor#processRequest
zks.getLeader().propose(request);
The leader also sends the proposal to itself
// the leader's SyncRequestProcessor writes the request to the transaction log
syncProcessor.processRequest(request);
The leader's AckRequestProcessor returns an ACK to the leader itself
org.apache.zookeeper.server.quorum.AckRequestProcessor#processRequest
The follower receives and processes the proposal sent by the leader
org.apache.zookeeper.server.quorum.Follower#processPacket
The follower's SyncRequestProcessor writes it to the transaction log
org.apache.zookeeper.server.SyncRequestProcessor#run
The follower's SendAckRequestProcessor sends an ACK to the leader
org.apache.zookeeper.server.quorum.SendAckRequestProcessor#processRequest
The leader receives and processes the followers' ACKs
org.apache.zookeeper.server.quorum.LearnerHandler#run
org.apache.zookeeper.server.quorum.Leader#processAck
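Once more than half of the ensemble has ACKed, the leader sends a COMMIT to the followers (by putting the packet into each follower's send queue) and applies the write to its own DataTree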
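The quorum rule in Leader#processAck can be sketched as a per-zxid set of server ids that have ACKed, committed once the set is a strict majority. This is a simplified sketch under stated assumptions: `AckTracker` is a hypothetical name, a plain majority of `ensembleSize` voters replaces ZooKeeper's QuorumVerifier, and the real rule that proposals must commit in zxid order is left out. The leader's own ACK (from its AckRequestProcessor) counts like any other.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical simplified model of the majority check in Leader#processAck.
class AckTracker {
    private final int ensembleSize;
    private final Map<Long, Set<Long>> acks = new HashMap<>(); // zxid -> ids that ACKed
    private final Set<Long> committed = new HashSet<>();

    AckTracker(int ensembleSize) {
        this.ensembleSize = ensembleSize;
    }

    // A proposal was issued; nobody has ACKed it yet.
    void propose(long zxid) {
        acks.put(zxid, new HashSet<>());
    }

    // Records an ACK from server sid; returns true if this ACK made the
    // proposal reach a majority (the point where the leader sends COMMIT).
    boolean processAck(long sid, long zxid) {
        Set<Long> ackSet = acks.get(zxid);
        if (ackSet == null) {
            return false; // unknown or already committed: ignore late ACKs
        }
        ackSet.add(sid);
        if (ackSet.size() > ensembleSize / 2) { // strictly more than half
            acks.remove(zxid);
            committed.add(zxid); // real leader: send COMMIT and apply locally
            return true;
        }
        return false;
    }

    boolean isCommitted(long zxid) {
        return committed.contains(zxid);
    }
}
```

With three servers, the leader's own ACK plus one follower ACK is enough; the third ACK arrives after the commit and is ignored.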
The follower commits the write and applies the data to its DataTree
CommitProcessor and FinalRequestProcessor
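On the follower, each COMMIT has to match the oldest proposal the follower is still holding. A simplified sketch loosely modeled on FollowerZooKeeperServer#logRequest and #commit in the 3.4-era source; `FollowerCommitSketch`, `Txn` and the map used as a DataTree are hypothetical, and unlike the real code the sketch writes the data directly instead of handing the request to CommitProcessor.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical simplified model of the follower side of a commit.
class FollowerCommitSketch {
    private static class Txn {
        final long zxid;
        final String path;
        final String data;

        Txn(long zxid, String path, String data) {
            this.zxid = zxid;
            this.path = path;
            this.data = data;
        }
    }

    private final Queue<Txn> pendingTxns = new ArrayDeque<>(); // proposals in arrival order
    final Map<String, String> dataTree = new HashMap<>();    // stand-in for DataTree

    // PROPOSAL received: logged (not shown) and kept until it is committed.
    void logRequest(long zxid, String path, String data) {
        pendingTxns.add(new Txn(zxid, path, data));
    }

    // COMMIT received: it must refer to the oldest pending proposal.
    void commit(long zxid) {
        Txn head = pendingTxns.peek();
        if (head == null) {
            return; // the real code only logs a warning here
        }
        if (head.zxid != zxid) {
            // the real follower treats an out-of-order commit as fatal and exits
            throw new IllegalStateException("unexpected commit 0x" + Long.toHexString(zxid));
        }
        pendingTxns.remove();
        dataTree.put(head.path, head.data); // apply the committed write
    }
}
```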
The two key points at which data is written
Writing the transaction log
// org.apache.zookeeper.server.SyncRequestProcessor
zks.getZKDatabase().append(si);
Writing the DataTree
// org.apache.zookeeper.server.FinalRequestProcessor (via ZooKeeperServer#processTxn)
getZKDatabase().processTxn(hdr, txn);
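The ordering of the two write points (log first, in-memory tree only after the commit) is what lets a restarted server rebuild its state from disk. A toy sketch of that idea; `ToyZkDatabase`, its fields and `replay` are hypothetical, and real ZooKeeper restores from the latest snapshot plus the log files rather than from the log alone.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of the two write points.
class ToyZkDatabase {
    final List<String[]> txnLog = new ArrayList<>();      // stand-in for log.<zxid> files
    final Map<String, String> dataTree = new HashMap<>(); // stand-in for DataTree

    // First write point (SyncRequestProcessor): persist the transaction.
    void append(String path, String data) {
        txnLog.add(new String[] {path, data});
    }

    // Second write point (FinalRequestProcessor): apply a committed transaction.
    void processTxn(String path, String data) {
        dataTree.put(path, data);
    }

    // Recovery: rebuild a tree by replaying logged transactions in order.
    static Map<String, String> replay(List<String[]> log) {
        Map<String, String> tree = new HashMap<>();
        for (String[] txn : log) {
            tree.put(txn[0], txn[1]);
        }
        return tree;
    }
}
```

Between the two points the tree can lag behind the log, which is the window in which a proposal is logged but not yet committed.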
Failure case 1: a follower times out while processing a proposal. How does the leader handle it?
The leader periodically sends a PING to each follower
// org.apache.zookeeper.server.quorum.LearnerHandler#ping
public void ping() {
    long id;
    if (syncLimitCheck.check(System.nanoTime())) {
        // within the sync limit: send a PING carrying the last proposed zxid
        synchronized(leader) {
            id = leader.lastProposed;
        }
        QuorumPacket ping = new QuorumPacket(Leader.PING, id, null, null);
        queuePacket(ping);
    } else {
        // an outstanding proposal has waited too long for an ACK: drop this follower
        LOG.warn("Closing connection to peer due to transaction timeout.");
        shutdown();
    }
}
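If the oldest proposal that the follower has not yet ACKed has been outstanding for syncLimit * tickTime or longer, the leader closes the connection to that follower. The follower then shuts down and QuorumPeer#run puts it back into the LOOKING state; once it finds the leader again, it creates a new Follower object, reconnects to the leader, and sends an ACK message.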
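The timeout decision is made by the syncLimitCheck object used in ping(). A simplified sketch modeled on LearnerHandler's inner SyncLimitCheck class as it appears in the 3.4-era source: it tracks the oldest proposal the follower has not ACKed (plus the most recent one after it) and fails once that proposal has been pending for `limitMs` or longer. `SyncLimitSketch` and the `limitMs` constructor parameter are hypothetical; in ZooKeeper the limit is tickTime * syncLimit read from the configuration.

```java
// Hypothetical simplified model of LearnerHandler's sync-limit check.
class SyncLimitSketch {
    private final long limitMs;            // tickTime * syncLimit in ZooKeeper
    private long currentZxid, currentTime; // oldest un-ACKed proposal (time 0 = none)
    private long nextZxid, nextTime;       // most recent proposal after it

    SyncLimitSketch(long limitMs) {
        this.limitMs = limitMs;
    }

    // A proposal was queued for this follower at the given time (nanoseconds).
    void updateProposal(long zxid, long time) {
        if (currentTime == 0) {
            currentZxid = zxid;
            currentTime = time;
        } else {
            nextZxid = zxid;
            nextTime = time;
        }
    }

    // The follower ACKed zxid: move on to the next tracked proposal.
    void updateAck(long zxid) {
        if (zxid == currentZxid) {
            currentZxid = nextZxid;
            currentTime = nextTime;
            nextZxid = 0;
            nextTime = 0;
        } else if (zxid == nextZxid) {
            nextZxid = 0;
            nextTime = 0;
        }
    }

    // true: still within the limit, ping() may send a PING;
    // false: the leader would close the connection to this follower.
    boolean check(long now) {
        if (currentTime == 0) {
            return true; // nothing outstanding
        }
        long delayMs = (now - currentTime) / 1_000_000;
        return delayMs < limitMs;
    }
}
```

With tickTime = 2000 ms and syncLimit = 5, a proposal left un-ACKed for 10 seconds makes check() return false.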
Viewing a transaction log:
org.apache.zookeeper.server.LogFormatter#main
Argument: D:/zk_test/data1/version-2/log.100000001