zoukankan      html  css  js  c++  java
  • Copycat

    对于Command,Configuration都要通过appendEntries的方式,把Entries同步给follower

    LeaderState.configure

    /**
       * Commits the given configuration.
       */
      protected CompletableFuture<Long> configure(Collection<Member> members) {
        final long index;
        try (ConfigurationEntry entry = context.getLog().create(ConfigurationEntry.class)) {
          entry.setTerm(context.getTerm())
            .setTimestamp(System.currentTimeMillis())
            .setMembers(members);
          index = context.getLog().append(entry); //先把configuration写入local log
          LOGGER.debug("{} - Appended {}", context.getCluster().member().address(), entry);
    
          // Store the index of the configuration entry in order to prevent other configurations from
          // being logged and committed concurrently. This is an important safety property of Raft.
          configuring = index; //configuring用于互斥
          context.getClusterState().configure(new Configuration(entry.getIndex(), entry.getTerm(), entry.getTimestamp(), entry.getMembers())); //更新ClusterState
        }
    
        return appender.appendEntries(index).whenComplete((commitIndex, commitError) -> { //appendEntries,把configuration发给follower
          context.checkThread();
          if (isOpen()) {
            // Reset the configuration index to allow new configuration changes to be committed.
            configuring = 0; //configuring完成
          }
        });
      }

    appendCommand

     /**
       * Sends append requests for a command to followers.
       */
      private void appendCommand(long index, CompletableFuture<CommandResponse> future) {
        appender.appendEntries(index).whenComplete((commitIndex, commitError) -> { //appendEntries到该index
          context.checkThread();
          if (isOpen()) {
            if (commitError == null) {
              applyCommand(index, future); //如果成功,applyCommand
            } else {
              future.complete(logResponse(CommandResponse.builder()
                .withStatus(Response.Status.ERROR)
                .withError(CopycatError.Type.INTERNAL_ERROR)
                .build()));
            }
          }
        });
      }

     

    LeaderAppender

    通过LeaderAppender来完成appendEntries,

    /**
     * The leader appender is responsible for sending {@link AppendRequest}s on behalf of a leader to followers.
     * Append requests are sent by the leader only to other active members of the cluster.
     *
     * @author <a href="http://github.com/kuujo>Jordan Halterman</a>
     */
    final class LeaderAppender extends AbstractAppender {

    append指定的index

    /**
       * Registers a commit handler for the given commit index.
       *
       * @param index The index for which to register the handler.
       * @return A completable future to be completed once the given log index has been committed.
       */
      public CompletableFuture<Long> appendEntries(long index) {
        if (index == 0) //如果index=0,等同于heartbeat
          return appendEntries();
    
        if (index <= context.getCommitIndex()) //如果index小于commit index,说明不需要commit
          return CompletableFuture.completedFuture(index);
    
        // If there are no other stateful servers in the cluster, immediately commit the index.
        if (context.getClusterState().getActiveMemberStates().isEmpty() && context.getClusterState().getPassiveMemberStates().isEmpty()) {
          long previousCommitIndex = context.getCommitIndex();
          context.setCommitIndex(index);
          context.setGlobalIndex(index);
          completeCommits(previousCommitIndex, index);
          return CompletableFuture.completedFuture(index);
        }
        // If there are no other active members in the cluster, update the commit index and complete the commit.
        // The updated commit index will be sent to passive/reserve members on heartbeats.
        else if (context.getClusterState().getActiveMemberStates().isEmpty()) {
          long previousCommitIndex = context.getCommitIndex();
          context.setCommitIndex(index);
          completeCommits(previousCommitIndex, index);
          return CompletableFuture.completedFuture(index);
        }
    
        // Only send entry-specific AppendRequests to active members of the cluster.
        return appendFutures.computeIfAbsent(index, i –> { //computeIfAbsent,如果map中没有key为index的item,执行后面的函数
          for (MemberState member : context.getClusterState().getActiveMemberStates()) {
            appendEntries(member);
          }
          return new CompletableFuture<>();
        });
      }

    appendFutures

    private final Map<Long, CompletableFuture<Long>> appendFutures = new HashMap<>();

    每个index对应的future都cache在appendFutures中,那么什么时候这个future会被complete

    继续看,对于每个member调用

    appendEntries

    @Override
      protected void appendEntries(MemberState member) {
    
        // If the member term is less than the current term or the member's configuration index is less
        // than the local configuration index, send a configuration update to the member.
        // Ensure that only one configuration attempt per member is attempted at any given time by storing the
        // member state in a set of configuring members.
        // Once the configuration is complete sendAppendRequest will be called recursively.
        else if (member.getConfigTerm() < context.getTerm() || member.getConfigIndex() < context.getClusterState().getConfiguration().index()) {
          if (member.canConfigure()) {
            sendConfigureRequest(member, buildConfigureRequest(member));
          }
        }
        // If the member is a reserve or passive member, send an empty AppendRequest to it.
        else if (member.getMember().type() == Member.Type.RESERVE || member.getMember().type() == Member.Type.PASSIVE) {
          if (member.canAppend()) {
            sendAppendRequest(member, buildAppendEmptyRequest(member)); //如果是reserve或passive,只需要发heartbeat,即emptyAppend
          }
        }
        // If the member's current snapshot index is less than the latest snapshot index and the latest snapshot index
        // is less than the nextIndex, send a snapshot request.
        else if (member.getMember().type() == Member.Type.ACTIVE && context.getSnapshotStore().currentSnapshot() != null
          && context.getSnapshotStore().currentSnapshot().index() >= member.getNextIndex()
          && context.getSnapshotStore().currentSnapshot().index() > member.getSnapshotIndex()) {
          if (member.canInstall()) {
            sendInstallRequest(member, buildInstallRequest(member));
          }
        }
        // If no AppendRequest is already being sent, send an AppendRequest.
        else if (member.canAppend()) {
          sendAppendRequest(member, buildAppendRequest(member, context.getLog().lastIndex())); //发送AppendRequest
        }
      }

    buildAppendRequest

    protected AppendRequest buildAppendRequest(MemberState member, long lastIndex) {
        // If the log is empty then send an empty commit.
        // If the next index hasn't yet been set then we send an empty commit first.
        // If the next index is greater than the last index then send an empty commit.
        // If the member failed to respond to recent communication send an empty commit. This
        // helps avoid doing expensive work until we can ascertain the member is back up.
        if (context.getLog().isEmpty() || member.getNextIndex() > lastIndex || member.getFailureCount() > 0) {
          return buildAppendEmptyRequest(member); //这里如果是上面的appendEntries(),hb,会直接发送emptyAppend
        } else {
          return buildAppendEntriesRequest(member, lastIndex);
        }
      }

    buildAppendEntriesRequest

    /**
       * Builds a populated AppendEntries request.
       */
      protected AppendRequest buildAppendEntriesRequest(MemberState member, long lastIndex) {
        Entry prevEntry = getPrevEntry(member); //查找该member的上一个entry index
    
        ServerMember leader = context.getLeader();
        AppendRequest.Builder builder = AppendRequest.builder()
          .withTerm(context.getTerm())
          .withLeader(leader != null ? leader.id() : 0)
          .withLogIndex(prevEntry != null ? prevEntry.getIndex() : 0)
          .withLogTerm(prevEntry != null ? prevEntry.getTerm() : 0)
          .withCommitIndex(context.getCommitIndex())
          .withGlobalIndex(context.getGlobalIndex());
    
        // Calculate the starting index of the list of entries.
        final long index = prevEntry != null ? prevEntry.getIndex() + 1 : context.getLog().firstIndex(); //如果有preEntry,就从+1开始读,如果没有,就从first开始
    
        // Build a list of entries to send to the member.
        List<Entry> entries = new ArrayList<>((int) Math.min(8, lastIndex - index + 1)); //意思最多8个
    
        // Build a list of entries up to the MAX_BATCH_SIZE. Note that entries in the log may
        // be null if they've been compacted and the member to which we're sending entries is just
        // joining the cluster or is otherwise far behind. Null entries are simply skipped and not
        // counted towards the size of the batch.
        // If there exists an entry in the log with size >= MAX_BATCH_SIZE the logic ensures that
        // entry will be sent in a batch of size one
        int size = 0;
    
        // Iterate through remaining entries in the log up to the last index.
        for (long i = index; i <= lastIndex; i++) {
          // Get the entry from the log and append it if it's not null. Entries in the log can be null
          // if they've been cleaned or compacted from the log. Each entry sent in the append request
          // has a unique index to handle gaps in the log.
          Entry entry = context.getLog().get(i);
          if (entry != null) {
            if (!entries.isEmpty() && size + entry.size() > MAX_BATCH_SIZE) { //用MAX_BATCH_SIZE控制batch的大小
              break;
            }
            size += entry.size();
            entries.add(entry);
          }
        }
    
        // Release the previous entry back to the entry pool.
        if (prevEntry != null) {
          prevEntry.release();
        }
    
        // Add the entries to the request builder and build the request.
        return builder.withEntries(entries).build();
      }

    sendAppendRequest

    /**
       * Connects to the member and sends a commit message.
       */
      protected void sendAppendRequest(MemberState member, AppendRequest request) {
        // Set the start time of the member's current commit. This will be used to associate responses
        // with the current commit request.
        member.setHeartbeatStartTime(heartbeatTime);
    
        super.sendAppendRequest(member, request);
      }

    AbstractAppender

    /**
       * Connects to the member and sends a commit message.
       */
      protected void sendAppendRequest(MemberState member, AppendRequest request) {
        // Start the append to the member.
        member.startAppend();
    
        LOGGER.debug("{} - Sent {} to {}", context.getCluster().member().address(), request, member.getMember().address());
        context.getConnections().getConnection(member.getMember().address()).whenComplete((connection, error) –> { // 试着去connect
          context.checkThread();
    
          if (open) {
            if (error == null) {
              sendAppendRequest(connection, member, request); // 如果连接成功,sendAppendRequest
            } else {
              // Complete the append to the member.
              member.completeAppend();
    
              // Trigger reactions to the request failure.
              handleAppendRequestFailure(member, request, error);
            }
          }
        });
      }
    /**
       * Sends a commit message.
       */
      protected void sendAppendRequest(Connection connection, MemberState member, AppendRequest request) {
        long timestamp = System.nanoTime();
        connection.<AppendRequest, AppendResponse>send(request).whenComplete((response, error) -> { //send
          context.checkThread();
    
          // Complete the append to the member.
          if (!request.entries().isEmpty()) {
            member.completeAppend(System.nanoTime() - timestamp);
          } else {
            member.completeAppend();
          }
    
          if (open) {
            if (error == null) {
              LOGGER.debug("{} - Received {} from {}", context.getCluster().member().address(), response, member.getMember().address());
              handleAppendResponse(member, request, response); //如果发送成功
            } else {
              handleAppendResponseFailure(member, request, error); //如果发送失败
            }
          }
        });
    
        updateNextIndex(member, request);
        if (!request.entries().isEmpty() && hasMoreEntries(member)) {
          appendEntries(member);
        }
      }

    LeaderAppender

    /**
       * Handles an append response.
       */
      protected void handleAppendResponse(MemberState member, AppendRequest request, AppendResponse response) {
        // Trigger commit futures if necessary.
        updateHeartbeatTime(member, null);
    
        super.handleAppendResponse(member, request, response);
      }
    /**
       * Handles an append response.
       */
      protected void handleAppendResponse(MemberState member, AppendRequest request, AppendResponse response) {
        if (response.status() == Response.Status.OK) {
          handleAppendResponseOk(member, request, response);
        } else {
          handleAppendResponseError(member, request, response);
        }
      }
    /**
       * Handles a {@link Response.Status#OK} response.
       */
      protected void handleAppendResponseOk(MemberState member, AppendRequest request, AppendResponse response) {
        // Reset the member failure count and update the member's availability status if necessary.
        succeedAttempt(member); //更新hb
    
        // If replication succeeded then trigger commit futures.
        if (response.succeeded()) {
          updateMatchIndex(member, response); //更新match index 
    
          // If entries were committed to the replica then check commit indexes.
          if (!request.entries().isEmpty()) {
            commitEntries(); //试图commit entries
          }
    
          // If there are more entries to send then attempt to send another commit.
          if (hasMoreEntries(member)) {
            appendEntries(member);
          }
        }
        // If we've received a greater term, update the term and transition back to follower.
        else if (response.term() > context.getTerm()) {
          context.setTerm(response.term()).setLeader(0);
          context.transition(CopycatServer.State.FOLLOWER);
        }
        // If the response failed, the follower should have provided the correct last index in their log. This helps
        // us converge on the matchIndex faster than by simply decrementing nextIndex one index at a time.
        else {
          resetMatchIndex(member, response);
          resetNextIndex(member);
    
          // If there are more entries to send then attempt to send another commit.
          if (hasMoreEntries(member)) {
            appendEntries(member);
          }
        }
      }

    关键是commitEntries

    /**
       * Checks whether any futures can be completed.
       */
      private void commitEntries() {
    
        // Sort the list of replicas, order by the last index that was replicated
        // to the replica. This will allow us to determine the median index
        // for all known replicated entries across all cluster members.
        List<MemberState> members = context.getClusterState().getActiveMemberStates((m1, m2) -> //将ActiveMembers按照matchIndex排序
          Long.compare(m2.getMatchIndex() != 0 ? m2.getMatchIndex() : 0l, m1.getMatchIndex() != 0 ? m1.getMatchIndex() : 0l));
    
        // Calculate the current commit index as the median matchIndex.
        long commitIndex = members.get(quorumIndex()).getMatchIndex(); //取出quorum个member的最小match index
    
        // If the commit index has increased then update the commit index. Note that in order to ensure
        // the leader completeness property holds, we verify that the commit index is greater than or equal to
        // the index of the leader's no-op entry. Update the commit index and trigger commit futures.
        long previousCommitIndex = context.getCommitIndex();
        if (commitIndex > 0 && commitIndex > previousCommitIndex && (leaderIndex > 0 && commitIndex >= leaderIndex)) { //如果quorum个member都已经完成同步
          context.setCommitIndex(commitIndex);
          completeCommits(previousCommitIndex, commitIndex); //commit这次append
        }
      }

    可以看到这里,就是将appendFutures中的future拿出来complete掉,表示这次append已经完成

    /**
       * Completes append entries attempts up to the given index.
       */
      private void completeCommits(long previousCommitIndex, long commitIndex) {
        for (long i = previousCommitIndex + 1; i <= commitIndex; i++) {
          CompletableFuture<Long> future = appendFutures.remove(i);
          if (future != null) {
            future.complete(i);
          }
        }
      }

    这里发送出AppendRequest,大家是如果处理的?

    public void connectServer(Connection connection) {
      connection.handler(AppendRequest.class, request -> state.append(request));

    可以看到append也只能在server间request,client是不能直接append的

     

    可以看到append,在每个state中都有实现,上面的append只会发到所有的active memeber

    ActiveState 
    @Override
      public CompletableFuture<AppendResponse> append(final AppendRequest request) {
        context.checkThread();
        logRequest(request);
    
        // If the request indicates a term that is greater than the current term then
        // assign that term and leader to the current context and transition to follower.
        boolean transition = updateTermAndLeader(request.term(), request.leader());
    
        CompletableFuture<AppendResponse> future = CompletableFuture.completedFuture(logResponse(handleAppend(request)));
    
        // If a transition is required then transition back to the follower state.
        // If the node is already a follower then the transition will be ignored.
        if (transition) {
          context.transition(CopycatServer.State.FOLLOWER);
        }
        return future;
      }

    最终会调用到,

    @Override
      protected AppendResponse appendEntries(AppendRequest request) {
        // Get the last entry index or default to the request log index.
        long lastEntryIndex = request.logIndex();
        if (!request.entries().isEmpty()) {
          lastEntryIndex = request.entries().get(request.entries().size() - 1).getIndex();
        }
    
        // Ensure the commitIndex is not increased beyond the index of the last entry in the request.
        long commitIndex = Math.max(context.getCommitIndex(), Math.min(request.commitIndex(), lastEntryIndex));
    
        // Iterate through request entries and append them to the log.
        for (Entry entry : request.entries()) {
          // If the entry index is greater than the last log index, skip missing entries.
          if (context.getLog().lastIndex() < entry.getIndex()) {
            context.getLog().skip(entry.getIndex() - context.getLog().lastIndex() - 1).append(entry);
            LOGGER.debug("{} - Appended {} to log at index {}", context.getCluster().member().address(), entry, entry.getIndex());
          } else if (context.getCommitIndex() >= entry.getIndex()) {
            continue;
          } else {
            // Compare the term of the received entry with the matching entry in the log.
            long term = context.getLog().term(entry.getIndex());
            if (term != 0) {
              if (entry.getTerm() != term) {
                // We found an invalid entry in the log. Remove the invalid entry and append the new entry.
                // If appending to the log fails, apply commits and reply false to the append request.
                LOGGER.debug("{} - Appended entry term does not match local log, removing incorrect entries", context.getCluster().member().address());
                context.getLog().truncate(entry.getIndex() - 1).append(entry);
                LOGGER.debug("{} - Appended {} to log at index {}", context.getCluster().member().address(), entry, entry.getIndex());
              }
            } else {
              context.getLog().truncate(entry.getIndex() - 1).append(entry);
              LOGGER.debug("{} - Appended {} to log at index {}", context.getCluster().member().address(), entry, entry.getIndex());
            }
          }
        }
    
        // If we've made it this far, apply commits and send a successful response.
        LOGGER.debug("{} - Committed entries up to index {}", context.getCluster().member().address(), commitIndex);
        context.setCommitIndex(commitIndex);
        context.setGlobalIndex(request.globalIndex());
    
        // Apply commits to the local state machine.
        context.getStateMachine().applyAll(context.getCommitIndex()); //apply commit
    
        return AppendResponse.builder()
          .withStatus(Response.Status.OK)
          .withTerm(context.getTerm())
          .withSucceeded(true)
          .withLogIndex(context.getLog().lastIndex())
          .build();
      }

    可以看到逻辑,就是把entry加到log中,然后apply这个commit

    FollowerState
    @Override
      public CompletableFuture<AppendResponse> append(AppendRequest request) {
        CompletableFuture<AppendResponse> future = super.append(request);
    
        // Reset the heartbeat timeout.
        resetHeartbeatTimeout();
    
        // Send AppendEntries requests to passive members if necessary.
        appender.appendEntries(); //同步到passive
        return future;
      }

    只是多了对passive的同步

    心跳

    一种特殊的append,

    heartbeat,append空的entries

    LeaderAppender
    final class LeaderAppender extends AbstractAppender {
      private CompletableFuture<Long> heartbeatFuture;
      private CompletableFuture<Long> nextHeartbeatFuture;
     
    /**
       * Triggers a heartbeat to a majority of the cluster.
       * <p>
       * For followers to which no AppendRequest is currently being sent, a new empty AppendRequest will be
       * created and sent. For followers to which an AppendRequest is already being sent, the appendEntries()
       * call will piggyback on the *next* AppendRequest. Thus, multiple calls to this method will only ever
       * result in a single AppendRequest to each follower at any given time, and the returned future will be
       * shared by all concurrent calls.
       *
       * @return A completable future to be completed the next time a heartbeat is received by a majority of the cluster.
       */
      public CompletableFuture<Long> appendEntries() {
        // If there are no other active members in the cluster, simply complete the append operation.
        if (context.getClusterState().getRemoteMemberStates().isEmpty())
          return CompletableFuture.completedFuture(null);
    
        // If no heartbeat future already exists, that indicates there's no heartbeat currently under way.
        // Create a new heartbeat future and commit to all members in the cluster.
        if (heartbeatFuture == null) {
          CompletableFuture<Long> newHeartbeatFuture = new CompletableFuture<>();
          heartbeatFuture = newHeartbeatFuture;
          heartbeatTime = System.currentTimeMillis();
          for (MemberState member : context.getClusterState().getRemoteMemberStates()) {
            appendEntries(member);
          }
          return newHeartbeatFuture;
        }
        // If a heartbeat future already exists, that indicates there is a heartbeat currently underway.
        // We don't want to allow callers to be completed by a heartbeat that may already almost be done.
        // So, we create the next heartbeat future if necessary and return that. Once the current heartbeat
        // completes the next future will be used to do another heartbeat. This ensures that only one
        // heartbeat can be outstanding at any given point in time.
        else if (nextHeartbeatFuture == null) {
          nextHeartbeatFuture = new CompletableFuture<>();
          return nextHeartbeatFuture;
        } else {
          return nextHeartbeatFuture;
        }
      }

    注释很清楚,heartbeat是发往getRemoteMemberStates,所有members的

    heartbeatFuture 和 nextHeartbeatFuture分别表示本次和下次hb的future

    LeaderState

    心跳是被定期触发的,

    startAppendTimer

    /**
       * Starts sending AppendEntries requests to all cluster members.
       */
      private void startAppendTimer() {
        // Set a timer that will be used to periodically synchronize with other nodes
        // in the cluster. This timer acts as a heartbeat to ensure this node remains
        // the leader.
        LOGGER.debug("{} - Starting append timer", context.getCluster().member().address());
        appendTimer = context.getThreadContext().schedule(Duration.ZERO, context.getHeartbeatInterval(), this::appendMembers);
      }
    向所有的member定期发送心跳

    /**
       * Sends AppendEntries requests to members of the cluster that haven't heard from the leader in a while.
       */
      private void appendMembers() {
        context.checkThread();
        if (isOpen()) {
          appender.appendEntries();
        }
      }

    heartbeatFuture 和 nextHeartbeatFuture是什么时候被complete的?

    @Override
      protected void handleConfigureResponse(MemberState member, ConfigureRequest request, ConfigureResponse response) {
        // Trigger commit futures if necessary.
        updateHeartbeatTime(member, null);
    
        super.handleConfigureResponse(member, request, response);
      }

    updateHeartbeatTime

    /**
       * Sets a commit time or fails the commit if a quorum of successful responses cannot be achieved.
       */
      private void updateHeartbeatTime(MemberState member, Throwable error) {
        if (heartbeatFuture == null) {
          return;
        }
    
        if (error != null && member.getHeartbeatStartTime() == heartbeatTime) { //如果hb response有error
          int votingMemberSize = context.getClusterState().getActiveMemberStates().size() + (context.getCluster().member().type() == Member.Type.ACTIVE ? 1 : 0);
          int quorumSize = (int) Math.floor(votingMemberSize / 2) + 1;
          // If a quorum of successful responses cannot be achieved, fail this heartbeat. Ensure that only
          // ACTIVE members are considered. A member could have been transitioned to another state while the
          // heartbeat was being sent.
          if (member.getMember().type() == Member.Type.ACTIVE && ++heartbeatFailures > votingMemberSize - quorumSize) { //如果超过votingMemberSize - quorumSize的member hb失败
            heartbeatFuture.completeExceptionally(new InternalException("Failed to reach consensus")); //此次hb失败
            completeHeartbeat();
          }
        } else {
          member.setHeartbeatTime(System.currentTimeMillis()); //更新该member的hb
    
          // Sort the list of commit times. Use the quorum index to get the last time the majority of the cluster
          // was contacted. If the current heartbeatFuture's time is less than the commit time then trigger the
          // commit future and reset it to the next commit future.
          if (heartbeatTime <= heartbeatTime()) { //如果大于quorum member的hb都比上次hb的时间新
            heartbeatFuture.complete(null); //hb成功,complete heartbeatFuture
            completeHeartbeat();
          }
        }
      }
    
      /**
       * Returns the last time a majority of the cluster was contacted.
       * <p>
       * This is calculated by sorting the list of active members and getting the last time the majority of
       * the cluster was contacted based on the index of a majority of the members. So, in a list of 3 ACTIVE
       * members, index 1 (the second member) will be used to determine the commit time in a sorted members list.
       */
      private long heartbeatTime() {
        int quorumIndex = quorumIndex();
        if (quorumIndex >= 0) {
          return context.getClusterState().getActiveMemberStates((m1, m2)-> Long.compare(m2.getHeartbeatTime(), m1.getHeartbeatTime())).get(quorumIndex).getHeartbeatTime();
        }
        return System.currentTimeMillis();
      }

    可以看到这里,会把heartbeatFuture complete掉,无论是completeExceptionally还是complete

    completeHeartbeat

    /**
       * Completes a heartbeat by replacing the current heartbeatFuture with the nextHeartbeatFuture, updating the
       * current heartbeatTime, and starting a new {@link AppendRequest} to all active members.
       */
      private void completeHeartbeat() {
        heartbeatFailures = 0;
        heartbeatFuture = nextHeartbeatFuture; //把nextHeartbeatFuture给heartbeatFuture
        nextHeartbeatFuture = null;
        updateGlobalIndex();
        if (heartbeatFuture != null) {
          heartbeatTime = System.currentTimeMillis(); //更新heartbeatTime
          for (MemberState member : context.getClusterState().getRemoteMemberStates()) {
            appendEntries(member); //再来一轮
          }
        }
      }

    前面定期触发时,并不会关系heartbeat是否成功,所以没有处理返回的future

    但在appendLinearizableQuery时,需要hb成功,才能query

    原因是,如果没有majority,相应我的hb,可能已经发生partition,这时已经无法保证LinearizableQuery

    LeaderState

    private void appendLinearizableQuery(QueryEntry entry, ServerSessionContext session, CompletableFuture<QueryResponse> future) {
        appender.appendEntries().whenComplete((commitIndex, commitError) -> {
          context.checkThread();
          if (isOpen()) {
            if (commitError == null) {
              entry.acquire();
              sequenceLinearizableQuery(entry, future);
            } else {
              future.complete(logResponse(QueryResponse.builder()
                .withStatus(Response.Status.ERROR)
                .withError(CopycatError.Type.QUERY_ERROR)
                .build()));
            }
          }
          entry.release();
        });
      }
  • 相关阅读:
    Azure Active Directory document ---reading notes
    tcp/ip 三次握手和4次挥手
    why microsoft named their cloud service Azure?
    Azure VMs
    Leetcode (C++) 9/20/2017开始
    How do you make an object in C? Used in RTOS.
    HF-DP1: strategy pattern
    确定一下学习的主要参考书
    记一下Thoratec的面试
    《Algorithm in C》by Sedgewick 读书笔记
  • 原文地址:https://www.cnblogs.com/fxjwind/p/6525910.html
Copyright © 2011-2022 走看看