zoukankan      html  css  js  c++  java
  • zookeeper leader选举算法源码

    服务器状态

    在QuorumPeer中有定义,这个类是一个线程。

    1. LOOKING:寻找Leader状态。处于该状态时,它会认为当前集群中没有Leader,进入选举流程。
    2. FOLLOWING:
    3. LEADING
    4. OBSERVING

    选票数据结构

    public class Vote {
        //
        final private int version;
        //被选举leader的服务器ID
        final private long id;
        //被选举leader的事务ID
        final private long zxid;
        //逻辑时钟,判断多个选票是否处于同一个选举周期,
        final private long electionEpoch;
        //被推举leader的选举轮次
        final private long peerEpoch;
        //状态
        final private ServerState state;
    

    QuorumCnxManager:网络IO

    负责选举leader时的网络通信

    消息队列

    SendWork和RevWork都是一个线程

        /*
         * 分别是发送器,发送队列,最后发送的消息。每个连接都有
         */
        final ConcurrentHashMap<Long, SendWorker> senderWorkerMap;//SendWork里面有RevWork对象
        final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;
        final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent;
        
        /*
         * 接受队列只有一个
         */
        public final ArrayBlockingQueue<Message> recvQueue;
    

    建立连接

    zookeeper为Leader选举会建立一条连接,默认端口是3888。为了防止两台服务器有重复链接,zookeeper定义了规则,只能sid大的去连接sid小的。如果sid小的连接了sid大的,在连接处理程序中会断掉这条连接,然后重新发起连接。

    main->receiveConnection->handleConnection(创建sendwork和revwork,并且加入队列集)

    消息的接收和发送

    • 消息的接收过程是由消息接收器recvwork负责,它源源不断从TCP读取数据,加入recvQueue(唯一)。

    • 消息发送器主要有两条逻辑

      • 启动sendWork线程后如果发现发送队列是null,从lastMessageSent获取这条数据重新发送。(为了解决由于收到消息前后服务器挂掉,导致消息未正确处理)
      • sendWork从队列queueSendMap里面获取数据,通过调用队列的poll函数从队列获取数据

    FastLeaderElection

    这是选举选法的核心部分,主要在FastLeaderElection中

    选票管理

    public class FastLeaderElection implements Election{
        //发送队列,用于保存待发送的选票
        LinkedBlockingQueue<ToSend> sendqueue;
        //接收队列,用于保存接收的外部选票
        LinkedBlockingQueue<Notification> recvqueue;
        //选票发送器和接收器线程
        Messenger messenger;
        
        protected class Messenger {
            //选票接收器线程,接受选票,如果当前状态不为locking,将leader信息发回
            class WorkerReceiver extends ZooKeeperThread{}
            //选票发送器线程,发送选票。
            //负责把选票转化为消息,放入QuorumCnxManager的发送队列,
            //如果是投给自己的,直接放入接收队列
            class WorkerSender extends ZooKeeperThread {}
        }
    }
    
    

    核心算法——lookForLeader

    • 调用流程:QuorumPeer->locking状态(可以启动只读模式和阻塞模式)->lookForLeader
    public Vote lookForLeader() throws InterruptedException {
        //...
        try {
            //用于选票归档
            HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
    
            HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
    
            int notTimeout = finalizeWait;
    
            synchronized(this){
                //自增logicalclock,
                logicalclock++;
                //初始化选票,投给自己,使用lastProcessedZxid(最后已提交的日志投票)
                updateProposal(getInitId(),getInitLastLoggedZxid(),
                                getPeerEpoch());
            }
            
            //初始化选票,然后WorkerSender发送
            sendNotifications();
    
            /*
             * Loop in which we exchange notifications until we find a leader
             */
            while ((self.getPeerState() == ServerState.LOOKING) &&
                    (!stop)){
                /*
                 * Remove next notification from queue, times out after 2 times
                 * the termination time
                 */
                Notification n = recvqueue.poll(notTimeout,
                        TimeUnit.MILLISECONDS);
    
                //没有获得外部选票
                if(n == null){
                    //如果连接仍然保持,重新发送投票
                    if(manager.haveDelivered()){
                        sendNotifications();
                    } else {
                    //连接失效,重新建立连接。开始的时候是这样建立连接的?
                        manager.connectAll();
                    }
                    //修改超时参数...
                }
                //处理选票
                else if(self.getVotingView().containsKey(n.sid)) {
                    switch (n.state) {
                    case LOOKING:
                        // 大于当前选举轮次
                        if (n.electionEpoch > logicalclock) {
                            logicalclock = n.electionEpoch;
                            //清空接受的选票
                            recvset.clear();
                            //选票PK,外部更新。有3条规则
                            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                                //变更选票
                                updateProposal(n.leader, n.zxid, n.peerEpoch);
                            } else {
                                //不变更选票
                                updateProposal(getInitId(),
                                        getInitLastLoggedZxid(),
                                        getPeerEpoch());
                            }
                            sendNotifications();
                        } 
                        // 小于当前选举轮次,直接丢弃
                        else if (n.electionEpoch < logicalclock) {
                            break;
                        } 
                        //等于当前选举轮次,直接PK
                        else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                proposedLeader, proposedZxid, proposedEpoch)) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                            sendNotifications();
                        }
                        
                        //无论是否重新投票,都要选票归档,<sid, 选票>
                        //都是和自己的提议对比
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
    
                        //统计投票,决定是否终止投票
                        if (termPredicate(recvset,
                                new Vote(proposedLeader, proposedZxid,
                                        logicalclock, proposedEpoch))) {
    
                            // 判断leader是否改变
                            while((n = recvqueue.poll(finalizeWait,
                                    TimeUnit.MILLISECONDS)) != null){
                                if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                        proposedLeader, proposedZxid, proposedEpoch)){
                                    recvqueue.put(n);
                                    break;
                                }
                            }
    
                            if (n == null) {
                                //设置状态,如果leader是自己,状态为Leading
                                //如果leader是其他节点,状态可能为observing或者following
                                self.setPeerState((proposedLeader == self.getId()) ?
                                        ServerState.LEADING: learningState());
    
                                Vote endVote = new Vote(proposedLeader,
                                                        proposedZxid,
                                                        logicalclock,
                                                        proposedEpoch);
                                //清空接收队列
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }
                        break;
                    case OBSERVING:
                        break;
                    //已经选出结果
                    case FOLLOWING:
                    case LEADING:
                        //除了做出过半判断,同时还要检查leader是否给自己发送过投票信息,从投票信息中确认该leader是不是LEADING状态(防止出现时间差)。
                        
                        /* 同一轮投票选出leader,那么判断是不是半数以上的服务器都选举同一个leader,如果是设置角色并退出选举 */
                        if(n.electionEpoch == logicalclock){
                            recvset.put(n.sid, new Vote(n.leader,
                                                          n.zxid,
                                                          n.electionEpoch,
                                                          n.peerEpoch));
                           
                            if(ooePredicate(recvset, outofelection, n)) {
                                self.setPeerState((n.leader == self.getId()) ?
                                        ServerState.LEADING: learningState());
    
                                Vote endVote = new Vote(n.leader, 
                                        n.zxid, 
                                        n.electionEpoch, 
                                        n.peerEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }
    
                        /* 非同一轮次,例如宕机很久的机器重新启动/某个节点延迟很大变为locking,需要收集过半选票。*/
                        outofelection.put(n.sid, new Vote(n.version,
                                                            n.leader,
                                                            n.zxid,
                                                            n.electionEpoch,
                                                            n.peerEpoch,
                                                            n.state));
           
                        if(ooePredicate(outofelection, outofelection, n)) {
                            synchronized(this){
                                logicalclock = n.electionEpoch;
                                self.setPeerState((n.leader == self.getId()) ?
                                        ServerState.LEADING: learningState());
                            }
                            Vote endVote = new Vote(n.leader,
                                                    n.zxid,
                                                    n.electionEpoch,
                                                    n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                        break;
                    default:
                        break;
                    }
                } else {
                    LOG.warn("Ignoring notification from non-cluster member " + n.sid);
                }
            }
            return null;
        } 
    }
    
    
    
    • 初始选票
      • (sid, LastLoggedZxid, currentEpoch)
      • LastLoggedZxid为处理(包括提交,未提交)
    • 接收到新的选票后,从以下几个层次判断
      • 选票状态
      • 选票轮次
      • 选票变更规则
    • 变更选票的3条规则
      • New epoch更高
      • epoch相同,选择zxid更高的
      • 前面的都相同,选择sid更高的

    模块图总结

    image

  • 相关阅读:
    【Uvalive4960】 Sensor network (苗条树,进化版)
    【UVA 1151】 Buy or Build (有某些特别的东东的最小生成树)
    【UVA 1395】 Slim Span (苗条树)
    【UVA 10600】 ACM Contest and Blackout(最小生成树和次小生成树)
    【UVA 10369】 Arctic Network (最小生成树)
    【UVA 10816】 Travel in Desert (最小瓶颈树+最短路)
    【UVA 11183】 Teen Girl Squad (定根MDST)
    【UVA 11865】 Stream My Contest (二分+MDST最小树形图)
    【UVA 11354】 Bond (最小瓶颈生成树、树上倍增)
    【LA 5713 】 Qin Shi Huang's National Road System (MST)
  • 原文地址:https://www.cnblogs.com/biterror/p/7147444.html
Copyright © 2011-2022 走看看