zoukankan      html  css  js  c++  java
  • zookeeper leader选举算法源码

    服务器状态

    在QuorumPeer中有定义,这个类是一个线程。

    1. LOOKING:寻找Leader状态。处于该状态时,它会认为当前集群中没有Leader,进入选举流程。
    2. FOLLOWING:
    3. LEADING
    4. OBSERVING

    选票数据结构

    public class Vote {
        //
        final private int version;
        //被选举leader的服务器ID
        final private long id;
        //被选举leader的事务ID
        final private long zxid;
        //逻辑时钟,判断多个选票是否处于同一个选举周期,
        final private long electionEpoch;
        //被推举leader的选举轮次
        final private long peerEpoch;
        //状态
        final private ServerState state;
    

    QuorumCnxManager:网络IO

    负责选举leader时的网络通信

    消息队列

    SendWork和RevWork都是一个线程

        /*
         * 分别是发送器,发送队列,最后发送的消息。每个连接都有
         */
        final ConcurrentHashMap<Long, SendWorker> senderWorkerMap;//SendWork里面有RevWork对象
        final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;
        final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent;
        
        /*
         * 接受队列只有一个
         */
        public final ArrayBlockingQueue<Message> recvQueue;
    

    建立连接

    zookeeper为Leader选举会建立一条连接,默认端口是3888。为了防止两台服务器有重复链接,zookeeper定义了规则,只能sid大的去连接sid小的。如果sid小的连接了sid大的,在连接处理程序中会断掉这条连接,然后重新发起连接。

    main->receiveConnection->handleConnection(创建sendwork和revwork,并且加入队列集)

    消息的接收和发送

    • 消息的接收过程是由消息接收器recvwork负责,它源源不断从TCP读取数据,加入recvQueue(唯一)。

    • 消息发送器主要有两条逻辑

      • 启动sendWork线程后如果发现发送队列是null,从lastMessageSent获取这条数据重新发送。(为了解决由于收到消息前后服务器挂掉,导致消息未正确处理)
      • sendWork从队列queueSendMap里面获取数据,通过调用队列的poll函数从队列获取数据

    FastLeaderElection

    这是选举选法的核心部分,主要在FastLeaderElection中

    选票管理

    public class FastLeaderElection implements Election{
        //发送队列,用于保存待发送的选票
        LinkedBlockingQueue<ToSend> sendqueue;
        //接收队列,用于保存接收的外部选票
        LinkedBlockingQueue<Notification> recvqueue;
        //选票发送器和接收器线程
        Messenger messenger;
        
        protected class Messenger {
            //选票接收器线程,接受选票,如果当前状态不为locking,将leader信息发回
            class WorkerReceiver extends ZooKeeperThread{}
            //选票发送器线程,发送选票。
            //负责把选票转化为消息,放入QuorumCnxManager的发送队列,
            //如果是投给自己的,直接放入接收队列
            class WorkerSender extends ZooKeeperThread {}
        }
    }
    
    

    核心算法——lookForLeader

    • 调用流程:QuorumPeer->locking状态(可以启动只读模式和阻塞模式)->lookForLeader
    public Vote lookForLeader() throws InterruptedException {
        //...
        try {
            //用于选票归档
            HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
    
            HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
    
            int notTimeout = finalizeWait;
    
            synchronized(this){
                //自增logicalclock,
                logicalclock++;
                //初始化选票,投给自己,使用lastProcessedZxid(最后已提交的日志投票)
                updateProposal(getInitId(),getInitLastLoggedZxid(),
                                getPeerEpoch());
            }
            
            //初始化选票,然后WorkerSender发送
            sendNotifications();
    
            /*
             * Loop in which we exchange notifications until we find a leader
             */
            while ((self.getPeerState() == ServerState.LOOKING) &&
                    (!stop)){
                /*
                 * Remove next notification from queue, times out after 2 times
                 * the termination time
                 */
                Notification n = recvqueue.poll(notTimeout,
                        TimeUnit.MILLISECONDS);
    
                //没有获得外部选票
                if(n == null){
                    //如果连接仍然保持,重新发送投票
                    if(manager.haveDelivered()){
                        sendNotifications();
                    } else {
                    //连接失效,重新建立连接。开始的时候是这样建立连接的?
                        manager.connectAll();
                    }
                    //修改超时参数...
                }
                //处理选票
                else if(self.getVotingView().containsKey(n.sid)) {
                    switch (n.state) {
                    case LOOKING:
                        // 大于当前选举轮次
                        if (n.electionEpoch > logicalclock) {
                            logicalclock = n.electionEpoch;
                            //清空接受的选票
                            recvset.clear();
                            //选票PK,外部更新。有3条规则
                            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                                //变更选票
                                updateProposal(n.leader, n.zxid, n.peerEpoch);
                            } else {
                                //不变更选票
                                updateProposal(getInitId(),
                                        getInitLastLoggedZxid(),
                                        getPeerEpoch());
                            }
                            sendNotifications();
                        } 
                        // 小于当前选举轮次,直接丢弃
                        else if (n.electionEpoch < logicalclock) {
                            break;
                        } 
                        //等于当前选举轮次,直接PK
                        else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                proposedLeader, proposedZxid, proposedEpoch)) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                            sendNotifications();
                        }
                        
                        //无论是否重新投票,都要选票归档,<sid, 选票>
                        //都是和自己的提议对比
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
    
                        //统计投票,决定是否终止投票
                        if (termPredicate(recvset,
                                new Vote(proposedLeader, proposedZxid,
                                        logicalclock, proposedEpoch))) {
    
                            // 判断leader是否改变
                            while((n = recvqueue.poll(finalizeWait,
                                    TimeUnit.MILLISECONDS)) != null){
                                if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                        proposedLeader, proposedZxid, proposedEpoch)){
                                    recvqueue.put(n);
                                    break;
                                }
                            }
    
                            if (n == null) {
                                //设置状态,如果leader是自己,状态为Leading
                                //如果leader是其他节点,状态可能为observing或者following
                                self.setPeerState((proposedLeader == self.getId()) ?
                                        ServerState.LEADING: learningState());
    
                                Vote endVote = new Vote(proposedLeader,
                                                        proposedZxid,
                                                        logicalclock,
                                                        proposedEpoch);
                                //清空接收队列
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }
                        break;
                    case OBSERVING:
                        break;
                    //已经选出结果
                    case FOLLOWING:
                    case LEADING:
                        //除了做出过半判断,同时还要检查leader是否给自己发送过投票信息,从投票信息中确认该leader是不是LEADING状态(防止出现时间差)。
                        
                        /* 同一轮投票选出leader,那么判断是不是半数以上的服务器都选举同一个leader,如果是设置角色并退出选举 */
                        if(n.electionEpoch == logicalclock){
                            recvset.put(n.sid, new Vote(n.leader,
                                                          n.zxid,
                                                          n.electionEpoch,
                                                          n.peerEpoch));
                           
                            if(ooePredicate(recvset, outofelection, n)) {
                                self.setPeerState((n.leader == self.getId()) ?
                                        ServerState.LEADING: learningState());
    
                                Vote endVote = new Vote(n.leader, 
                                        n.zxid, 
                                        n.electionEpoch, 
                                        n.peerEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }
    
                        /* 非同一轮次,例如宕机很久的机器重新启动/某个节点延迟很大变为locking,需要收集过半选票。*/
                        outofelection.put(n.sid, new Vote(n.version,
                                                            n.leader,
                                                            n.zxid,
                                                            n.electionEpoch,
                                                            n.peerEpoch,
                                                            n.state));
           
                        if(ooePredicate(outofelection, outofelection, n)) {
                            synchronized(this){
                                logicalclock = n.electionEpoch;
                                self.setPeerState((n.leader == self.getId()) ?
                                        ServerState.LEADING: learningState());
                            }
                            Vote endVote = new Vote(n.leader,
                                                    n.zxid,
                                                    n.electionEpoch,
                                                    n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                        break;
                    default:
                        break;
                    }
                } else {
                    LOG.warn("Ignoring notification from non-cluster member " + n.sid);
                }
            }
            return null;
        } 
    }
    
    
    
    • 初始选票
      • (sid, LastLoggedZxid, currentEpoch)
      • LastLoggedZxid为处理(包括提交,未提交)
    • 接收到新的选票后,从以下几个层次判断
      • 选票状态
      • 选票轮次
      • 选票变更规则
    • 变更选票的3条规则
      • New epoch更高
      • epoch相同,选择zxid更高的
      • 前面的都相同,选择sid更高的

    模块图总结

    image

  • 相关阅读:
    NOI2015 寿司晚宴
    bzoj3456 城市规划
    DDP入门
    HAOI2018 染色
    曹冲养猪
    采药
    跳跳棋
    基础复习笔记-最短路

    康熙环球
  • 原文地址:https://www.cnblogs.com/biterror/p/7147444.html
Copyright © 2011-2022 走看看