这篇文章会分析集群模式下leader(服务端)与follower(作为连接leader的客户端)的初始化、数据同步和启动过程
依旧从zkServer.sh所启动的主类QuorumPeerMain入手:
初始化、启动
main{
main.initializeAndRun(args){...
config.parse(args[0]);//解析zoo.cfg配置文件 如dataDir、dataLogDir等
...if (args.length == 1 && config.servers.size() > 0) {
runFromConfig(config);//集群模式
} else {
LOG.warn("Either no config or no quorum defined in config, running "
+ " in standalone mode");
// there is only server in the quorum -- run as standalone
ZooKeeperServerMain.main(args);//单机模式
}
...}
}
runFromConfig{...
try {...
quorumPeer = getQuorumPeer();//集群每个...
quorumPeer.initialize();
quorumPeer.start();
quorumPeer.join();
...}
...}
start{
loadDataBase();//之前分析过从日志文件加载数据进内存
cnxnFactory.start();
startLeaderElection();//领导者选举
super.start();@1
}
@1 run{
try {//Main loop
while (running) {
switch (getPeerState()) {
case LOOKING:...
case FOLLOWING:
try {
LOG.info("FOLLOWING");
setFollower(makeFollower(logFactory));
follower.followLeader();
case LEADING: //之前已经完成了leader的选举
LOG.info("LEADING");
try {
setLeader(makeLeader(logFactory));
leader.lead();
setLeader(null);
}
//leader部分代码
leader.lead{...
try {
self.tick.set(0);
zk.loadData();
cnxAcceptor = new LearnerCnxAcceptor();
cnxAcceptor.start();// cnxAcceptor:the follower acceptor thread @2
...}
...}
@2 run{...
try{
Socket s = ss.accept();
LearnerHandler fh = new LearnerHandler(s, is, Leader.this);//每一个socket启用一个线程处理
fh.start(); @3
...}
@3 run{...
QuorumPacket qp = new QuorumPacket();
ia.readRecord(qp, "packet");//读取follow数据 qp ...
byte learnerInfoData[] = qp.getData();
if (learnerInfoData != null) {...
long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());... //从follower发来的zxid中取出其lastAcceptedEpoch(FOLLOWERINFO包的zxid由follower的acceptedEpoch构造,并非最新事务id)
long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch); //this.getSid()是follower的server id(即myid),获取新的epoch(代号,如第几届)...@4
//获取最新的epoch后告诉follow用新的
QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, ZxidUtils.makeZxid(newEpoch, 0), ver, null);
oa.writeRecord(newEpochPacket, "packet");//发送socket
bufferedOutput.flush();
QuorumPacket ackEpochPacket = new QuorumPacket();
ia.readRecord(ackEpochPacket, "packet");
leader.waitForEpochAck(this.getSid(), ss);...//等待follow发送ack... @5
int packetToSend = Leader.SNAP;
long zxidToSend = 0;
long leaderLastZxid = 0;
LinkedList<Proposal> proposals = leader.zk.getZKDatabase().getCommittedLog();...
queuePacket(propose.packet); //committedLog就是用来保存议案的列表 把需要同步的提议加入queuedPackets这个队列
QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(),null, null);
queuePacket(qcommit);...
//if we are not truncating or sending a diff just send a snapshot 根据follower的最新事务id找到需要同步的数据 如果只是同步快照数据
if (packetToSend == Leader.SNAP) {
leader.zk.getZKDatabase().serializeSnapshot(oa);
oa.writeString("BenWasHere", "signature");
}
// Start sending packets 单独的线程去发送diff或者trunc数据
new Thread() {
public void run() {
Thread.currentThread().setName(
"Sender-" + sock.getRemoteSocketAddress());
try {
sendPackets{...
oa.writeRecord(p, "packet");
...}
} catch (InterruptedException e) {
LOG.warn("Unexpected interruption",e);
}
}
}.start();...
@9 while (true) {
qp = new QuorumPacket();
case Leader.ACK:
if (this.learnerType == LearnerType.OBSERVER) {
if (LOG.isDebugEnabled()) {
LOG.debug("Received ACK from Observer " + this.sid);
}
}
syncLimitCheck.updateAck(qp.getZxid());
leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress()); @10
break;...
...}
@10 processAck{...
p.ackSet.add(sid);...
if (self.getQuorumVerifier().containsQuorum(p.ackSet)){ //过半机制
commit{
QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null); //发送commit给follower 处理更新内存
sendPacket(qp);
}
inform(p); //通知观察者同理
}
@4 getEpochToPropose{...
if (lastAcceptedEpoch >= epoch) {
epoch = lastAcceptedEpoch+1;
}
if (isParticipant(sid)) {// 只统计参与投票的follower,不包括observer
connectingFollowers.add(sid);
}
QuorumVerifier verifier = self.getQuorumVerifier();
if (connectingFollowers.contains(self.getId()) &&
verifier.containsQuorum(connectingFollowers)) {//选取较大的epoch作为新的 containsQuorum过半机制 在满足条件前会进入else wait()
waitingForNewEpoch = false;
self.setAcceptedEpoch(epoch);
connectingFollowers.notifyAll();// 唤醒所有的
} else {
long start = Time.currentElapsedTime();
long cur = start;
long end = start + self.getInitLimit()*self.getTickTime();
while(waitingForNewEpoch && cur < end) {
connectingFollowers.wait(end - cur);
...}
@6 queuePacket{...
...}
//follower 部分代码
follower.followLeader{...
QuorumServer leaderServer = findLeader();
try {
connectToLeader(leaderServer.addr, leaderServer.hostname);
long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);...
syncWithLeader(newEpochZxid);
...}
connectToLeader{...// 连接leader
sock = new Socket();
sock.setSoTimeout(self.tickTime * self.initLimit);
for (int tries = 0; tries < 5; tries++) {
try {
sock.connect(addr, self.tickTime * self.syncLimit);
sock.setTcpNoDelay(nodelay);
...}
registerWithLeader{....
QuorumPacket qp = new QuorumPacket();
qp.setType(pktType);
qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));...
qp.setData(bsid.toByteArray());
writePacket(qp, true);//数据写
readPacket(qp); //正好读到leader发送消息包括epoch
final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
if (qp.getType() == Leader.LEADERINFO) {...
QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
writePacket(ackNewEpoch, true); //返回ack信息 @5
return ZxidUtils.makeZxid(newEpoch, 0);
...}
syncWithLeader{...//Finally, synchronize our history with the Leader. 同步主节点数据 服务器初始化阶段
readPacket(qp);
LinkedList<Long> packetsCommitted = new LinkedList<Long>();
LinkedList<PacketInFlight> packetsNotCommitted = new LinkedList<PacketInFlight>();
synchronized (zk) {
if (qp.getType() == Leader.DIFF) {
LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));
snapshotNeeded = false;
}
else if (qp.getType() == Leader.SNAP) { //先清空后反序列化数据到datatree
// The leader is going to dump the database clear our own database and read
zk.getZKDatabase().clear();
zk.getZKDatabase().deserializeSnapshot(leaderIs);
String signature = leaderIs.readString("signature");
if (!signature.equals("BenWasHere")) {
LOG.error("Missing signature. Got " + signature);
throw new IOException("Missing signature");
}
zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
} else if (qp.getType() == Leader.TRUNC) {...
while (self.isRunning()) { //当读到leader的UPTODATE会停止循环
readPacket(qp);
switch(qp.getType()) {
case Leader.PROPOSAL:...
case Leader.COMMIT:
if (!writeToTxnLog) {
pif = packetsNotCommitted.peekFirst();
if (pif.hdr.getZxid() != qp.getZxid()) {
LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid());
} else {
zk.processTxn(pif.hdr, pif.rec);
packetsNotCommitted.remove();
}
} else {
packetsCommitted.add(qp.getZxid());
}
break;...
// We need to log the stuff that came in between the snapshot and the uptodate
if (zk instanceof FollowerZooKeeperServer) {
FollowerZooKeeperServer fzk = (FollowerZooKeeperServer)zk;
for(PacketInFlight p: packetsNotCommitted) {
fzk.logRequest(p.hdr, p.rec);
}
for(Long zxid: packetsCommitted) {
fzk.commit(zxid); //数据同步完成
}
...}
follower处理请求
org.apache.zookeeper.server.quorum.Learner#syncWithLeader
followLeader{...
syncWithLeader(newEpochZxid);
QuorumPacket qp = new QuorumPacket();
while (this.isRunning()) {
readPacket(qp);
processPacket(qp);@7
}
...}
syncWithLeader{...
zk.startup();
...}
startup{
if (sessionTracker == null) {
createSessionTracker();
}
startSessionTracker();
setupRequestProcessors();//follower 与 leader的实现类不同,有不同的处理链
registerJMX();
setState(State.RUNNING);
notifyAll();
}
@7 processPacket{...
switch (qp.getType()) {
case Leader.PING:
ping(qp);
break;
case Leader.PROPOSAL:
TxnHeader hdr = new TxnHeader();
Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
fzk.logRequest(hdr, txn){...
syncProcessor.processRequest(request);//持久化并且返回ack @8 syncProcessor的下一个处理器是SendAckRequestProcessor(发送ack),leader在LearnerHandler中接收并处理ack @9
...}
...}
@8 setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
commitProcessor = new CommitProcessor(finalProcessor,
Long.toString(getServerId()), true,
getZooKeeperServerListener());
commitProcessor.start();
firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
((FollowerRequestProcessor) firstProcessor).start();
syncProcessor = new SyncRequestProcessor(this,
new SendAckRequestProcessor((Learner)getFollower()));
syncProcessor.start();
}
FollowerRequestProcessor->CommitProcessor->FinalRequestProcessor
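SyncRequestProcessor->SendAckRequestProcessor //启动的是syncProcessor(SyncRequestProcessor),它负责持久化txn日志和快照,再交给SendAckRequestProcessor向leader发送ack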
leader处理请求
org.apache.zookeeper.server.quorum.Leader#lead
lead{...
startZkServer{
zk.startup()}
...}
PrepRequestProcessor(check ACL构造txn)->ProposalRequestProcessor(发起提议并在本地持久化)->CommitProcessor(提交的)->ToBeAppliedRequestProcessor->FinalRequestProcessor(更新内存返回response)
public ProposalRequestProcessor(LeaderZooKeeperServer zks,
RequestProcessor nextProcessor) {
this.zks = zks;
this.nextProcessor = nextProcessor;
AckRequestProcessor ackProcessor = new AckRequestProcessor(zks.getLeader());
syncProcessor = new SyncRequestProcessor(zks, ackProcessor);
}
ProposalRequestProcessor.processRequest{...
nextProcessor.processRequest(request); //下一个CommitProcessor
zks.getLeader().propose(request); //发起提议 @7
syncProcessor.processRequest(request); //持久化
...}
CommitProcessor.run{...
if ((queuedRequests.size() == 0 || nextPending != null)
&& committedRequests.size() == 0) {
wait(); //等待提议
...}