Copycat

    Member.Status

    Status changes are driven by heartbeats.

    A heartbeat is simply an AppendRequest with an empty entries list.

    /**
       * Triggers a heartbeat to a majority of the cluster.
       * <p>
       * For followers to which no AppendRequest is currently being sent, a new empty AppendRequest will be
       * created and sent. For followers to which an AppendRequest is already being sent, the appendEntries()
       * call will piggyback on the *next* AppendRequest. Thus, multiple calls to this method will only ever
       * result in a single AppendRequest to each follower at any given time, and the returned future will be
       * shared by all concurrent calls.
       *
       * @return A completable future to be completed the next time a heartbeat is received by a majority of the cluster.
       */
      public CompletableFuture<Long> appendEntries() {
        // If there are no other active members in the cluster, simply complete the append operation.
        if (context.getClusterState().getRemoteMemberStates().isEmpty())
          return CompletableFuture.completedFuture(null);
    
        // If no heartbeat future already exists, that indicates there's no heartbeat currently under way.
        // Create a new heartbeat future and commit to all members in the cluster.
        if (heartbeatFuture == null) {
          CompletableFuture<Long> newHeartbeatFuture = new CompletableFuture<>();
          heartbeatFuture = newHeartbeatFuture;
          heartbeatTime = System.currentTimeMillis();
          for (MemberState member : context.getClusterState().getRemoteMemberStates()) {
            appendEntries(member); // send appendEntries to every remote member
          }
          return newHeartbeatFuture;
        }

    In other words, a heartbeat is sent to every member returned by getRemoteMemberStates().
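    The coalescing described in the Javadoc (at most one outstanding AppendRequest per follower, with callers that arrive mid-round sharing the next round) can be sketched outside Copycat. This is a minimal illustration, not Copycat's code: `HeartbeatCoalescer`, its methods and the `roundsSent` counter are hypothetical, and "sending a round" is reduced to a counter instead of real AppendRequests.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch: at most one heartbeat round in flight; concurrent callers share futures. */
final class HeartbeatCoalescer {
  private final int remoteMembers;
  private CompletableFuture<Long> currentRound; // round already sent to the members
  private CompletableFuture<Long> nextRound;    // callers that arrived while a round was in flight
  private int roundsSent;

  HeartbeatCoalescer(int remoteMembers) {
    this.remoteMembers = remoteMembers;
  }

  synchronized CompletableFuture<Long> heartbeat() {
    // No remote members: nothing to contact, complete immediately (as appendEntries() does).
    if (remoteMembers == 0) {
      return CompletableFuture.completedFuture(null);
    }
    // Nothing in flight: start a new round (stands in for one AppendRequest per member).
    if (currentRound == null) {
      currentRound = new CompletableFuture<>();
      roundsSent++;
      return currentRound;
    }
    // A round is in flight but may have been sent before this call, so wait for the next one.
    if (nextRound == null) {
      nextRound = new CompletableFuture<>();
    }
    return nextRound;
  }

  /** Called when a majority has acknowledged the current round. */
  synchronized void onRoundComplete(long index) {
    CompletableFuture<Long> done = currentRound;
    if (done == null) {
      return;
    }
    // Promote the waiting callers to a fresh round, if there are any, and "send" it.
    currentRound = nextRound;
    nextRound = null;
    if (currentRound != null) {
      roundsSent++;
    }
    done.complete(index);
  }

  synchronized int roundsSent() {
    return roundsSent;
  }
}
```

    Waiting for the next round means a caller is never satisfied by a round that was sent before it asked.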

    AVAILABLE

    On initialization, every ServerMember defaults to Status.AVAILABLE:

    public final class ServerMember implements Member, CatalystSerializable, AutoCloseable {
      private Member.Type type;
      private Status status = Status.AVAILABLE;

    LeaderAppender

    @Override
      protected void succeedAttempt(MemberState member) {
        super.succeedAttempt(member);
    
        // If the member is currently marked as UNAVAILABLE, change its status to AVAILABLE and update the configuration.
        if (member.getMember().status() == ServerMember.Status.UNAVAILABLE && !leader.configuring()) {
          member.getMember().update(ServerMember.Status.AVAILABLE, Instant.now());
          leader.configure(context.getCluster().members());
        }
      }

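    succeedAttempt switches an UNAVAILABLE member back to AVAILABLE and pushes the updated configuration; super.succeedAttempt resets the failure count.

    succeedAttempt is called when an OK AppendResponse is received: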

    protected void handleAppendResponseOk(MemberState member, AppendRequest request, AppendResponse response) {
        // Reset the member failure count and update the member's availability status if necessary.
        succeedAttempt(member);

    Since the leader's heartbeat is an empty AppendRequest, getting an OK response back shows that the member is reachable, i.e. AVAILABLE.
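    The per-member bookkeeping behind AVAILABLE/UNAVAILABLE can be reduced to a small state holder. This is a hypothetical sketch (`MemberAvailability` is not a Copycat class): a success resets the failure count and revives an UNAVAILABLE member, while a failure increments the count and marks the member UNAVAILABLE at the threshold of 3 used in failAttempt. The `!leader.configuring()` guard and the configuration push are left out; the boolean results only signal when the leader would call configure().

```java
/** Hypothetical sketch of the AVAILABLE/UNAVAILABLE bookkeeping a leader keeps per follower. */
final class MemberAvailability {
  enum Status { AVAILABLE, UNAVAILABLE }

  static final int FAILURE_THRESHOLD = 3; // same threshold as failAttempt()

  private Status status = Status.AVAILABLE; // ServerMember starts out AVAILABLE
  private int failureCount;

  /** Successful response: reset failures; returns true if the status flipped back to AVAILABLE. */
  boolean recordSuccess() {
    failureCount = 0;
    if (status == Status.UNAVAILABLE) {
      status = Status.AVAILABLE;
      return true; // the leader would now push a new configuration
    }
    return false;
  }

  /** Failed request or response: returns true if the member has just become UNAVAILABLE. */
  boolean recordFailure() {
    failureCount++;
    if (failureCount >= FAILURE_THRESHOLD && status == Status.AVAILABLE) {
      status = Status.UNAVAILABLE;
      return true; // the leader would now push a new configuration
    }
    return false;
  }

  Status status() {
    return status;
  }

  int failureCount() {
    return failureCount;
  }
}
```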

    UNAVAILABLE

    The switch to UNAVAILABLE happens in failAttempt:

    @Override
      protected void failAttempt(MemberState member, Throwable error) {
        super.failAttempt(member, error);
    
        // Verify that the leader has contacted a majority of the cluster within the last two election timeouts.
        // If the leader is not able to contact a majority of the cluster within two election timeouts, assume
        // that a partition occurred and transition back to the FOLLOWER state.
        if (System.currentTimeMillis() - Math.max(heartbeatTime(), leaderTime) > context.getElectionTimeout().toMillis() * 2) {
          LOGGER.warn("{} - Suspected network partition. Stepping down", context.getCluster().member().address());
          context.setLeader(0);
          context.transition(CopycatServer.State.FOLLOWER);
        }
        // If the number of failures has increased above 3 and the member hasn't been marked as UNAVAILABLE, do so.
        else if (member.getFailureCount() >= 3) {
          // If the member is currently marked as AVAILABLE, change its status to UNAVAILABLE and update the configuration.
          if (member.getMember().status() == ServerMember.Status.AVAILABLE && !leader.configuring()) {
            member.getMember().update(ServerMember.Status.UNAVAILABLE, Instant.now());
            leader.configure(context.getCluster().members());
          }
        }
      }

    super.failAttempt resets the connection and increments the failure count:

    member.incrementFailureCount();

    The first check is based on Math.max(heartbeatTime(), leaderTime).

    heartbeatTime

    /**
       * Returns the last time a majority of the cluster was contacted.
       * <p>
       * This is calculated by sorting the list of active members and getting the last time the majority of
       * the cluster was contacted based on the index of a majority of the members. So, in a list of 3 ACTIVE
       * members, index 1 (the second member) will be used to determine the commit time in a sorted members list.
       */
      private long heartbeatTime() {
        int quorumIndex = quorumIndex();
        if (quorumIndex >= 0) {
          return context.getClusterState().getActiveMemberStates((m1, m2)-> Long.compare(m2.getHeartbeatTime(), m1.getHeartbeatTime())).get(quorumIndex).getHeartbeatTime();
        }
        return System.currentTimeMillis();
      }

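    This sorts the ACTIVE members by heartbeat time, most recent first, and takes the entry at quorumIndex: the oldest heartbeat within the most recently contacted majority, i.e. the last time a majority of the cluster was reached.
    If that time (or leaderTime, whichever is later) lies more than two election timeouts in the past, the leader can no longer reach a majority and suspects a network partition (split brain).

    In that case the leader steps down to follower.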
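    The quorumIndex lookup can be sketched as a pure function over heartbeat timestamps. Assumptions, not taken from Copycat: the array holds only the remote ACTIVE members, the leader counts itself toward the quorum (so it needs quorum - 1 remote acknowledgements and quorumIndex = quorum - 2), and `MajorityContact`/`majorityContactTime` are hypothetical names. If the leader's own entry were in the list, the index would shift by one, which is how the Javadoc's "index 1 of 3 members" example reads.

```java
import java.util.Arrays;

/** Hypothetical sketch of heartbeatTime(): the last time a majority of the cluster was contacted. */
final class MajorityContact {
  private MajorityContact() {}

  /**
   * @param remoteHeartbeatTimes last successful contact time (ms) of each remote ACTIVE member
   * @param votingMembers        number of ACTIVE members, including the leader itself
   * @param now                  current time, returned when no remote acknowledgement is needed
   */
  static long majorityContactTime(long[] remoteHeartbeatTimes, int votingMembers, long now) {
    int quorum = votingMembers / 2 + 1;
    // Assumption: the leader counts toward the quorum, so it needs quorum - 1 remote members.
    int quorumIndex = quorum - 2;
    if (quorumIndex < 0) {
      return now; // single-node cluster: the leader alone is a majority
    }
    if (remoteHeartbeatTimes.length <= quorumIndex) {
      throw new IllegalArgumentException("not enough remote members for a majority");
    }
    long[] sorted = remoteHeartbeatTimes.clone();
    Arrays.sort(sorted); // ascending order
    // Position quorumIndex in newest-first order is position (length - 1 - quorumIndex) here.
    return sorted[sorted.length - 1 - quorumIndex];
  }
}
```

    For example, with 5 voting members and remote contact times 100, 400, 300 and 200, the leader plus the members last seen at 400 and 300 form a majority, so a majority was last reached at 300.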

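    The second check: once a member's failure count reaches 3 (failureCount >= 3), it is marked UNAVAILABLE, provided it is currently AVAILABLE and no configuration change is in progress.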

    failAttempt, in turn, is called from the various failure handlers in AbstractAppender:

    handleAppendRequestFailure
    handleAppendResponseFailure
    handleConfigureRequestFailure
    handleInstallRequestFailure
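    The order of the two checks in failAttempt matters: the partition check runs first, and a member is only marked UNAVAILABLE when the leader is not stepping down. A hypothetical pure-function sketch of that decision follows; `FailAttemptPolicy` and its parameters are illustrative, `leaderTime` is assumed to be the time this node became leader (the excerpts do not show where it is set), and the `!leader.configuring()` guard is omitted.

```java
/** Hypothetical sketch of the decision order in LeaderAppender.failAttempt(). */
final class FailAttemptPolicy {
  enum Action { STEP_DOWN, MARK_UNAVAILABLE, NONE }

  private FailAttemptPolicy() {}

  static Action onFailure(long now, long majorityContactTime, long leaderTime,
                          long electionTimeoutMs, int failureCount, boolean memberAvailable) {
    // 1. No majority contact within two election timeouts (measured from the later of the
    //    majority contact time and leaderTime): suspect a partition and step down.
    if (now - Math.max(majorityContactTime, leaderTime) > electionTimeoutMs * 2) {
      return Action.STEP_DOWN;
    }
    // 2. Otherwise, a member with 3 or more consecutive failures is marked UNAVAILABLE once.
    if (failureCount >= 3 && memberAvailable) {
      return Action.MARK_UNAVAILABLE;
    }
    return Action.NONE;
  }
}
```

    Under the leaderTime assumption, taking the max gives a freshly elected leader a grace period: it is not judged on heartbeat times from before its term started.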

    CopycatServer.State

    public enum State {
    
        /**
         * Represents the state of an inactive server.
         * <p>
         * All servers start in this state and return to this state when {@link #leave() stopped}.
         */
        INACTIVE,
    
        /**
         * Represents the state of a server that is a reserve member of the cluster.
         * <p>
         * Reserve servers only receive notification of leader, term, and configuration changes.
         */
        RESERVE,
    
        /**
         * Represents the state of a server in the process of catching up its log.
         * <p>
         * Upon successfully joining an existing cluster, the server will transition to the passive state and remain there
         * until the leader determines that the server has caught up enough to be promoted to a full member.
         */
        PASSIVE,
    
        /**
         * Represents the state of a server participating in normal log replication.
         * <p>
         * The follower state is a standard Raft state in which the server receives replicated log entries from the leader.
         */
        FOLLOWER,
    
        /**
         * Represents the state of a server attempting to become the leader.
         * <p>
         * When a server in the follower state fails to receive communication from a valid leader for some time period,
         * the follower will transition to the candidate state. During this period, the candidate requests votes from
         * each of the other servers in the cluster. If the candidate wins the election by receiving votes from a majority
         * of the cluster, it will transition to the leader state.
         */
        CANDIDATE,
    
        /**
         * Represents the state of a server which is actively coordinating and replicating logs with other servers.
         * <p>
         * Leaders are responsible for handling and replicating writes from clients. Note that more than one leader can
         * exist at any given time, but Raft guarantees that no two leaders will exist for the same {@link Cluster#term()}.
         */
        LEADER
    
      }

    When ServerContext is initialized, its state is INACTIVE:

    public class ServerContext implements AutoCloseable {
      //......
      protected ServerState state = new InactiveState(this);

    The tricky part is that Member also defines a Type enum:

    enum Type {
    
        /**
         * Represents an inactive member.
         * <p>
         * The {@code INACTIVE} member type represents a member which does not participate in any communication
         * and is not an active member of the cluster. This is typically the state of a member prior to joining
         * or after leaving a cluster.
         */
        INACTIVE,
    
        /**
         * Represents a member which does not participate in replication.
         * <p>
         * The {@code RESERVE} member type is representative of a member that does not participate in any
         * replication of state but only maintains contact with the cluster leader and is an active member
         * of the {@link Cluster}. Typically, reserve members act as standby nodes which can be
         * {@link #promote() promoted} to a {@link #PASSIVE} or {@link #ACTIVE} role when needed.
         */
        RESERVE,
    
        /**
         * Represents a member which participates in asynchronous replication but does not vote in elections
         * or otherwise participate in the Raft consensus algorithm.
         * <p>
         * The {@code PASSIVE} member type is representative of a member that receives state changes from
         * follower nodes asynchronously. As state changes are committed via the {@link #ACTIVE} Raft nodes,
         * committed state changes are asynchronously replicated by followers to passive members. This allows
         * passive members to maintain nearly up-to-date state with minimal impact on the performance of the
         * Raft algorithm itself, and allows passive members to be quickly promoted to {@link #ACTIVE} voting
         * members if necessary.
         */
        PASSIVE,
    
        /**
         * Represents a full voting member of the Raft cluster which participates fully in leader election
         * and replication algorithms.
         * <p>
         * The {@code ACTIVE} member type represents a full voting member of the Raft cluster. Active members
         * participate in the Raft leader election and replication algorithms and can themselves be elected
         * leaders.
         */
        ACTIVE,
    
      }

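    Note the difference: Type has ACTIVE, which State lacks.

    Apart from that, State contains every value of Type.

    In other words, a member can be INACTIVE, RESERVE, PASSIVE, or ACTIVE.

    When the member is INACTIVE, RESERVE, or PASSIVE, the server is in the state of the same name.

    When the member is ACTIVE, the server is in one of FOLLOWER, CANDIDATE, or LEADER.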

    In CopycatServer.Builder, the default member type is ACTIVE:

    public static class Builder implements io.atomix.catalyst.util.Builder<CopycatServer> {
      //......
      private Member.Type type = Member.Type.ACTIVE;

    Note that transition(Member.Type) chooses the server state from the member type:

    /**
       * Transitions the server to the base state for the given member type.
       */
      protected void transition(Member.Type type) {
        switch (type) {
          case ACTIVE:
            if (!(state instanceof ActiveState)) {
              transition(CopycatServer.State.FOLLOWER);
            }
            break;
          case PASSIVE:
            if (this.state.type() != CopycatServer.State.PASSIVE) {
              transition(CopycatServer.State.PASSIVE);
            }
            break;
          case RESERVE:
            if (this.state.type() != CopycatServer.State.RESERVE) {
              transition(CopycatServer.State.RESERVE);
            }
            break;
          default:
            if (this.state.type() != CopycatServer.State.INACTIVE) {
              transition(CopycatServer.State.INACTIVE);
            }
            break;
        }
      }

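    Note how ACTIVE is handled.

    When Member.Type is ACTIVE and the current state is not an ActiveState, the server transitions to FOLLOWER. CANDIDATE and LEADER are never entered directly this way; they can only be reached through an election.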
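    The Type-to-State relationship described above can be summarised in a small lookup. The enums below are simplified, hypothetical mirrors of Member.Type and CopycatServer.State, not the real classes: `allowedStates` lists the states a server of each type may be in, and `baseState` is the state transition(Member.Type) moves to, with FOLLOWER as the only entry point for ACTIVE members.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical sketch of which server states each member type allows. */
final class TypeStateMapping {
  enum MemberType { INACTIVE, RESERVE, PASSIVE, ACTIVE }

  enum ServerState { INACTIVE, RESERVE, PASSIVE, FOLLOWER, CANDIDATE, LEADER }

  private TypeStateMapping() {}

  /** States a server whose member has the given type may legitimately be in. */
  static Set<ServerState> allowedStates(MemberType type) {
    switch (type) {
      case ACTIVE:
        return EnumSet.of(ServerState.FOLLOWER, ServerState.CANDIDATE, ServerState.LEADER);
      case PASSIVE:
        return EnumSet.of(ServerState.PASSIVE);
      case RESERVE:
        return EnumSet.of(ServerState.RESERVE);
      default:
        return EnumSet.of(ServerState.INACTIVE);
    }
  }

  /** Entry state for a type: ACTIVE maps to FOLLOWER, every other type to its namesake. */
  static ServerState baseState(MemberType type) {
    return type == MemberType.ACTIVE ? ServerState.FOLLOWER : ServerState.valueOf(type.name());
  }
}
```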

    As shown above, ServerContext starts out in the INACTIVE state. So when does it become active?

    Both bootstrapping a server and joining an existing cluster end in ClusterState's private join(), which performs the state transition:

    @Override
      public CompletableFuture<Void> bootstrap(Collection<Address> cluster) {
    
        if (configuration == null) {
          if (member.type() != Member.Type.ACTIVE) {
            return Futures.exceptionalFuture(new IllegalStateException("only ACTIVE members can bootstrap the cluster"));
          } else {
            // Create a set of active members.
            Set<Member> activeMembers = cluster.stream()
              .filter(m -> !m.equals(member.serverAddress()))
              .map(m -> new ServerMember(Member.Type.ACTIVE, m, null, member.updated()))
              .collect(Collectors.toSet());
    
            // Add the local member to the set of active members.
            activeMembers.add(member);
    
            // Create a new configuration and store it on disk to ensure the cluster can fall back to the configuration.
            configure(new Configuration(0, 0, member.updated().toEpochMilli(), activeMembers));
          }
        }
        return join();
      }
    @Override
      public synchronized CompletableFuture<Void> join(Collection<Address> cluster) {
    
        // If no configuration was loaded from disk, create a new configuration.
        if (configuration == null) {
          // Create a set of cluster members, excluding the local member which is joining a cluster.
          Set<Member> activeMembers = cluster.stream()
            .filter(m -> !m.equals(member.serverAddress()))
            .map(m -> new ServerMember(Member.Type.ACTIVE, m, null, member.updated()))
            .collect(Collectors.toSet());
    
          // Create a new configuration and configure the cluster. Once the cluster is configured, the configuration
          // will be stored on disk to ensure the cluster can fall back to the provided configuration if necessary.
          configure(new Configuration(0, 0, member.updated().toEpochMilli(), activeMembers)); // update the configuration
        }
        return join();
      }
    
      /**
       * Starts the join to the cluster.
       */
      private synchronized CompletableFuture<Void> join() {
        joinFuture = new CompletableFuture<>();
    
        context.getThreadContext().executor().execute(() -> {
          // Transition the server to the appropriate state for the local member type.
          context.transition(member.type()); //transition state
    
          // Attempt to join the cluster. If the local member is ACTIVE then failing to join the cluster
          // will result in the member attempting to get elected. This allows initial clusters to form.
          List<MemberState> activeMembers = getActiveMemberStates();
          if (!activeMembers.isEmpty()) {
            join(getActiveMemberStates().iterator());
          } else {
            joinFuture.complete(null);
          }
        });
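    The difference between bootstrap() and join(cluster) lies only in whether the local member is part of the initial ACTIVE set. The sketch below is hypothetical (`InitialConfiguration` is not a Copycat class): addresses are plain strings instead of Address objects, and the ACTIVE-type check that bootstrap() performs is left out.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of how bootstrap() and join(cluster) seed the initial ACTIVE member set. */
final class InitialConfiguration {
  private InitialConfiguration() {}

  /**
   * @param cluster   addresses supplied by the user; may or may not contain the local address
   * @param local     this server's address
   * @param bootstrap true for bootstrap() (the local member is a founding member),
   *                  false for join(cluster) (the local member is excluded because it is joining)
   */
  static Set<String> initialActiveMembers(List<String> cluster, String local, boolean bootstrap) {
    Set<String> members = new LinkedHashSet<>();
    for (String address : cluster) {
      // Every remote address becomes an ACTIVE member; the local address is filtered out.
      if (!address.equals(local)) {
        members.add(address);
      }
    }
    // Only bootstrap() adds the local member back in.
    if (bootstrap) {
      members.add(local);
    }
    return members;
  }
}
```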

    Next, let's look at the conditions for transitions among LEADER, CANDIDATE, and FOLLOWER.

    Leader

    A server becomes LEADER only when, as a CANDIDATE, it requests votes and a majority grants them; the quorum callback then calls context.transition(CopycatServer.State.LEADER):
    /**
       * Resets the election timer.
       */
      private void sendVoteRequests() {
        //.........
        // Send vote requests to all nodes. The vote request that is sent
        // to this node will be automatically successful.
        // First check if the quorum is null. If the quorum isn't null then that
        // indicates that another vote is already going on.
        final Quorum quorum = new Quorum(context.getClusterState().getQuorum(), (elected) -> {
          complete.set(true);
          if (elected) {
            context.transition(CopycatServer.State.LEADER); // invoked from checkComplete()
          } else {
            context.transition(CopycatServer.State.FOLLOWER);
          }
        });
    
        // Once we got the last log term, iterate through each current member
        // of the cluster and vote each member for a vote.
        for (ServerMember member : votingMembers) {
          LOGGER.debug("{} - Requesting vote from {} for term {}", context.getCluster().member().address(), member, context.getTerm());
          VoteRequest request = VoteRequest.builder()
            .withTerm(context.getTerm())
            .withCandidate(context.getCluster().member().id())
            .withLogIndex(lastIndex)
            .withLogTerm(lastTerm)
            .build();
    
          context.getConnections().getConnection(member.serverAddress()).thenAccept(connection -> {
            connection.<VoteRequest, VoteResponse>send(request).whenCompleteAsync((response, error) -> {
              context.checkThread();
              if (isOpen() && !complete.get()) {
                if (error != null) {
                  LOGGER.warn(error.getMessage());
                  quorum.fail();
                } else {
                    //........
                  } else {
                    LOGGER.debug("{} - Received successful vote from {}", context.getCluster().member().address(), member);
                    quorum.succeed(); // vote granted: succeeded++, then checkComplete()
                  }
                }
              }
            }, context.getThreadContext().executor());
          });
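    Both sendVoteRequests() and sendPollRequests() rely on a Quorum that invokes its callback once with the outcome. The following is a hypothetical stand-in, not Copycat's Quorum class: it counts this node's own vote as an initial success (matching the comment that the request to this node is automatically successful), uses voters / 2 + 1 as the quorum, and reports failure as soon as a majority can no longer be reached.

```java
import java.util.function.Consumer;

/** Hypothetical stand-in for a vote/poll quorum: reports the outcome exactly once. */
final class QuorumSketch {
  private final int voters;                 // voting members, including this node
  private final int quorum;                 // majority size
  private final Consumer<Boolean> callback; // true = won, false = lost
  private int succeeded = 1;                // this node's own vote counts as a success
  private int failed;
  private boolean complete;

  QuorumSketch(int voters, Consumer<Boolean> callback) {
    this.voters = voters;
    this.quorum = voters / 2 + 1;
    this.callback = callback;
    checkComplete(); // a single-node cluster wins immediately
  }

  /** A member granted the vote (or poll). */
  void succeed() {
    succeeded++;
    checkComplete();
  }

  /** A member rejected the request or could not be reached. */
  void fail() {
    failed++;
    checkComplete();
  }

  private void checkComplete() {
    if (complete) {
      return; // late responses are ignored once the outcome is known
    }
    if (succeeded >= quorum) {
      complete = true;
      callback.accept(true);
    } else if (voters - failed < quorum) {
      complete = true;
      callback.accept(false); // even if every remaining member agrees, no majority is possible
    }
  }
}
```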

    Candidate

    A FOLLOWER becomes a CANDIDATE only after it sends poll requests and a majority agrees:

      /**
       * Polls all members of the cluster to determine whether this member should transition to the CANDIDATE state.
       */
      private void sendPollRequests() {
       final Quorum quorum = new Quorum(context.getClusterState().getQuorum(), (elected) -> {
          // If a majority of the cluster indicated they would vote for us then transition to candidate.
          complete.set(true);
          if (elected) {
            context.transition(CopycatServer.State.CANDIDATE);
          } else {
            resetHeartbeatTimeout();
          }
        });
        
        //......
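    The two quorum callbacks shown so far differ only in where they lead. A hypothetical sketch of the outcomes (the enums are simplified and are not Copycat types): a won poll moves a FOLLOWER to CANDIDATE, a lost poll keeps it a FOLLOWER (the real code just calls resetHeartbeatTimeout()), a won vote moves a CANDIDATE to LEADER, and a lost vote sends it back to FOLLOWER.

```java
/** Hypothetical sketch: where a completed poll or vote quorum leads. */
final class ElectionPhases {
  enum State { FOLLOWER, CANDIDATE, LEADER }

  enum Phase { POLL, VOTE }

  private ElectionPhases() {}

  static State onQuorumComplete(Phase phase, boolean won) {
    switch (phase) {
      case POLL:
        // Sent by a FOLLOWER: winning starts a real election; losing leaves it a FOLLOWER.
        return won ? State.CANDIDATE : State.FOLLOWER;
      case VOTE:
        // Sent by a CANDIDATE: winning makes it LEADER; losing sends it back to FOLLOWER.
        return won ? State.LEADER : State.FOLLOWER;
      default:
        throw new IllegalArgumentException("unknown phase: " + phase);
    }
  }
}
```

    This two-phase shape resembles Raft's pre-vote idea; the excerpts above do not show whether a poll changes the term.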

    Follower

    Leader –> Follower

    In LeaderAppender, triggered by the responses to heartbeats:

    /**
       * Handles a {@link Response.Status#OK} response.
       */
      protected void handleAppendResponseOk(MemberState member, AppendRequest request, AppendResponse response) {
        //......
        // If we've received a greater term, update the term and transition back to follower.
        else if (response.term() > context.getTerm()) {
          context.setTerm(response.term()).setLeader(0);
          context.transition(CopycatServer.State.FOLLOWER);
        }

    If an OK response carries a term greater than the leader's own term, this node is no longer the legitimate leader, so it steps down to follower.

    /**
       * Handles a {@link Response.Status#ERROR} response.
       */
      protected void handleAppendResponseError(MemberState member, AppendRequest request, AppendResponse response) {
        // If we've received a greater term, update the term and transition back to follower.
        if (response.term() > context.getTerm()) {
          context.setTerm(response.term()).setLeader(0);
          context.transition(CopycatServer.State.FOLLOWER);

    The same applies to an ERROR response.

    @Override
      protected void failAttempt(MemberState member, Throwable error) {
        super.failAttempt(member, error);
    
        // Verify that the leader has contacted a majority of the cluster within the last two election timeouts.
        // If the leader is not able to contact a majority of the cluster within two election timeouts, assume
        // that a partition occurred and transition back to the FOLLOWER state.
        if (System.currentTimeMillis() - Math.max(heartbeatTime(), leaderTime) > context.getElectionTimeout().toMillis() * 2) {
          LOGGER.warn("{} - Suspected network partition. Stepping down", context.getCluster().member().address());
          context.setLeader(0);
          context.transition(CopycatServer.State.FOLLOWER);
        }

    In failAttempt, if no heartbeat responses have come back from a majority within two election timeouts, a network partition is suspected and the leader steps down to follower.

    In LeaderState, when leader initialization (committing the initial no-op entry) fails:

    /**
       * Commits a no-op entry to the log, ensuring any entries from a previous term are committed.
       */
      private CompletableFuture<Void> commitInitialEntries() {
        // The Raft protocol dictates that leaders cannot commit entries from previous terms until
        // at least one entry from their current term has been stored on a majority of servers. Thus,
        // we force entries to be appended up to the leader's no-op entry. The LeaderAppender will ensure
        // that the commitIndex is not increased until the no-op entry (appender.index()) is committed.
        CompletableFuture<Void> future = new CompletableFuture<>();
        appender.appendEntries(appender.index()).whenComplete((resultIndex, error) -> {
          context.checkThread();
          if (isOpen()) {
            if (error == null) {
              context.getStateMachine().apply(resultIndex);
              future.complete(null);
            } else {
              context.setLeader(0);
              context.transition(CopycatServer.State.FOLLOWER);
            }
          }
        });
        return future;
      }

    In that case, too, the leader steps down to follower.
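    The rule quoted in commitInitialEntries (entries from earlier terms are committed only once an entry from the current term is stored on a majority) can be illustrated with the standard Raft commit-index calculation. This is a generic sketch, not LeaderAppender's code: `CommitIndexSketch`, its parameters, and the assumption that `matchIndexes` includes the leader's own last log index are all hypothetical.

```java
import java.util.Arrays;

/** Hypothetical sketch of the Raft commit rule that commitInitialEntries() relies on. */
final class CommitIndexSketch {
  private CommitIndexSketch() {}

  /**
   * @param matchIndexes highest log index known to be stored on each voting member,
   *                     including the leader's own last index (assumption)
   * @param noopIndex    index of the no-op entry appended at the start of this leader's term
   * @param commitIndex  the current commit index
   * @return the new commit index, which never decreases
   */
  static long advance(long[] matchIndexes, long noopIndex, long commitIndex) {
    long[] sorted = matchIndexes.clone();
    Arrays.sort(sorted); // ascending
    int quorum = sorted.length / 2 + 1;
    // The highest index that at least `quorum` members have stored.
    long majorityIndex = sorted[sorted.length - quorum];
    // Until the current-term no-op is on a majority, nothing may be committed by counting replicas.
    if (majorityIndex < noopIndex) {
      return commitIndex;
    }
    return Math.max(commitIndex, majorityIndex);
  }
}
```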

    Candidate –> Follower

    When the vote fails, the candidate steps down to follower:

    /**
       * Resets the election timer.
       */
      private void sendVoteRequests() {
        //......
        // Send vote requests to all nodes. The vote request that is sent
        // to this node will be automatically successful.
        // First check if the quorum is null. If the quorum isn't null then that
        // indicates that another vote is already going on.
        final Quorum quorum = new Quorum(context.getClusterState().getQuorum(), (elected) -> {
          complete.set(true);
          if (elected) {
            context.transition(CopycatServer.State.LEADER);
          } else {
            context.transition(CopycatServer.State.FOLLOWER); // not elected
          }
        });

    ActiveState –> Follower

    Every ActiveState, including LeaderState and CandidateState, runs the following logic when handling vote and append requests:

        // If the request indicates a term that is greater than the current term then
        // assign that term and leader to the current context and transition to follower.
        boolean transition = updateTermAndLeader(request.term(), request.leader());
        
        // If a transition is required then transition back to the follower state.
        // If the node is already a follower then the transition will be ignored.
        if (transition) {
          context.transition(CopycatServer.State.FOLLOWER);
        }
    /**
       * Updates the term and leader.
       */
      protected boolean updateTermAndLeader(long term, int leader) {
        // If the request indicates a term that is greater than the current term or no leader has been
        // set for the current term, update leader and term.
        if (term > context.getTerm() || (term == context.getTerm() && context.getLeader() == null && leader != 0)) {
          context.setTerm(term);
          context.setLeader(leader);
    
          // Reset the current cluster configuration to the last committed configuration when a leader change occurs.
          context.getClusterState().reset();
          return true;
        }
        return false;
      }
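    updateTermAndLeader can be restated as a small self-contained class to make its two triggers explicit: a strictly newer term, or the same term when no leader is known yet and the request names one. This is a hypothetical mirror, not Copycat code; it assumes leader id 0 means "no leader" (as the setLeader(0) calls above suggest) and omits the getClusterState().reset() call.

```java
/** Hypothetical mirror of updateTermAndLeader(); leader id 0 is assumed to mean "no leader". */
final class TermState {
  private long term;
  private Integer leader; // null while no leader is known for the current term

  /** Returns true when the caller should transition to FOLLOWER. */
  boolean updateTermAndLeader(long requestTerm, int requestLeader) {
    boolean newerTerm = requestTerm > term;
    boolean firstLeaderForTerm = requestTerm == term && leader == null && requestLeader != 0;
    if (newerTerm || firstLeaderForTerm) {
      term = requestTerm;
      // Assumption: leader id 0 clears the leader, as the setLeader(0) calls suggest.
      leader = requestLeader == 0 ? null : Integer.valueOf(requestLeader);
      return true;
    }
    return false;
  }

  long term() {
    return term;
  }

  Integer leader() {
    return leader;
  }
}
```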
Original post: https://www.cnblogs.com/fxjwind/p/6519940.html