zoukankan      html  css  js  c++  java
  • Raft一致性协议的投票选举

    前言


    每当我们谈到对于分布式系统容错性的时候,我们其实真正想聊的是里面的状态一致性的控制。而当我们再想深入探讨一致性控制的时候,我们想到的可能是经典的Paxos协议。但是Paxos协议实现起来较为复杂,而且原理本身也不易理解。笔者在之前文章中聊过一个更易理解和实现的一致性协议Raft协议:聊聊Raft一致性协议以及Apache Ratis。在Raft协议中,引入了Leader、Follower以及Candidate三种角色来做一致性控制中的投票选举过程。本文笔者来聊聊里Raft协议中最为关键的投票选举过程以及它的一个实现版本。在开源项目Apache Ratis中,对此协议有完整的版本实现。

    Raft协议的投票选举原理


    在Raft协议的投票过程中,它是由1个Candidate候选者向其它Follower发送投票请求,让这些Follower投票选择Candidate。然后这些Follower将会返回给Candidate。如果这个Candidate收到超过半数以上的总票数的时候,那么此Candidate就可以当选为Leader身份了。

    投票过程如下图所示:
    在这里插入图片描述
    投票请求结果返回
    在这里插入图片描述
    上述投票选举过程在现实情况中其实还有很多种边缘情况,比如同时有另外一个Candidate在投票选举时怎么办?当发起此轮投票选举时,发现已经有新的投票Leader选举已被选出来了,怎么处理呢。

    下面我们来看看这些边缘情况Raft协议中是怎么处理的。

    Raft协议的投票选举细节及代码实现


    这里我们要从Leader选举的源头开始讲起。当系统启动好之后,初始选举后系统由1个Leader和若干个Follower角色组成。然后突然由于某个异常原因,Leader服务出现了异常,导致Follower角色检测到和Leader的上次RPC更新时间超过给定阈值时间时。此时Follower会认为Leader服务已出现异常,然后它将会发起一次新的Leader选举行为,同时将自身的状态从Follower切换为Candidate身份。随后请求其它Follower投票选择自己。

    这里笔者结合Apache Ratis中对于Raft的实现来展开阐述,上述超时时间代码如下:

      public  void run() {
        long sleepDeviationThresholdMs = server.getSleepDeviationThresholdMs();
        while (monitorRunning && server.isFollower()) {
          ...
            synchronized (server) {
              // 如果当前Follower检测到上次RPC时间超过规定阈值,则开始将自身切为候选者身份
              if (outstandingOp.get() == 0 && lastRpcTime.elapsedTimeMs() >= electionTimeout) {
                LOG.info("{}:{} changes to CANDIDATE, lastRpcTime:{}, electionTimeout:{}ms",
                    server.getId(), server.getGroupId(), lastRpcTime.elapsedTimeMs(), electionTimeout);
                // election timeout, should become a candidate
                server.changeToCandidate();
                break;
              }
            }
          ...
        }
      }
    

    然后切换为Candidate的服务发起新的领导选举,

      synchronized void changeToCandidate() {
        Preconditions.assertTrue(isFollower());
        role.shutdownFollowerState();
        setRole(RaftPeerRole.CANDIDATE, "changeToCandidate");
        if (state.shouldNotifyExtendedNoLeader()) {
          stateMachine.notifyExtendedNoLeader(getRoleInfoProto());
        }
        // 然后此Candidate发起新的领导选举
        role.startLeaderElection(this);
      }
    

    接着我们需将当前Leader选举的轮次更新,通俗地解释可理解为第几届选举了,这个值在Candidate每次发起选举时会递增。选举轮次编号信息是一个十分重要的信息,这可以避免那些消息落后的Candidate发起滞后的领导选举过程,而获取最新的Leader信息。

    private void askForVotes() throws InterruptedException, IOException {
        final ServerState state = server.getState();
        while (shouldRun()) {
          // one round of requestVotes
          final long electionTerm;
          final RaftConfiguration conf;
          synchronized (server) {
        	// (1).初始化当前选举轮次编号,比当前的轮次递增1
            electionTerm = state.initElection();
            conf = state.getRaftConf();
            state.persistMetadata();
          }
          ...
    
          final ResultAndTerm r;
          // (2).获取除自身外其他Follow服务的Service信息
          final Collection<RaftPeer> others = conf.getOtherPeers(server.getId());
          if (others.isEmpty()) {
            r = new ResultAndTerm(Result.PASSED, electionTerm);
          } else {
            final Executor voteExecutor = new Executor(this, others.size());
            try {
              // (3).发起投票过程
              final int submitted = submitRequests(electionTerm, lastEntry, others, voteExecutor);
              // (4).等待投票结果返回
              r = waitForResults(electionTerm, submitted, conf, voteExecutor);
            } finally {
              voteExecutor.shutdown();
            }
          }
    

    下面我们来进入submitRequests的实际子过程,看看Follower在接收到投票请求时,是如何处理的。

      private RequestVoteReplyProto requestVote(
          RaftPeerId candidateId, RaftGroupId candidateGroupId,
          long candidateTerm, TermIndex candidateLastEntry) throws IOException {
        ...
        synchronized (this) {
          final FollowerState fs = role.getFollowerState().orElse(null);
          // (1)Follower判断发起的领导轮次编号是否落后于当前的轮次
          if (shouldWithholdVotes(candidateTerm)) {
            LOG.info("{}-{}: Withhold vote from candidate {} with term {}. State: leader={}, term={}, lastRpcElapsed={}",
                getMemberId(), role, candidateId, candidateTerm, state.getLeaderId(), state.getCurrentTerm(),
                fs != null? fs.getLastRpcTime().elapsedTimeMs() + "ms": null);
          } else if (state.recognizeCandidate(candidateId, candidateTerm)) {
        	// (2)轮次编号,CandidateId有效,当前Follower投票给请求方的Candidate
            final boolean termUpdated = changeToFollower(candidateTerm, true, "recognizeCandidate:" + candidateId);
            // see Section 5.4.1 Election restriction
            if (state.isLogUpToDate(candidateLastEntry) && fs != null) {
              fs.updateLastRpcTime(FollowerState.UpdateType.REQUEST_VOTE);
              // (3)记下当前Follower投票的CandidateId,用来表明此Follower的投票归属
              // 避免发生二次投票
              state.grantVote(candidateId);
              voteGranted = true;
            }
            if (termUpdated || voteGranted) {
              state.persistMetadata(); // sync metafile
            }
          }
          ...
        }
        return reply;
      }
    

    这里我们着重再来看state.recognizeCandidate操作,它在Follower作出投票选举前做了哪些关键的验证操作呢。

      /**
       * 判断当前的轮次编号,给定的Candidate身份是否是可接受的
       */
      boolean recognizeCandidate(RaftPeerId candidateId, long candidateTerm) {
        if (!getRaftConf().containsInConf(candidateId)) {
          return false;
        }
        final long current = currentTerm.get();
        // 如果当前投票选举编号轮次是新的领导选举轮次,则是可接受的
        if (candidateTerm > current) {
          return true;
        } else if (candidateTerm == current) {
          // 如果不是,意为此选举轮次中,有多个Candidate发起了领导选择
          // 此时判断当前Follower是否已经投出过票:
          //   1)没有投出过票,Candidate可接受
          //   2) 投出过票,但是投出的票不是给定Candidate,则不接受
          return votedFor == null || votedFor.equals(candidateId);
        }
        return false;
      }
    

    从上面处理中,我们可以注意到这里面是会存在多个Candidate同时发生新轮次的领导选举过程中,这个时候就还得进行投票归属信息的判断,避免Follower投出进行二次投票过程。当然在这个过程中,我们也要保证投票相关变量的更新务必是原子更新的。

    随后Follower将投票结果返回给发起请求的Candidate,然后Candidate进行了以下的处理过程。

      private ResultAndTerm waitForResults(final long electionTerm, final int submitted,
          RaftConfiguration conf, Executor voteExecutor) throws InterruptedException {
        final Timestamp timeout = Timestamp.currentTime().addTimeMs(server.getRandomTimeoutMs());
        final Map<RaftPeerId, RequestVoteReplyProto> responses = new HashMap<>();
        final List<Exception> exceptions = new ArrayList<>();
        int waitForNum = submitted;
        Collection<RaftPeerId> votedPeers = new ArrayList<>();
        while (waitForNum > 0 && shouldRun(electionTerm)) {
         ...
    
          try {
        	// (1)从投票线程池中拿出投票结果
            final Future<RequestVoteReplyProto> future = voteExecutor.poll(waitTime);
            if (future == null) {
              continue; // poll timeout, continue to return Result.TIMEOUT
            }
    
            final RequestVoteReplyProto r = future.get();
            final RaftPeerId replierId = RaftPeerId.valueOf(r.getServerReply().getReplyId());
            final RequestVoteReplyProto previous = responses.putIfAbsent(replierId, r);
            if (previous != null) {
              LOG.warn("{} received duplicated replies from {}, the 2nd reply is ignored: 1st = {}, 2nd = {}",
                  server.getId(), replierId, ServerProtoUtils.toString(previous), ServerProtoUtils.toString(r));
              continue;
            }
            if (r.getShouldShutdown()) {
              return logAndReturn(Result.SHUTDOWN, responses, exceptions, -1);
            }
            // (2)如果发现当前Follower的领导选举已经是新的轮次的话,则返回新的轮次信息
            if (r.getTerm() > electionTerm) {
              return logAndReturn(Result.DISCOVERED_A_NEW_TERM, responses,
                  exceptions, r.getTerm());
            }
            // (3)接受到成功的返回结果,加入到投票结果列表中
            if (r.getServerReply().getSuccess()) {
              votedPeers.add(replierId);
              // 如果投票总数超过半数以上时,则表明当前领导选举通过
              if (conf.hasMajority(votedPeers, server.getId())) {
                return logAndReturn(Result.PASSED, responses, exceptions, -1);
              }
            }
          } catch(ExecutionException e) {
            LogUtils.infoOrTrace(LOG, () -> this + " got exception when requesting votes", e);
            exceptions.add(e);
          }
          waitForNum--;
        }
        // (4)规定时间内没有获取到足够多的票数,则当前领导选举竞选失败
        return logAndReturn(Result.REJECTED, responses, exceptions, -1);
      }
    

    Candidate对于上述最终结果的处理过程如下:

      private void askForVotes() throws InterruptedException, IOException {
        final ServerState state = server.getState();
        while (shouldRun()) {
          ...
            try {
              // (3).发起投票过程
              final int submitted = submitRequests(electionTerm, lastEntry, others, voteExecutor);
              // (4).等待投票结果返回
              r = waitForResults(electionTerm, submitted, conf, voteExecutor);
            } finally {
              voteExecutor.shutdown();
            }
          }
    
          synchronized (server) {
            if (!shouldRun(electionTerm)) {
              return; // term already passed or this should not run anymore.
            }
     
            // (5)对选举结果的处理
            switch (r.result) {
              case PASSED:
            	// 选举通过,则切换当前身份为Leader
                server.changeToLeader();
                return;
              case SHUTDOWN:
                LOG.info("{} received shutdown response when requesting votes.", this);
                server.getProxy().close();
                return;
              // 如果选举失败或发现更新一轮的选举轮次(说明别的Candidate发起的领导选举已经成功)
              // 则进行相应信息更新
              case REJECTED:
              case DISCOVERED_A_NEW_TERM:
                final long term = Math.max(r.term, state.getCurrentTerm());
                server.changeToFollowerAndPersistMetadata(term, Result.DISCOVERED_A_NEW_TERM);
                return;
              case TIMEOUT:
                // should start another election
            }
          }
        }
      }
    

    以上就是Apache Ratis内部基于Raft协议的投票过程的代码实现过程。在这里,为了避免可能存在多个Candidate几乎同时发生投票,导致结果完全一致从而需要进行下一轮次的选举。这里会进行随机时间间隔的设置,来错开投票的发起时间。

    在Apache Ratis中,也有此细节的实现:

      public  void run() {
        long sleepDeviationThresholdMs = server.getSleepDeviationThresholdMs();
        while (monitorRunning && server.isFollower()) {
        // 随机时间的设置,避免完全同时投票选举过程发生
        final long electionTimeout = server.getRandomTimeoutMs();
          try {
            if (!JavaUtils.sleep(electionTimeout, sleepDeviationThresholdMs)) {
              continue;
            }
            ...
            synchronized (server) {
              if (outstandingOp.get() == 0 && lastRpcTime.elapsedTimeMs() >= electionTimeout) {
                // ...
                server.changeToCandidate();
                break;
              }
            }
          ...
        }
      }
    

    这个corner case是可能存在的,比如A, B, C, D四个服务,A和B要竞选Leader身份,当按照下述选举过程时,就会出现平票结果:

    1)每个Candidate会向非自身服务发起投票选举,但是如若自身也收到投票选举时,默认只选自己,不会投向其它Candidate
    2)Candidate A向B, C,D发起投票选举,B也是Candidate,它只会投给自己,同时C投票给A。接着D投给了B。这样就出现了平票2对2的情况了。

    当然上述情况主要多加1个服务,就不会出现平票的情况了,

    最后附上Raft投票选举过程图,大家可以对照上述的子过程实现,进行对比,学习。总体来讲,投票实现过程还是比较易于理解的。

    在这里插入图片描述

    引用


    [1].https://raft.github.io/raft.pdf
    [2].http://ratis.incubator.apache.org/
    [3].https://raft.github.io/
    [4].https://blog.csdn.net/Androidlushangderen/article/details/86763412

  • 相关阅读:
    MYSQL把一张表的数据批量复制到另外一张表
    06:rpm 和 yum 执行卡住,解决方法
    su: cannot set user id: Resource temporarily unavailable 解决方法
    queue队列
    threading 两种调用方法
    paramiko 模块 linux
    socketserver 模块实现ftp功能
    socket实现简单的文件下载传输功能
    mysql使用update代替delete做伪删除
    mysql客户端命令mysqladmin介绍
  • 原文地址:https://www.cnblogs.com/bianqi/p/12183523.html
Copyright © 2011-2022 走看看