First, a note on versions: the ZooKeeper source analyzed here is 3.5.5.
The code entry point is QuorumPeerMain.main.
When starting in distributed (quorum) mode, the path taken is
QuorumPeerMain#runFromConfig
quorumPeer = getQuorumPeer(); // new up a QuorumPeer; you can think of a QuorumPeer as the ZooKeeper server itself
quorumPeer.setTxnFactory(new FileTxnSnapLog(
        config.getDataLogDir(),
        config.getDataDir()));
quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
quorumPeer.enableLocalSessionsUpgrading(
        config.isLocalSessionsUpgradingEnabled());
//quorumPeer.setQuorumPeers(config.getAllMembers());
quorumPeer.setElectionType(config.getElectionAlg());
quorumPeer.setMyid(config.getServerId());
.... // the elided middle sets various other properties and configuration
quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
quorumPeer.initialize();

quorumPeer.start();
quorumPeer.join();
public class QuorumPeer extends ZooKeeperThread
QuorumPeer extends ZooKeeperThread, which in turn extends Thread, so the main thing to look at is its run method.
QuorumPeer.run
The core of it boils down to a single line:
setCurrentVote(makeLEStrategy().lookForLeader());
Here the default Election implementation is FastLeaderElection. In practice almost nobody configures electionAlg in zoo.cfg; it defaults to 3, which selects FastLeaderElection.
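For orientation, here is roughly where that call sits inside QuorumPeer.run. This is an abridged sketch of the 3.5.5 source, with the read-only-server branch trimmed, so read it as a map rather than a verbatim quote:

case LOOKING:
    LOG.info("LOOKING");
    try {
        reconfigFlagClear();
        if (shuttingDownLE) {
            shuttingDownLE = false;
            startLeaderElection();
        }
        // block here until this election round produces a winner
        setCurrentVote(makeLEStrategy().lookForLeader());
    } catch (Exception e) {
        LOG.warn("Unexpected exception", e);
        setPeerState(ServerState.LOOKING);
    }
    break;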
FastLeaderElection#lookForLeader()
public Vote lookForLeader() throws InterruptedException {
    ......
    try {
        HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();

        HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();

        int notTimeout = finalizeWait;

        synchronized(this){
            logicalclock.incrementAndGet();
            // update this server's proposal: the epoch, zxid and myid it will vote for
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }

        LOG.info("New election. My id = " + self.getId() +
                ", proposed zxid=0x" + Long.toHexString(proposedZxid));
        sendNotifications();
updateProposal gets called multiple times, because whenever this node receives a vote for a candidate more suitable for leader than its current choice, it updates its own vote.
synchronized void updateProposal(long leader, long zxid, long epoch){
    if(LOG.isDebugEnabled()){
        LOG.debug("Updating proposal: " + leader + " (newleader), 0x"
                + Long.toHexString(zxid) + " (newzxid), " + proposedLeader
                + " (oldleader), 0x" + Long.toHexString(proposedZxid) + " (oldzxid)");
    }
    proposedLeader = leader;
    proposedZxid = zxid;
    proposedEpoch = epoch;
}
proposedLeader, proposedZxid and proposedEpoch
are all fields of FastLeaderElection. Together they represent the vote this node currently backs for leader, i.e. whom it intends to vote for.
The node then sends its vote to every ZooKeeper server:
sendNotifications()
private void sendNotifications() {
    for (long sid : self.getCurrentAndNextConfigVoters()) {
        QuorumVerifier qv = self.getQuorumVerifier();
        ToSend notmsg = new ToSend(ToSend.mType.notification,
                proposedLeader,
                proposedZxid,
                logicalclock.get(),
                QuorumPeer.ServerState.LOOKING,
                sid,
                proposedEpoch, qv.toString().getBytes());
        if(LOG.isDebugEnabled()){
            LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x" +
                    Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get()) +
                    " (n.round), " + sid + " (recipient), " + self.getId() +
                    " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
        }
        sendqueue.offer(notmsg);
    }
}
A brief description of the sending pipeline:
1 FastLeaderElection has two queues, one for sending and one for receiving:
LinkedBlockingQueue<ToSend> sendqueue;
LinkedBlockingQueue<Notification> recvqueue;
2 FastLeaderElection also has two threads, WorkerReceiver and WorkerSender; the names say which one receives and which one sends.
3 Both threads hold a QuorumCnxManager, the utility class that does the actual network communication.
4 To send, a message is put onto sendqueue.
5 The sender thread runs a loop that polls sendqueue with a 3-second timeout and hands each message to the connection manager for network transmission (see the sketch after this list).
6 If a message is addressed to this node itself, QuorumCnxManager skips the network and puts it straight into the recvqueue consumed by FastLeaderElection.
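To make steps 4–6 concrete, here is a paraphrased sketch of the WorkerSender loop, abridged from the 3.5.5 sources (shutdown and construction details trimmed, so it will not compile standalone):

class WorkerSender extends ZooKeeperThread {
    volatile boolean stop = false;
    QuorumCnxManager manager;

    public void run() {
        while (!stop) {
            try {
                // poll with a 3-second timeout so the loop can notice `stop`
                ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
                if (m == null) continue;
                process(m);
            } catch (InterruptedException e) {
                break;
            }
        }
    }

    void process(ToSend m) {
        // serialize the notification and hand it to the connection manager;
        // QuorumCnxManager.toSend delivers a message addressed to our own sid
        // straight back to the local receive path instead of the network
        ByteBuffer requestBuffer = buildMsg(m.state.ordinal(), m.leader,
                m.zxid, m.electionEpoch, m.peerEpoch, m.configData);
        manager.toSend(m.sid, requestBuffer);
    }
}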
Note that lookForLeader has a local variable:
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
This structure is key: it is what decides when the election can end. Briefly: the key is a long holding the sender's myid, and Vote is that server's vote. A Vote carries three fields that matter here: epoch, zxid and myid. They are compared in that order, epoch first, then zxid, then myid, and in each case the larger value wins.
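For reference, that comparison lives in FastLeaderElection#totalOrderPredicate. The following is an abridged sketch of the 3.5.5 version (log statement removed); note the extra guard that a candidate with voting weight 0 can never win:

protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch,
                                      long curId, long curZxid, long curEpoch) {
    // a server whose voting weight is 0 can never become leader
    if (self.getQuorumVerifier().getWeight(newId) == 0) {
        return false;
    }

    // compare epoch first, then zxid, then myid; larger always wins
    return ((newEpoch > curEpoch)
            || ((newEpoch == curEpoch)
                && ((newZxid > curZxid)
                    || ((newZxid == curZxid) && (newId > curId)))));
}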
With the preparation covered, let's analyze the election logic itself.
After sending its vote to all ZooKeeper nodes as shown above, the peer enters a while loop.
I'll split it into two parts. The first part is how votes received from other servers are handled.
while ((self.getPeerState() == ServerState.LOOKING) && (!stop)){
    /*
     * Remove next notification from queue, times out after 2 times
     * the termination time
     */
    // drain the receive queue in order; notTimeout starts at finalizeWait, i.e. 200 ms
    Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);

    /*
     * Sends more notifications if haven't received enough.
     * Otherwise processes new notification.
     */
    if(n == null){ // nothing received yet: either resend our votes or reconnect
        if(manager.haveDelivered()){
            sendNotifications();
        } else {
            manager.connectAll();
        }

        /*
         * Exponential backoff
         */
        int tmpTimeOut = notTimeout*2;
        notTimeout = (tmpTimeOut < maxNotificationInterval? tmpTimeOut : maxNotificationInterval);
        LOG.info("Notification time out: " + notTimeout);
    }
    // this is where the core logic lives
    else if (validVoter(n.sid) && validVoter(n.leader)) {
        /*
         * Only proceed if the vote comes from a replica in the current or next
         * voting view for a replica in the current or next voting view.
         */
        switch (n.state) {
        case LOOKING: // the sender is also in the LOOKING state
            // If notification > current, replace and send messages out
            if (n.electionEpoch > logicalclock.get()) { // the received election epoch is newer than ours
                logicalclock.set(n.electionEpoch); // catch up with the crowd; logicalclock is effectively the election-epoch generator
                recvset.clear(); // clear recvset, since the notifications will be resent
                if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                        getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                    // totalOrderPredicate checks whether the received vote names a better
                    // leader candidate than ours; if so, adopt its three fields
                    updateProposal(n.leader, n.zxid, n.peerEpoch);
                } else {
                    // the epoch changed, so our own proposal must be refreshed too
                    updateProposal(getInitId(),
                            getInitLastLoggedZxid(),
                            getPeerEpoch());
                }
                sendNotifications(); // when our epoch lags we resend after updating it; symmetrically, peers whose epoch lags ours will resend their votes to us
            } else if (n.electionEpoch < logicalclock.get()) {
                // the sender's epoch is older than ours: do nothing, exit the switch and
                // go back to polling the receive queue; the sender will vote again, so
                // there is no risk of never re-entering this switch
                if(LOG.isDebugEnabled()){
                    LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                            + Long.toHexString(n.electionEpoch)
                            + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
                }
                break;
            } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                    proposedLeader, proposedZxid, proposedEpoch)) { // same epoch, but the received vote outranks ours
                updateProposal(n.leader, n.zxid, n.peerEpoch); // adopt the three fields of the better vote
                sendNotifications(); // and broadcast the updated vote
            }

            if(LOG.isDebugEnabled()){
                LOG.debug("Adding vote: from=" + n.sid +
                        ", proposed leader=" + n.leader +
                        ", proposed zxid=0x" + Long.toHexString(n.zxid) +
                        ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
            }

            // don't care about the version if it's in LOOKING state
            recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
The final recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)) is the subtle part: recvset records the latest vote seen from every server, including our own. Note too that recvset is updated on every received notification, because a new notification means some server has spotted a candidate it considers more suitable for leader and has re-broadcast its vote, so recvset must track the change.
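To see how this converges, here is a small self-contained, hypothetical simulation (the Vote record and better method below are illustrative stand-ins, not ZooKeeper classes, and records need Java 16+). Three servers apply the (epoch, zxid, myid) ordering and all end up voting for the server with the highest zxid:

import java.util.List;

public class VoteConvergenceDemo {
    // illustrative stand-in for a vote; not the real org.apache.zookeeper Vote
    record Vote(long epoch, long zxid, long myid) {}

    // same ordering rule as totalOrderPredicate: epoch, then zxid, then myid
    static boolean better(Vote a, Vote b) {
        if (a.epoch() != b.epoch()) return a.epoch() > b.epoch();
        if (a.zxid() != b.zxid())   return a.zxid() > b.zxid();
        return a.myid() > b.myid();
    }

    public static void main(String[] args) {
        // each server initially votes for itself
        List<Vote> initial = List.of(
                new Vote(1, 0x100, 1),   // server 1
                new Vote(1, 0x200, 2),   // server 2 has the highest zxid
                new Vote(1, 0x100, 3));  // server 3

        // once every vote has been exchanged, all servers adopt the best one
        Vote best = initial.get(0);
        for (Vote v : initial) {
            if (better(v, best)) best = v;
        }
        System.out.println("elected myid = " + best.myid()); // prints 2
    }
}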
The second part checks whether the termination condition has been met.
if (termPredicate(recvset,
        new Vote(proposedLeader, proposedZxid,
                logicalclock.get(), proposedEpoch))) {

    // Verify if there is any change in the proposed leader
    while((n = recvqueue.poll(finalizeWait,
            TimeUnit.MILLISECONDS)) != null){
        if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                proposedLeader, proposedZxid, proposedEpoch)){
            recvqueue.put(n);
            break;
        }
    }

    /*
     * This predicate is true once we don't read any new
     * relevant message from the reception queue
     */
    if (n == null) {
        self.setPeerState((proposedLeader == self.getId()) ?
                ServerState.LEADING: learningState());

        Vote endVote = new Vote(proposedLeader, proposedZxid,
                logicalclock.get(), proposedEpoch);
        leaveInstance(endVote);
        return endVote;
    }
}
protected boolean termPredicate(Map<Long, Vote> votes, Vote vote) {
    SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
    voteSet.addQuorumVerifier(self.getQuorumVerifier());
    if (self.getLastSeenQuorumVerifier() != null
            && self.getLastSeenQuorumVerifier().getVersion() > self
                    .getQuorumVerifier().getVersion()) {
        voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
    }

    /*
     * First make the views consistent. Sometimes peers will have different
     * zxids for a server depending on timing.
     */
    for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
        // note: votes is the recvset emphasized above; every recorded vote that
        // matches our own proposal is acked into voteSet
        if (vote.equals(entry.getValue())) {
            voteSet.addAck(entry.getKey());
        }
    }

    return voteSet.hasAllQuorums();
}
public boolean hasAllQuorums() {
    for (QuorumVerifierAcksetPair qvAckset : qvAcksetPairs) {
        if (!qvAckset.getQuorumVerifier().containsQuorum(qvAckset.getAckset()))
            return false;
    }
    return true;
}
QuorumMaj#containsQuorum
public boolean containsQuorum(Set<Long> ackSet) {
    return (ackSet.size() > half);
}
Here half is the number of voting servers divided by 2 using integer division. With three servers, half is 1. Note that (ackSet.size() > half) uses strictly greater than, so at least 2 matching votes are needed before the condition holds.
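A minimal runnable illustration of that arithmetic (a hypothetical standalone class, not the real QuorumMaj):

import java.util.HashSet;
import java.util.Set;

public class QuorumMathDemo {
    public static void main(String[] args) {
        int votingMembers = 3;          // servers participating in the election
        int half = votingMembers / 2;   // integer division, as in QuorumMaj: half = 1

        Set<Long> ackSet = new HashSet<>();
        ackSet.add(1L);                 // one matching vote so far
        System.out.println(ackSet.size() > half); // false: 1 > 1 fails

        ackSet.add(2L);                 // a second server agrees with our proposal
        System.out.println(ackSet.size() > half); // true: 2 > 1, quorum reached
    }
}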
Now back to the second part to analyze what remains.
// Verify if there is any change in the proposed leader
while((n = recvqueue.poll(finalizeWait,
        TimeUnit.MILLISECONDS)) != null){
    // keep checking: even after the termination condition holds, look for fresh
    // notifications; if a better vote arrives, push it back and loop again
    if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
            proposedLeader, proposedZxid, proposedEpoch)){
        recvqueue.put(n);
        break;
    }
}

/*
 * This predicate is true once we don't read any new
 * relevant message from the reception queue
 */
if (n == null) { // reaching here means no new messages arrived and the election condition is satisfied
    self.setPeerState((proposedLeader == self.getId()) ?
            ServerState.LEADING: learningState()); // set our own role: leader, or follower/observer

    Vote endVote = new Vote(proposedLeader, proposedZxid,
            logicalclock.get(), proposedEpoch);
    leaveInstance(endVote);
    return endVote;
}
As this shows, once a leader is chosen, every ZooKeeper server sets its own role by itself; the elected leader does not send an extra round of messages announcing "I am the leader".
When every server's receive queue has drained and stays empty, all the notifications that needed to be exchanged have been exchanged, and at that point the identity of the leader is already settled.