  • ZooKeeper Leader Election: Source Code Analysis

    First, note that the ZooKeeper source version analyzed here is 3.5.5.

    The code entry point is QuorumPeerMain.main.

    When ZooKeeper is started in distributed (replicated) mode, the method that runs is

     QuorumPeerMain#runFromConfig

    quorumPeer = getQuorumPeer(); // create a QuorumPeer; think of QuorumPeer as the ZooKeeper server itself
    quorumPeer.setTxnFactory(new FileTxnSnapLog(
            config.getDataLogDir(),
            config.getDataDir()));
    quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
    quorumPeer.enableLocalSessionsUpgrading(
            config.isLocalSessionsUpgradingEnabled());
    //quorumPeer.setQuorumPeers(config.getAllMembers());
    quorumPeer.setElectionType(config.getElectionAlg());
    quorumPeer.setMyid(config.getServerId());
    .... // various other properties and settings are configured here
    quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
    quorumPeer.initialize();

    quorumPeer.start();
    quorumPeer.join();
    public class QuorumPeer extends ZooKeeperThread

    QuorumPeer extends ZooKeeperThread, which in turn extends Thread, so the main thing to look at is its run method.

    QuorumPeer.run

    At its core, it comes down to a single line:

    setCurrentVote(makeLEStrategy().lookForLeader());

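    The default Election implementation is FastLeaderElection. In practice almost nobody sets the election algorithm in zoo.cfg (the electionAlg option, which is passed to QuorumPeer as electionType). Its default value is 3, which selects FastLeaderElection.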

    FastLeaderElection#lookForLeader()

    public Vote lookForLeader() throws InterruptedException {
            ......
            try {
                HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();

                HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();

                int notTimeout = finalizeWait;

                synchronized(this){
                    logicalclock.incrementAndGet();
                    updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());// set this server's proposed vote (myid, zxid, epoch)
                    // i.e. at the start, the node proposes itself as leader
                }

                LOG.info("New election. My id =  " + self.getId() +
                        ", proposed zxid=0x" + Long.toHexString(proposedZxid));
                sendNotifications();

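    updateProposal may be called many times. Whenever this node receives a vote for a leader better suited than the one it currently supports, it updates its own vote.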

    synchronized void updateProposal(long leader, long zxid, long epoch){
            if(LOG.isDebugEnabled()){
                LOG.debug("Updating proposal: " + leader + " (newleader), 0x"
                        + Long.toHexString(zxid) + " (newzxid), " + proposedLeader
                        + " (oldleader), 0x" + Long.toHexString(proposedZxid) + " (oldzxid)");
            }
            proposedLeader = leader;
            proposedZxid = zxid;
            proposedEpoch = epoch;
        }

    proposedLeader, proposedZxid and proposedEpoch are all member fields of FastLeaderElection. Together they form the vote this node currently supports, that is, whom it votes for as leader.

    Next, the vote is sent to every voting ZooKeeper server, including this node itself:

    sendNotifications()

    private void sendNotifications() {
            for (long sid : self.getCurrentAndNextConfigVoters()) {
                QuorumVerifier qv = self.getQuorumVerifier();
                ToSend notmsg = new ToSend(ToSend.mType.notification,
                        proposedLeader,
                        proposedZxid,
                        logicalclock.get(),
                        QuorumPeer.ServerState.LOOKING,
                        sid,
                        proposedEpoch, qv.toString().getBytes());
                if(LOG.isDebugEnabled()){
                    LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x"  +
                          Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get())  +
                          " (n.round), " + sid + " (recipient), " + self.getId() +
                          " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
                }
                sendqueue.offer(notmsg);
            }
        }

    Here is a brief outline of how sending works:

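    1 FastLeaderElection has two queues, a send queue and a receive queue:

     LinkedBlockingQueue<ToSend> sendqueue;
     LinkedBlockingQueue<Notification> recvqueue;

    2 FastLeaderElection also has two threads, WorkerReceiver and WorkerSender. As the names suggest, one receives and the other sends.

    3 Both threads hold a QuorumCnxManager member, the class that does the actual network communication.

    4 To send a message, it is placed in sendqueue.

    5 The sender thread runs a loop that polls sendqueue with a 3-second timeout on each poll, then hands the message to the network class for sending.

    6 If a message is addressed to the node itself, QuorumCnxManager does not send it over the network. It puts the message straight into its own receive queue, from which WorkerReceiver turns it into a Notification in FastLeaderElection's recvqueue.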
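    The send path described above can be sketched in a single process as follows. This is a simplified model, not ZooKeeper's actual classes. WireMsg, MiniMessenger, deliver and the wire list are hypothetical names. Only a few features mirror the description: two LinkedBlockingQueues, a sender thread that polls with a 3-second timeout, and the loopback shortcut for messages addressed to self. Messages for other servers are simply recorded in a list in place of real network I/O.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical message: sender id, recipient id, proposed leader.
final class WireMsg {
    final long from, to, proposedLeader;

    WireMsg(long from, long to, long proposedLeader) {
        this.from = from;
        this.to = to;
        this.proposedLeader = proposedLeader;
    }
}

// Hypothetical, simplified stand-in for FastLeaderElection's messaging layer.
final class MiniMessenger {
    final long myId;
    final LinkedBlockingQueue<WireMsg> sendqueue = new LinkedBlockingQueue<>();
    final LinkedBlockingQueue<WireMsg> recvqueue = new LinkedBlockingQueue<>();
    // Stand-in for the network: messages for other servers are recorded here.
    final List<WireMsg> wire = new CopyOnWriteArrayList<>();
    private volatile boolean stop = false;
    private final Thread sender;

    MiniMessenger(long myId) {
        this.myId = myId;
        // Sender loop: poll with a 3 s timeout so the thread can notice 'stop'.
        sender = new Thread(() -> {
            while (!stop) {
                try {
                    WireMsg m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
                    if (m != null) {
                        deliver(m);
                    }
                } catch (InterruptedException e) {
                    return; // shutdown() interrupts a pending poll
                }
            }
        }, "mini-sender");
        sender.setDaemon(true);
        sender.start();
    }

    // Loopback: a message addressed to ourselves never touches the network.
    private void deliver(WireMsg m) {
        if (m.to == myId) {
            recvqueue.offer(m);
        } else {
            wire.add(m);
        }
    }

    void shutdown() {
        stop = true;
        sender.interrupt();
    }
}
```

    Offering a message with to == myId makes it appear in recvqueue, while any other recipient ends up in wire. The poll timeout only bounds how long the loop can block before it rechecks stop.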

     

    Note that lookForLeader declares a local variable:

     HashMap<Long, Vote> recvset = new HashMap<Long, Vote>(); 

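    This structure is also crucial: it is the key data structure for deciding when the election ends. The key is a long holding the myid of the server that cast the vote, and the value is that server's Vote. For ordering, three Vote fields matter: epoch (the proposed leader's peerEpoch), zxid and myid. They are compared in that order, epoch first, then zxid, then myid, and in each case the larger value has higher priority.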
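    The ordering rule can be written as a comparator. This sketch follows the epoch, then zxid, then myid order described above, which is the order FastLeaderElection#totalOrderPredicate applies. SimpleVote and VoteOrder are hypothetical names. The real method also rejects candidates whose quorum weight is 0 (relevant for hierarchical quorums), and that check is omitted here.

```java
import java.util.Comparator;

// Hypothetical simplified vote: only the three fields used for ordering.
final class SimpleVote {
    final long epoch; // peerEpoch of the proposed leader
    final long zxid;  // last logged zxid of the proposed leader
    final long id;    // myid of the proposed leader

    SimpleVote(long epoch, long zxid, long id) {
        this.epoch = epoch;
        this.zxid = zxid;
        this.id = id;
    }
}

final class VoteOrder {
    // Larger epoch wins; ties are broken by larger zxid, then by larger myid.
    static final Comparator<SimpleVote> ORDER =
            Comparator.<SimpleVote>comparingLong(v -> v.epoch)
                      .thenComparingLong(v -> v.zxid)
                      .thenComparingLong(v -> v.id);

    // True if 'candidate' is strictly better than 'current', i.e. should replace it.
    static boolean beats(SimpleVote candidate, SimpleVote current) {
        return ORDER.compare(candidate, current) > 0;
    }
}
```

    For example, with equal epochs a vote whose zxid is 0x200 beats one with 0x100 regardless of myid. myid only decides the outcome when both epoch and zxid tie.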

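    With the preparation covered, let's analyze the election logic itself.

    After sending its vote to all ZooKeeper nodes as shown above, the server enters a while loop.

    I'll cover it in two parts. The first part is how votes received from other servers are handled: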

    while ((self.getPeerState() == ServerState.LOOKING) &&
            (!stop)){
        /*
         * Remove next notification from queue, times out after 2 times
         * the termination time
         */
        Notification n = recvqueue.poll(notTimeout,
                TimeUnit.MILLISECONDS);// take the next message from the receive queue; notTimeout starts at finalizeWait (200 ms) and grows with the backoff below

        /*
         * Sends more notifications if haven't received enough.
         * Otherwise processes new notification.
         */
        if(n == null){// nothing received yet: resend our vote if earlier messages went out, otherwise reconnect to all peers
            if(manager.haveDelivered()){
                sendNotifications();
            } else {
                manager.connectAll();
            }

            /*
             * Exponential backoff
             */
            int tmpTimeOut = notTimeout*2;
            notTimeout = (tmpTimeOut < maxNotificationInterval?
                    tmpTimeOut : maxNotificationInterval);
            LOG.info("Notification time out: " + notTimeout);
        }
        // this is the core logic
        else if (validVoter(n.sid) && validVoter(n.leader)) {
            /*
             * Only proceed if the vote comes from a replica in the current or next
             * voting view for a replica in the current or next voting view.
             */
            switch (n.state) {
            case LOOKING:// the sender is also LOOKING
                // If notification > current, replace and send messages out
                if (n.electionEpoch > logicalclock.get()) {// the sender's election round is ahead of ours
                    logicalclock.set(n.electionEpoch);// catch up; logicalclock is this node's election-round counter
                    recvset.clear();// clear recvset: votes from the old round are stale and will be resent
                    if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                            getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                        // totalOrderPredicate checks whether the received vote names a better leader than our own initial vote; if so, adopt its three fields
                    } else {// otherwise start the new round by proposing ourselves again
                        updateProposal(getInitId(),
                                getInitLastLoggedZxid(),
                                getPeerEpoch());
                    }
                    sendNotifications();// a node whose round is behind catches up and rebroadcasts; likewise, nodes behind us will catch up and send us their votes again
                } else if (n.electionEpoch < logicalclock.get()) {// the sender's round is behind ours: do nothing, leave the switch and go back to polling the receive queue
                    // the sender will catch up and send its vote again, so ignoring this stale message loses nothing
                    if(LOG.isDebugEnabled()){
                        LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                + Long.toHexString(n.electionEpoch)
                                + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
                    }
                    break;
                } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                        proposedLeader, proposedZxid, proposedEpoch)) {// same round, and the sender's vote beats ours
                    updateProposal(n.leader, n.zxid, n.peerEpoch);// adopt the sender's vote
                    sendNotifications();// broadcast the updated vote
                }

                if(LOG.isDebugEnabled()){
                    LOG.debug("Adding vote: from=" + n.sid +
                            ", proposed leader=" + n.leader +
                            ", proposed zxid=0x" + Long.toHexString(n.zxid) +
                            ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                }

                // don't care about the version if it's in LOOKING state
                recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

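    The final line, recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));, is the crux of the loop. recvset records the latest vote of every server, including this node's own vote, because a node also sends notifications to itself. Note also that recvset is updated on every message received. A new message means some server has found a leader it considers better and broadcast again, so that server's entry must be overwritten.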
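    The LOOKING branch can be condensed into a small model, assuming the hypothetical types Note and LookingState below (they are not ZooKeeper classes). The model keeps the election round (logicalclock) separate from the vote. It clears recvset when a newer round appears and adopts only strictly better votes. It always records the sender's latest vote, overwriting any older entry. Broadcasting is reduced to a counter standing in for sendNotifications(). Each recvset value is a plain [leader, zxid, epoch] list rather than a real Vote.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical notification: sender id, sender's election round, proposed vote.
final class Note {
    final long sid, round, leader, zxid, epoch;

    Note(long sid, long round, long leader, long zxid, long epoch) {
        this.sid = sid; this.round = round;
        this.leader = leader; this.zxid = zxid; this.epoch = epoch;
    }
}

// Hypothetical model of one LOOKING node's state inside lookForLeader().
final class LookingState {
    final long myId, myZxid, myEpoch;      // this node's own initial vote
    long logicalclock = 1;                 // current election round
    long propLeader, propZxid, propEpoch;  // the vote we currently support
    final Map<Long, List<Long>> recvset = new HashMap<>(); // sid -> [leader, zxid, epoch]
    int broadcasts = 0;                    // stands in for sendNotifications() calls

    LookingState(long myId, long myZxid, long myEpoch) {
        this.myId = myId; this.myZxid = myZxid; this.myEpoch = myEpoch;
        propLeader = myId; propZxid = myZxid; propEpoch = myEpoch;
    }

    // Epoch first, then zxid, then id; the larger value wins.
    static boolean better(long l1, long z1, long e1, long l2, long z2, long e2) {
        return e1 > e2 || (e1 == e2 && (z1 > z2 || (z1 == z2 && l1 > l2)));
    }

    private void propose(long leader, long zxid, long epoch) {
        propLeader = leader; propZxid = zxid; propEpoch = epoch;
    }

    // Mirrors the structure of the 'case LOOKING' branch.
    void onLooking(Note n) {
        if (n.round > logicalclock) {           // sender is in a newer round
            logicalclock = n.round;
            recvset.clear();                    // old-round votes are stale
            if (better(n.leader, n.zxid, n.epoch, myId, myZxid, myEpoch)) {
                propose(n.leader, n.zxid, n.epoch);
            } else {
                propose(myId, myZxid, myEpoch); // restart by proposing ourselves
            }
            broadcasts++;
        } else if (n.round < logicalclock) {    // sender is in an older round
            return;                             // ignore it; recvset untouched
        } else if (better(n.leader, n.zxid, n.epoch, propLeader, propZxid, propEpoch)) {
            propose(n.leader, n.zxid, n.epoch); // same round, better vote
            broadcasts++;
        }
        // Always remember the sender's latest vote (overwrites older entries).
        recvset.put(n.sid, List.of(n.leader, n.zxid, n.epoch));
    }
}
```

    Note the newer-round path. As in the real code, the node compares the incoming vote with its own initial vote, not with whatever it supported in the old round.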

    Then comes the second part: checking whether the termination condition has been met.

    if (termPredicate(recvset,
            new Vote(proposedLeader, proposedZxid,
                    logicalclock.get(), proposedEpoch))) {

        // Verify if there is any change in the proposed leader
        while((n = recvqueue.poll(finalizeWait,
                TimeUnit.MILLISECONDS)) != null){
            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                    proposedLeader, proposedZxid, proposedEpoch)){
                recvqueue.put(n);
                break;
            }
        }

        /*
         * This predicate is true once we don't read any new
         * relevant message from the reception queue
         */
        if (n == null) {
            self.setPeerState((proposedLeader == self.getId()) ?
                    ServerState.LEADING: learningState());
            Vote endVote = new Vote(proposedLeader,
                    proposedZxid, logicalclock.get(),
                    proposedEpoch);
            leaveInstance(endVote);
            return endVote;
        }
    }
    termPredicate looks like this:

    protected boolean termPredicate(Map<Long, Vote> votes, Vote vote) {
            SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
            voteSet.addQuorumVerifier(self.getQuorumVerifier());
            if (self.getLastSeenQuorumVerifier() != null
                    && self.getLastSeenQuorumVerifier().getVersion() > self
                            .getQuorumVerifier().getVersion()) {
                voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
            }

            /*
             * First make the views consistent. Sometimes peers will have different
             * zxids for a server depending on timing.
             */
            for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
                // note: votes is the recvset stressed above; every recorded vote that equals this node's own vote is added to voteSet as an ack
                if (vote.equals(entry.getValue())) {
                    voteSet.addAck(entry.getKey());
                }
            }

            return voteSet.hasAllQuorums();
        }

    and SyncedLearnerTracker#hasAllQuorums:

    public boolean hasAllQuorums() {
            for (QuorumVerifierAcksetPair qvAckset : qvAcksetPairs) {
                if (!qvAckset.getQuorumVerifier().containsQuorum(qvAckset.getAckset()))
                    return false;
            }
            return true;
        }

    QuorumMaj#containsQuorum

    public boolean containsQuorum(Set<Long> ackSet) {
            return (ackSet.size() > half);
        }

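    Here half is the number of voting servers divided by 2 (integer division). With three servers half is 1, and because the check (ackSet.size() > half) is a strict greater-than, at least 2 matching votes are needed to satisfy it.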
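    The quorum arithmetic and the termPredicate-style counting are easy to check in isolation. The sketch copies the majority rule from QuorumMaj: half = voters / 2 with integer division, and a quorum when acks > half. MajorityDemo and its methods are hypothetical names. Votes are modeled as plain lists instead of ZooKeeper's Vote class, and the second quorum verifier used during reconfiguration is left out.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class MajorityDemo {
    // Same rule as QuorumMaj: half uses integer division.
    static int half(int voters) {
        return voters / 2;
    }

    // A quorum needs strictly more than half of the voters.
    static boolean containsQuorum(Set<Long> ackSet, int voters) {
        return ackSet.size() > half(voters);
    }

    // Smallest number of matching votes that forms a quorum.
    static int minQuorum(int voters) {
        return half(voters) + 1;
    }

    // termPredicate-style check: ack every sid whose recorded vote equals ours.
    static boolean termPredicate(Map<Long, List<Long>> recvset, List<Long> myVote, int voters) {
        Set<Long> acks = new HashSet<>();
        for (Map.Entry<Long, List<Long>> e : recvset.entrySet()) {
            if (myVote.equals(e.getValue())) {
                acks.add(e.getKey());
            }
        }
        return containsQuorum(acks, voters);
    }
}
```

    With 3 voters the minimum is 2, with 4 it is 3, and with 5 it is 3. A fourth server therefore raises the quorum size without letting the ensemble survive more failures than three servers can.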

    Back to the second part, let's analyze the rest of it:

    // Verify if there is any change in the proposed leader
    // Even though the termination condition holds, keep reading for up to finalizeWait.
    // Votes that are not better are consumed and dropped; a better vote is put back
    // into the queue and we break, so the outer while loop processes it next.
    while((n = recvqueue.poll(finalizeWait,
            TimeUnit.MILLISECONDS)) != null){
        if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                proposedLeader, proposedZxid, proposedEpoch)){
            recvqueue.put(n);
            break;
        }
    }

    /*
     * This predicate is true once we don't read any new
     * relevant message from the reception queue
     */
    if (n == null) {// reaching here: the quorum condition holds and no better vote arrived within finalizeWait
        self.setPeerState((proposedLeader == self.getId()) ?
                ServerState.LEADING: learningState());// LEADING if we are the proposed leader, otherwise learningState(): FOLLOWING (OBSERVING for an observer)
        Vote endVote = new Vote(proposedLeader,
                proposedZxid, logicalclock.get(),
                proposedEpoch);
        leaveInstance(endVote);
        return endVote;
    }

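    From this we can see that once the election settles, every ZooKeeper server sets its own role locally. The elected leader does not send an extra message announcing that it is the leader.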

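    In other words, a server considers the election over when two things hold: a quorum of the votes in its recvset matches its own vote, and its receive queue then stays free of better votes for finalizeWait. At that point it treats all relevant messages as exchanged, and the leader is decided.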
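    To see the whole flow converge, here is a toy, single-threaded simulation of three servers under strong simplifying assumptions. All servers start in the same round. Messages are delivered instantly and in FIFO order through one shared queue, and nothing fails. Under these conditions, round handling and the finalizeWait timeout reduce to "the queue is drained". ToyElection, Server and Msg are hypothetical names, not ZooKeeper code. Each server adopts any strictly better vote and rebroadcasts it, and records every sender's latest vote. Once the queue is empty, each server decides its own role locally from a majority on its proposal.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy, single-threaded model of leader election converging; all names are hypothetical.
final class ToyElection {
    // One vote in flight: sender, recipient, and the proposed (epoch, zxid, leader).
    static final class Msg {
        final int from, to;
        final long epoch, zxid, leader;

        Msg(int from, int to, long epoch, long zxid, long leader) {
            this.from = from; this.to = to;
            this.epoch = epoch; this.zxid = zxid; this.leader = leader;
        }
    }

    static final class Server {
        final int id;
        long epoch, zxid, leader;                 // currently proposed vote
        final Map<Integer, List<Long>> recvset = new HashMap<>();
        String role = "LOOKING";

        Server(int id, long epoch, long zxid) {
            this.id = id; this.epoch = epoch; this.zxid = zxid; this.leader = id;
        }

        List<Long> vote() {
            return List.of(epoch, zxid, leader);
        }
    }

    // Epoch first, then zxid, then leader id; the larger value wins.
    static boolean better(long e1, long z1, long l1, long e2, long z2, long l2) {
        return e1 > e2 || (e1 == e2 && (z1 > z2 || (z1 == z2 && l1 > l2)));
    }

    // Send s's current vote to every server, itself included.
    static void broadcast(Server s, List<Server> all, ArrayDeque<Msg> net) {
        for (Server r : all) {
            net.add(new Msg(s.id, r.id, s.epoch, s.zxid, s.leader));
        }
    }

    // initial[i] = {peerEpoch, lastZxid} of server i+1.
    static List<Server> run(long[][] initial) {
        List<Server> all = new ArrayList<>();
        for (int i = 0; i < initial.length; i++) {
            all.add(new Server(i + 1, initial[i][0], initial[i][1]));
        }
        ArrayDeque<Msg> net = new ArrayDeque<>();  // instant, FIFO "network"
        for (Server s : all) {
            broadcast(s, all, net);                // everyone first votes for itself
        }
        while (!net.isEmpty()) {
            Msg m = net.poll();
            Server t = all.get(m.to - 1);
            if (better(m.epoch, m.zxid, m.leader, t.epoch, t.zxid, t.leader)) {
                t.epoch = m.epoch; t.zxid = m.zxid; t.leader = m.leader;
                broadcast(t, all, net);            // adopt the better vote and rebroadcast
            }
            t.recvset.put(m.from, List.of(m.epoch, m.zxid, m.leader));
        }
        // The network is quiet: each server decides its own role locally.
        int half = all.size() / 2;
        for (Server s : all) {
            List<Long> mine = s.vote();
            long acks = s.recvset.values().stream().filter(mine::equals).count();
            if (acks > half) {
                s.role = (s.leader == s.id) ? "LEADING" : "FOLLOWING";
            }
        }
        return all;
    }
}
```

    Running it with zxids 0x10, 0x30 and 0x20 for servers 1 to 3 should make server 2 LEADING and the others FOLLOWING. With identical epochs and zxids, server 3 should lead because it has the largest myid. The real code decides earlier, as soon as termPredicate holds and finalizeWait passes without a better vote, rather than waiting for global quiescence.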

  • Original article: https://www.cnblogs.com/juniorMa/p/14897245.html