  • FastLeaderElection

    FastLeaderElection is ZooKeeper's default leader election algorithm. When a peer is in the ServerState.LOOKING state, it runs FastLeaderElection.lookForLeader to elect a leader.

    Key data structures:

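      1. HashMap<Long, Vote> recvset: the votes received in the current election round, keyed by the sender's sid. They come mostly from peers whose ServerState is LOOKING; a FOLLOWING or LEADING vote from the same round is added as well. Used to decide whether the election has finished.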

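      2. HashMap<Long, Vote> outofelection: votes sent by peers outside the election, i.e. peers whose ServerState is FOLLOWING or LEADING. Such a message means an election has already finished. Also used to decide whether the election has finished.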
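To make the two maps concrete, here is a minimal Java sketch (Java 16+ for records). `SimpleVote`, `PeerState` and `ElectionBallots` are hypothetical, simplified stand-ins, not ZooKeeper's own `Vote` and `ServerState` classes. The epoch handling for LOOKING votes is left out. The sketch shows that keying both maps by the sender's sid means each peer contributes only its latest vote.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ZooKeeper's ServerState.
enum PeerState { LOOKING, FOLLOWING, LEADING, OBSERVING }

// Hypothetical, simplified vote; ZooKeeper's Vote carries more fields (e.g. a version).
record SimpleVote(long leader, long zxid, long electionEpoch, long peerEpoch, PeerState state) {}

class ElectionBallots {
    // Votes from the current round, keyed by the sender's sid.
    final Map<Long, SimpleVote> recvset = new HashMap<>();
    // Votes from peers that already finished an election, keyed by the sender's sid.
    final Map<Long, SimpleVote> outofelection = new HashMap<>();

    /** Files a vote by sender state; a later vote from the same sid replaces the earlier one. */
    void record(long sid, SimpleVote v, long logicalclock) {
        switch (v.state()) {
            case LOOKING -> recvset.put(sid, v);
            case FOLLOWING, LEADING -> {
                // Same round: it also counts in recvset, as in lookForLeader.
                if (v.electionEpoch() == logicalclock) recvset.put(sid, v);
                outofelection.put(sid, v);
            }
            case OBSERVING -> { } // observers do not vote
        }
    }

    public static void main(String[] args) {
        ElectionBallots b = new ElectionBallots();
        b.record(1L, new SimpleVote(3, 0x100, 1, 1, PeerState.LOOKING), 1);
        b.record(1L, new SimpleVote(2, 0x100, 1, 1, PeerState.LOOKING), 1);
        System.out.println(b.recvset.size()); // 1: sid 1's second vote replaced its first
    }
}
```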

    Key functions:

      

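    totalOrderPredicate: decides whether a new vote beats the current one. It compares peerEpoch first, then zxid, then the server id (sid).
    termPredicate: decides whether the election has ended. It checks whether the votes in recvset that match the proposed vote come from a quorum, i.e. more than half of the voting members.
    ooePredicate: decides whether the election has ended using recvset and outofelection. termPredicate must hold for the leader named in the notification. Unless that leader is this peer itself, outofelection must also contain the leader's own vote reporting that it is LEADING.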
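A minimal sketch of the three predicates follows. It assumes every server has equal weight, so a quorum is simply more than half of the voting members (ZooKeeper delegates this to its quorum verifier). Vote equality is also simplified to comparing the four numeric fields. The class and record names and the explicit `votingMembers`/`selfId`/`logicalclock` parameters are hypothetical; the real methods read those values from the FastLeaderElection instance.

```java
import java.util.Map;

/** Hypothetical, simplified versions of the predicates used by lookForLeader. */
final class ElectionPredicates {
    enum State { LOOKING, FOLLOWING, LEADING }

    record Vote(long leader, long zxid, long electionEpoch, long peerEpoch, State state) {}

    /** True if the new vote beats the current one: peerEpoch, then zxid, then sid. */
    static boolean totalOrderPredicate(long newId, long newZxid, long newEpoch,
                                       long curId, long curZxid, long curEpoch) {
        return newEpoch > curEpoch
                || (newEpoch == curEpoch
                    && (newZxid > curZxid || (newZxid == curZxid && newId > curId)));
    }

    /** True if the votes matching `proposal` (state ignored) come from a majority of voters. */
    static boolean termPredicate(Map<Long, Vote> votes, Vote proposal, int votingMembers) {
        long matching = votes.values().stream()
                .filter(v -> v.leader() == proposal.leader()
                        && v.zxid() == proposal.zxid()
                        && v.electionEpoch() == proposal.electionEpoch()
                        && v.peerEpoch() == proposal.peerEpoch())
                .count();
        return matching > votingMembers / 2;
    }

    /** A remote leader must have reported LEADING itself; if I am the leader, rounds must match. */
    static boolean checkLeader(Map<Long, Vote> ooe, long leader, long electionEpoch,
                               long selfId, long logicalclock) {
        if (leader != selfId) {
            Vote v = ooe.get(leader);
            return v != null && v.state() == State.LEADING;
        }
        return logicalclock == electionEpoch;
    }

    static boolean ooePredicate(Map<Long, Vote> recv, Map<Long, Vote> ooe, Vote n,
                                int votingMembers, long selfId, long logicalclock) {
        return termPredicate(recv, n, votingMembers)
                && checkLeader(ooe, n.leader(), n.electionEpoch(), selfId, logicalclock);
    }

    public static void main(String[] args) {
        // Same peerEpoch and zxid, so the larger sid (3) wins.
        System.out.println(totalOrderPredicate(3, 0x10, 1, 2, 0x10, 1)); // true
    }
}
```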

    The main election logic is as follows:

      1. Increment the logical clock (logicalclock) by 1 and send the other peers a proposal that votes for itself.

      2. Loop, processing notifications from other peers:

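        1) LOOKING notification: a notification from an older round is ignored. One from a newer round makes this peer jump to that round and clear recvset. If the candidate recommended in the notification is better than the current proposal (per totalOrderPredicate), adopt it and send the updated proposal to the other peers; otherwise keep the current one. Then record the vote in recvset and check whether the election has ended, i.e. whether the currently proposed leader holds the votes of more than half of the voting members.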

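        2) LEADING or FOLLOWING notification: receiving either kind means an election has already finished. recvset (for votes from the same round) and outofelection are used to decide whether to accept that leader and end this peer's election.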
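The LOOKING half of this loop can be illustrated with a toy, single-threaded simulation. This is a sketch under strong assumptions, not ZooKeeper code:

* there is a single election round, so logicalclock never changes;
* delivery is reliable and in order through one shared queue;
* there is no finalizeWait re-check;
* all servers have equal weight.

All class and method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** Hypothetical toy simulation of the LOOKING phase of lookForLeader. */
final class LookingPhaseSim {
    record Proposal(long leader, long zxid, long peerEpoch) {
        // Same ordering as totalOrderPredicate: peerEpoch, then zxid, then sid.
        boolean beats(Proposal o) {
            return peerEpoch > o.peerEpoch
                    || (peerEpoch == o.peerEpoch
                        && (zxid > o.zxid || (zxid == o.zxid && leader > o.leader)));
        }
    }

    record Msg(long from, long to, Proposal p) {}

    static final class Peer {
        final long sid;
        Proposal proposal;
        final Map<Long, Proposal> recvset = new HashMap<>();
        Long decidedLeader; // null while still LOOKING

        Peer(long sid, long lastZxid, long peerEpoch) {
            this.sid = sid;
            this.proposal = new Proposal(sid, lastZxid, peerEpoch); // step 1: vote for itself
        }
    }

    /** Like sendNotifications(): send the current proposal to every peer, itself included. */
    static void broadcast(Peer from, Map<Long, Peer> peers, Queue<Msg> network) {
        for (long to : peers.keySet()) network.add(new Msg(from.sid, to, from.proposal));
    }

    /** Runs until the network drains; returns sid -> leader that peer settled on. */
    static Map<Long, Long> run(Map<Long, Peer> peers) {
        Queue<Msg> network = new ArrayDeque<>();
        for (Peer p : peers.values()) broadcast(p, peers, network);

        while (!network.isEmpty()) {
            Msg m = network.poll();
            Peer me = peers.get(m.to());
            if (me.decidedLeader != null) continue; // already left the election
            // Step 2.1: adopt a better proposal and tell everyone.
            if (m.p().beats(me.proposal)) {
                me.proposal = m.p();
                broadcast(me, peers, network);
            }
            me.recvset.put(m.from(), m.p());
            // termPredicate: does a majority of the voting members back my proposal?
            long support = me.recvset.values().stream().filter(me.proposal::equals).count();
            if (support > peers.size() / 2) me.decidedLeader = me.proposal.leader();
        }
        Map<Long, Long> result = new HashMap<>();
        peers.forEach((sid, p) -> result.put(sid, p.decidedLeader));
        return result;
    }

    public static void main(String[] args) {
        Map<Long, Peer> peers = new HashMap<>();
        peers.put(1L, new Peer(1, 0x20, 1));
        peers.put(2L, new Peer(2, 0x10, 1));
        peers.put(3L, new Peer(3, 0x10, 1));
        System.out.println(run(peers)); // {1=1, 2=1, 3=1}
    }
}
```

All first-round broadcasts are queued before any re-broadcast. Each peer therefore sees every initial proposal before it can count a second matching vote. As a result, all three peers settle on sid 1, whose higher zxid beats the larger sids.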

    public Vote lookForLeader() throws InterruptedException {
           ......try {
                HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
    
                HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
    
                int notTimeout = finalizeWait;
    
                synchronized(this){
                    logicalclock++;
                    updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                }
    
                LOG.info("New election. My id =  " + self.getId() +
                        ", proposed zxid=0x" + Long.toHexString(proposedZxid));
                sendNotifications();
    
                /*
                 * Loop in which we exchange notifications until we find a leader
                 */
    
                while ((self.getPeerState() == ServerState.LOOKING) &&
                        (!stop)){
                    /*
                     * Remove next notification from queue, times out after 2 times
                     * the termination time
                     */
                    Notification n = recvqueue.poll(notTimeout,
                            TimeUnit.MILLISECONDS);
    
                    /*
                     * Sends more notifications if haven't received enough.
                     * Otherwise processes new notification.
                     */
                    if(n == null){
                        if(manager.haveDelivered()){
                            sendNotifications();
                        } else {
                            manager.connectAll();
                        }
    
                        /*
                         * Exponential backoff
                         */
                        int tmpTimeOut = notTimeout*2;
                        notTimeout = (tmpTimeOut < maxNotificationInterval?
                                tmpTimeOut : maxNotificationInterval);
                        LOG.info("Notification time out: " + notTimeout);
                    }
                    else if(self.getVotingView().containsKey(n.sid)) {
                        /*
                         * Only proceed if the vote comes from a replica in the
                         * voting view.
                         */

                        // Handle the notification
                        switch (n.state) {
                        case LOOKING:
                            // If notification > current, replace and send messages out
                            if (n.electionEpoch > logicalclock) {
                                logicalclock = n.electionEpoch;
                                recvset.clear();
                                if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                        getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                                    updateProposal(n.leader, n.zxid, n.peerEpoch);
                                } else {
                                    updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                                }
                                sendNotifications();
                            } else if (n.electionEpoch < logicalclock) {
                                if(LOG.isDebugEnabled()){
                                    LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                            + Long.toHexString(n.electionEpoch)
                                            + ", logicalclock=0x" + Long.toHexString(logicalclock));
                                }
                                break;
                            } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    proposedLeader, proposedZxid, proposedEpoch)) {
                                updateProposal(n.leader, n.zxid, n.peerEpoch);
                                sendNotifications();
                            }

                            if(LOG.isDebugEnabled()){
                                LOG.debug("Adding vote: from=" + n.sid +
                                        ", proposed leader=" + n.leader +
                                        ", proposed zxid=0x" + Long.toHexString(n.zxid) +
                                        ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                            }

                            // Update recvset
                            recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                            if (termPredicate(recvset,
                                    new Vote(proposedLeader, proposedZxid, logicalclock, proposedEpoch))) {

                                // Verify if there is any change in the proposed leader
                                while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null){
                                    // An unexpected challenger shows up at the last moment
                                    if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                            proposedLeader, proposedZxid, proposedEpoch)){
                                        recvqueue.put(n);
                                        break;
                                    }
                                }

                                // If n is not null, a peer better suited to lead than our proposal
                                // turned up; it was put back on the queue for the outer loop.
                                /*
                                 * This predicate is true once we don't read any new
                                 * relevant message from the reception queue
                                 */
                                if (n == null) {
                                    self.setPeerState((proposedLeader == self.getId()) ?
                                            ServerState.LEADING: learningState());
                                    Vote endVote = new Vote(proposedLeader, proposedZxid,
                                            logicalclock, proposedEpoch);
                                    leaveInstance(endVote);
                                    return endVote;
                                }
                            }
                            break;
                        case OBSERVING:
                            LOG.debug("Notification from observer: " + n.sid);
                            break;
                        case FOLLOWING:
                        case LEADING:
                            /*
                             * Consider all notifications from the same epoch
                             * together.
                             */
                            // The same logical clock means the same election round:
                            // update recvset first, then check
                            if(n.electionEpoch == logicalclock){
                                recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                                if(ooePredicate(recvset, outofelection, n)) {
                                    self.setPeerState((n.leader == self.getId()) ?
                                            ServerState.LEADING: learningState());
                                    Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                                    leaveInstance(endVote);
                                    return endVote;
                                }
                            }

                            // A new peer joining the cluster must check whether a majority already
                            // follows this leader. Its recvset is necessarily empty for that round,
                            // so the vote goes into outofelection, which decides instead.
                            /*
                             * Before joining an established ensemble, verify
                             * a majority is following the same leader.
                             */
                            outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid,
                                    n.electionEpoch, n.peerEpoch, n.state));
                            if(ooePredicate(outofelection, outofelection, n)) {
                                synchronized(this){
                                    logicalclock = n.electionEpoch;
                                    self.setPeerState((n.leader == self.getId()) ?
                                            ServerState.LEADING: learningState());
                                }
                                Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                            break;
                        default:
                            LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)",
                                    n.state, n.sid);
                            break;
                        }
                    } else {
                        LOG.warn("Ignoring notification from non-cluster member " + n.sid);
                    }
                }
                return null;
            } finally {
                try {
                    if(self.jmxLeaderElectionBean != null){
                        MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
                    }
                } catch (Exception e) {
                    LOG.warn("Failed to unregister with JMX", e);
                }
                self.jmxLeaderElectionBean = null;
            }
        }
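The timeout handling at the top of the loop is a capped exponential backoff. The sketch below reproduces just that calculation. The constants 200 ms (finalizeWait) and 60000 ms (maxNotificationInterval) are assumed defaults, so check them against the ZooKeeper version you run. `NotificationBackoff` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper mirroring the notTimeout backoff in lookForLeader. */
final class NotificationBackoff {
    static final int FINALIZE_WAIT_MS = 200;                // assumed value of finalizeWait
    static final int MAX_NOTIFICATION_INTERVAL_MS = 60_000; // assumed value of maxNotificationInterval

    /** Doubles the poll timeout after an empty poll, capped at the maximum interval. */
    static int next(int notTimeout) {
        int tmpTimeOut = notTimeout * 2;
        return tmpTimeOut < MAX_NOTIFICATION_INTERVAL_MS ? tmpTimeOut : MAX_NOTIFICATION_INTERVAL_MS;
    }

    public static void main(String[] args) {
        List<Integer> timeouts = new ArrayList<>();
        int t = FINALIZE_WAIT_MS;
        for (int i = 0; i < 10; i++) {
            t = next(t);
            timeouts.add(t);
        }
        System.out.println(timeouts); // [400, 800, 1600, 3200, 6400, 12800, 25600, 51200, 60000, 60000]
    }
}
```

Doubling keeps a peer that hears nothing from flooding the network with notifications. The cap bounds how long a peer waits between retries once the other servers come back.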
  • Original article: https://www.cnblogs.com/ironroot/p/7403846.html