Both commands and configuration changes are replicated to followers as log entries through appendEntries.
LeaderState.configure
/**
 * Commits the given configuration.
 */
protected CompletableFuture<Long> configure(Collection<Member> members) {
  final long index;
  try (ConfigurationEntry entry = context.getLog().create(ConfigurationEntry.class)) {
    entry.setTerm(context.getTerm())
      .setTimestamp(System.currentTimeMillis())
      .setMembers(members);
    index = context.getLog().append(entry); // first write the configuration to the local log
    LOGGER.debug("{} - Appended {}", context.getCluster().member().address(), entry);

    // Store the index of the configuration entry in order to prevent other configurations from
    // being logged and committed concurrently. This is an important safety property of Raft.
    configuring = index; // configuring acts as a mutex for configuration changes
    context.getClusterState().configure(new Configuration(entry.getIndex(), entry.getTerm(), entry.getTimestamp(), entry.getMembers())); // update ClusterState
  }

  // appendEntries: replicate the configuration entry to the followers
  return appender.appendEntries(index).whenComplete((commitIndex, commitError) -> {
    context.checkThread();
    if (isOpen()) {
      // Reset the configuration index to allow new configuration changes to be committed.
      configuring = 0; // the configuration change has finished
    }
  });
}
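The role of `configuring` can be isolated in a small sketch. Everything here is hypothetical. `ConfigurationGuard` is not a Copycat class. Rejecting a second change while one is in flight is an assumption about how callers use the field, because the excerpt above only shows it being set and reset.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch (not Copycat code): allow at most one configuration change in
// flight, mirroring the role of the `configuring` field. Rejecting a concurrent change
// is an assumption; the excerpt only shows the field being set and reset.
final class ConfigurationGuard {
  private long configuring = 0; // index of the in-flight configuration entry, 0 if none

  boolean canConfigure() {
    return configuring == 0;
  }

  /** Marks the entry at {@code index} as in flight until {@code commit} completes. */
  CompletableFuture<Long> configure(long index, CompletableFuture<Long> commit) {
    if (!canConfigure()) {
      CompletableFuture<Long> rejected = new CompletableFuture<>();
      rejected.completeExceptionally(new IllegalStateException("configuration change in progress"));
      return rejected;
    }
    configuring = index;
    // Clear the guard whether the commit succeeds or fails, like `configuring = 0` above.
    return commit.whenComplete((commitIndex, error) -> configuring = 0);
  }
}
```

Storing the entry index, rather than a boolean, also tells other code which log entry is the pending configuration.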
appendCommand
/**
 * Sends append requests for a command to followers.
 */
private void appendCommand(long index, CompletableFuture<CommandResponse> future) {
  appender.appendEntries(index).whenComplete((commitIndex, commitError) -> { // replicate up to this index
    context.checkThread();
    if (isOpen()) {
      if (commitError == null) {
        applyCommand(index, future); // committed: apply the command
      } else {
        future.complete(logResponse(CommandResponse.builder()
          .withStatus(Response.Status.ERROR)
          .withError(CopycatError.Type.INTERNAL_ERROR)
          .build()));
      }
    }
  });
}
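The appendCommand pattern waits for the commit future, then either applies the command or answers with an error. It can be sketched with plain `CompletableFuture`s. `CommandPipeline` and its string responses are hypothetical stand-ins for Copycat's `CommandResponse` and `applyCommand`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.LongFunction;

// Hypothetical sketch (not Copycat code): chain "apply the command" onto the commit
// future of its log index; a failed commit turns into an error response instead.
final class CommandPipeline {
  static CompletableFuture<String> appendCommand(
      long index, CompletableFuture<Long> commitFuture, LongFunction<String> apply) {
    CompletableFuture<String> response = new CompletableFuture<>();
    commitFuture.whenComplete((commitIndex, commitError) -> {
      if (commitError == null) {
        response.complete(apply.apply(index)); // committed: apply to the state machine
      } else {
        response.complete("ERROR: INTERNAL_ERROR"); // mirrors the INTERNAL_ERROR response above
      }
    });
    return response;
  }
}
```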
LeaderAppender
appendEntries is carried out by LeaderAppender:
/**
 * The leader appender is responsible for sending {@link AppendRequest}s on behalf of a leader to followers.
 * Append requests are sent by the leader only to other active members of the cluster.
 *
 * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
 */
final class LeaderAppender extends AbstractAppender {
Appending up to a given index
/**
 * Registers a commit handler for the given commit index.
 *
 * @param index The index for which to register the handler.
 * @return A completable future to be completed once the given log index has been committed.
 */
public CompletableFuture<Long> appendEntries(long index) {
  if (index == 0) // index 0 is equivalent to a heartbeat
    return appendEntries();

  if (index <= context.getCommitIndex()) // already committed, nothing to wait for
    return CompletableFuture.completedFuture(index);

  // If there are no other stateful servers in the cluster, immediately commit the index.
  if (context.getClusterState().getActiveMemberStates().isEmpty() && context.getClusterState().getPassiveMemberStates().isEmpty()) {
    long previousCommitIndex = context.getCommitIndex();
    context.setCommitIndex(index);
    context.setGlobalIndex(index);
    completeCommits(previousCommitIndex, index);
    return CompletableFuture.completedFuture(index);
  }
  // If there are no other active members in the cluster, update the commit index and complete the commit.
  // The updated commit index will be sent to passive/reserve members on heartbeats.
  else if (context.getClusterState().getActiveMemberStates().isEmpty()) {
    long previousCommitIndex = context.getCommitIndex();
    context.setCommitIndex(index);
    completeCommits(previousCommitIndex, index);
    return CompletableFuture.completedFuture(index);
  }

  // Only send entry-specific AppendRequests to active members of the cluster.
  // computeIfAbsent runs the function only if appendFutures has no entry for this index yet.
  return appendFutures.computeIfAbsent(index, i -> {
    for (MemberState member : context.getClusterState().getActiveMemberStates()) {
      appendEntries(member);
    }
    return new CompletableFuture<>();
  });
}
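The `computeIfAbsent` call keeps replication from being triggered twice for the same index. Every caller waiting on that index shares one future. Below is a minimal sketch, assuming a hypothetical `AppendFutureRegistry` whose `fanOuts` counter stands in for the loop over active members.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch (not Copycat code): repeated appendEntries(index) calls for the
// same index share one future, and the fan-out to members runs only on first creation.
final class AppendFutureRegistry {
  final Map<Long, CompletableFuture<Long>> appendFutures = new HashMap<>();
  long commitIndex = 0;
  int fanOuts = 0; // how many times requests would have been sent to all active members

  CompletableFuture<Long> appendEntries(long index) {
    if (index <= commitIndex) {
      return CompletableFuture.completedFuture(index); // already committed
    }
    return appendFutures.computeIfAbsent(index, i -> {
      fanOuts++; // stands in for calling appendEntries(member) on each active member
      return new CompletableFuture<>();
    });
  }
}
```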
appendFutures
private final Map<Long, CompletableFuture<Long>> appendFutures = new HashMap<>();
The future for each index is cached in appendFutures. When does that future get completed?
To find out, follow the call the leader makes for each member:
appendEntries
@Override
protected void appendEntries(MemberState member) {
  // (The first if-branch of this method is omitted from this excerpt.)

  // If the member term is less than the current term or the member's configuration index is less
  // than the local configuration index, send a configuration update to the member.
  // Ensure that only one configuration attempt per member is attempted at any given time by storing the
  // member state in a set of configuring members.
  // Once the configuration is complete sendAppendRequest will be called recursively.
  else if (member.getConfigTerm() < context.getTerm() || member.getConfigIndex() < context.getClusterState().getConfiguration().index()) {
    if (member.canConfigure()) {
      sendConfigureRequest(member, buildConfigureRequest(member));
    }
  }
  // If the member is a reserve or passive member, send an empty AppendRequest to it.
  else if (member.getMember().type() == Member.Type.RESERVE || member.getMember().type() == Member.Type.PASSIVE) {
    if (member.canAppend()) {
      sendAppendRequest(member, buildAppendEmptyRequest(member)); // reserve/passive members only need a heartbeat, i.e. an empty append
    }
  }
  // If the member's current snapshot index is less than the latest snapshot index and the latest snapshot index
  // is greater than or equal to the member's nextIndex, send a snapshot request.
  else if (member.getMember().type() == Member.Type.ACTIVE && context.getSnapshotStore().currentSnapshot() != null
      && context.getSnapshotStore().currentSnapshot().index() >= member.getNextIndex()
      && context.getSnapshotStore().currentSnapshot().index() > member.getSnapshotIndex()) {
    if (member.canInstall()) {
      sendInstallRequest(member, buildInstallRequest(member));
    }
  }
  // If no AppendRequest is already being sent, send an AppendRequest.
  else if (member.canAppend()) {
    sendAppendRequest(member, buildAppendRequest(member, context.getLog().lastIndex())); // send an AppendRequest
  }
}
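The branch order above decides what kind of request a member receives. Below is a hypothetical, simplified model of that decision. It reduces `MemberState` and the snapshot store to plain parameters, and a snapshot index of 0 means "no snapshot", which is an assumption. The omitted first branch is not modelled.

```java
// Hypothetical model (not Copycat code) of the branch order in appendEntries(MemberState).
// MemberState and the snapshot store are reduced to plain parameters; snapshotIndex == 0
// stands for "no snapshot" (an assumption). The omitted first branch is not modelled.
final class AppendDispatch {
  enum Action { CONFIGURE, APPEND_EMPTY, INSTALL_SNAPSHOT, APPEND_ENTRIES, NONE }
  enum MemberType { ACTIVE, PASSIVE, RESERVE }

  static Action decide(boolean configStale, MemberType type,
                       long snapshotIndex, long nextIndex, long memberSnapshotIndex,
                       boolean canConfigure, boolean canAppend, boolean canInstall) {
    if (configStale) {
      // the member's configuration term/index is behind the leader's: configure it first
      return canConfigure ? Action.CONFIGURE : Action.NONE;
    } else if (type == MemberType.RESERVE || type == MemberType.PASSIVE) {
      // non-voting members only receive empty appends
      return canAppend ? Action.APPEND_EMPTY : Action.NONE;
    } else if (type == MemberType.ACTIVE && snapshotIndex > 0
        && snapshotIndex >= nextIndex && snapshotIndex > memberSnapshotIndex) {
      // the snapshot covers entries the member still needs and does not have yet
      return canInstall ? Action.INSTALL_SNAPSHOT : Action.NONE;
    } else if (canAppend) {
      return Action.APPEND_ENTRIES;
    }
    return Action.NONE; // a request to this member is already in flight
  }
}
```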
buildAppendRequest
protected AppendRequest buildAppendRequest(MemberState member, long lastIndex) {
  // If the log is empty then send an empty commit.
  // If the next index hasn't yet been set then we send an empty commit first.
  // If the next index is greater than the last index then send an empty commit.
  // If the member failed to respond to recent communication send an empty commit. This
  // helps avoid doing expensive work until we can ascertain the member is back up.
  if (context.getLog().isEmpty() || member.getNextIndex() > lastIndex || member.getFailureCount() > 0) {
    return buildAppendEmptyRequest(member); // nothing new to send (e.g. a heartbeat to an up-to-date member): send an empty append
  } else {
    return buildAppendEntriesRequest(member, lastIndex);
  }
}
buildAppendEntriesRequest
/**
 * Builds a populated AppendEntries request.
 */
protected AppendRequest buildAppendEntriesRequest(MemberState member, long lastIndex) {
  Entry prevEntry = getPrevEntry(member); // the entry just before the member's nextIndex

  ServerMember leader = context.getLeader();
  AppendRequest.Builder builder = AppendRequest.builder()
    .withTerm(context.getTerm())
    .withLeader(leader != null ? leader.id() : 0)
    .withLogIndex(prevEntry != null ? prevEntry.getIndex() : 0)
    .withLogTerm(prevEntry != null ? prevEntry.getTerm() : 0)
    .withCommitIndex(context.getCommitIndex())
    .withGlobalIndex(context.getGlobalIndex());

  // Calculate the starting index of the list of entries.
  // With a previous entry, start right after it; otherwise start at the first index in the log.
  final long index = prevEntry != null ? prevEntry.getIndex() + 1 : context.getLog().firstIndex();

  // Build a list of entries to send to the member.
  // Only the initial capacity is capped at 8; the batch itself is bounded by MAX_BATCH_SIZE below.
  List<Entry> entries = new ArrayList<>((int) Math.min(8, lastIndex - index + 1));

  // Build a list of entries up to the MAX_BATCH_SIZE. Note that entries in the log may
  // be null if they've been compacted and the member to which we're sending entries is just
  // joining the cluster or is otherwise far behind. Null entries are simply skipped and not
  // counted towards the size of the batch.
  // If there exists an entry in the log with size >= MAX_BATCH_SIZE the logic ensures that
  // entry will be sent in a batch of size one
  int size = 0;

  // Iterate through remaining entries in the log up to the last index.
  for (long i = index; i <= lastIndex; i++) {
    // Get the entry from the log and append it if it's not null. Entries in the log can be null
    // if they've been cleaned or compacted from the log. Each entry sent in the append request
    // has a unique index to handle gaps in the log.
    Entry entry = context.getLog().get(i);
    if (entry != null) {
      if (!entries.isEmpty() && size + entry.size() > MAX_BATCH_SIZE) { // MAX_BATCH_SIZE bounds the total size of the batch
        break;
      }
      size += entry.size();
      entries.add(entry);
    }
  }

  // Release the previous entry back to the entry pool.
  if (prevEntry != null) {
    prevEntry.release();
  }

  // Add the entries to the request builder and build the request.
  return builder.withEntries(entries).build();
}
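The batching rule works like this: skip compacted slots, stop before exceeding the size limit, but always take at least one entry. It is easy to check in isolation. In the sketch below, `BatchBuilder` is hypothetical, entries are modelled only by their sizes, and index `i` is assumed to live at list position `i - 1`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch (not Copycat code) of the batching loop in buildAppendEntriesRequest.
// Entries are modelled by their sizes; a null element is a compacted entry.
// Assumption: the entry with log index i is stored at entrySizes.get(i - 1).
final class BatchBuilder {
  static List<Long> buildBatch(List<Integer> entrySizes, long startIndex, long lastIndex, int maxBatchSize) {
    List<Long> batch = new ArrayList<>(); // log indexes of the entries to send
    int size = 0;
    for (long i = startIndex; i <= lastIndex; i++) {
      Integer entrySize = entrySizes.get((int) (i - 1));
      if (entrySize == null) {
        continue; // compacted entry: skipped and not counted towards the batch size
      }
      // Stop before exceeding the limit, but never send an empty batch: an oversized
      // first entry is still sent, alone.
      if (!batch.isEmpty() && size + entrySize > maxBatchSize) {
        break;
      }
      size += entrySize;
      batch.add(i);
    }
    return batch;
  }
}
```

For sizes `[40, null, 30, 50, 10]` and a limit of 100, the batch is indexes 1 and 3. Index 2 is skipped, and adding index 4 would reach 120.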
sendAppendRequest
/**
 * Connects to the member and sends a commit message.
 */
protected void sendAppendRequest(MemberState member, AppendRequest request) {
  // Set the start time of the member's current commit. This will be used to associate responses
  // with the current commit request.
  member.setHeartbeatStartTime(heartbeatTime);

  super.sendAppendRequest(member, request);
}
AbstractAppender
/**
 * Connects to the member and sends a commit message.
 */
protected void sendAppendRequest(MemberState member, AppendRequest request) {
  // Start the append to the member.
  member.startAppend();

  LOGGER.debug("{} - Sent {} to {}", context.getCluster().member().address(), request, member.getMember().address());
  context.getConnections().getConnection(member.getMember().address()).whenComplete((connection, error) -> { // get a connection to the member
    context.checkThread();

    if (open) {
      if (error == null) {
        sendAppendRequest(connection, member, request); // connected: send the request
      } else {
        // Complete the append to the member.
        member.completeAppend();

        // Trigger reactions to the request failure.
        handleAppendRequestFailure(member, request, error);
      }
    }
  });
}
/**
 * Sends a commit message.
 */
protected void sendAppendRequest(Connection connection, MemberState member, AppendRequest request) {
  long timestamp = System.nanoTime();
  connection.<AppendRequest, AppendResponse>send(request).whenComplete((response, error) -> { // send the request
    context.checkThread();

    // Complete the append to the member.
    if (!request.entries().isEmpty()) {
      member.completeAppend(System.nanoTime() - timestamp);
    } else {
      member.completeAppend();
    }

    if (open) {
      if (error == null) {
        LOGGER.debug("{} - Received {} from {}", context.getCluster().member().address(), response, member.getMember().address());
        handleAppendResponse(member, request, response); // a response arrived
      } else {
        handleAppendResponseFailure(member, request, error); // the request failed
      }
    }
  });

  // Runs right after sending, before any response has arrived.
  updateNextIndex(member, request);
  if (!request.entries().isEmpty() && hasMoreEntries(member)) {
    appendEntries(member);
  }
}
LeaderAppender
/**
 * Handles an append response.
 */
protected void handleAppendResponse(MemberState member, AppendRequest request, AppendResponse response) {
  // Trigger commit futures if necessary.
  updateHeartbeatTime(member, null);

  super.handleAppendResponse(member, request, response);
}
The superclass (AbstractAppender) implementation:
/**
 * Handles an append response.
 */
protected void handleAppendResponse(MemberState member, AppendRequest request, AppendResponse response) {
  if (response.status() == Response.Status.OK) {
    handleAppendResponseOk(member, request, response);
  } else {
    handleAppendResponseError(member, request, response);
  }
}

/**
 * Handles a {@link Response.Status#OK} response.
 */
protected void handleAppendResponseOk(MemberState member, AppendRequest request, AppendResponse response) {
  // Reset the member failure count and update the member's availability status if necessary.
  succeedAttempt(member); // the member answered: reset its failure state

  // If replication succeeded then trigger commit futures.
  if (response.succeeded()) {
    updateMatchIndex(member, response); // update the member's matchIndex

    // If entries were committed to the replica then check commit indexes.
    if (!request.entries().isEmpty()) {
      commitEntries(); // try to advance the commit index
    }

    // If there are more entries to send then attempt to send another commit.
    if (hasMoreEntries(member)) {
      appendEntries(member);
    }
  }
  // If we've received a greater term, update the term and transition back to follower.
  else if (response.term() > context.getTerm()) {
    context.setTerm(response.term()).setLeader(0);
    context.transition(CopycatServer.State.FOLLOWER);
  }
  // If the response failed, the follower should have provided the correct last index in their log. This helps
  // us converge on the matchIndex faster than by simply decrementing nextIndex one index at a time.
  else {
    resetMatchIndex(member, response);
    resetNextIndex(member);

    // If there are more entries to send then attempt to send another commit.
    if (hasMoreEntries(member)) {
      appendEntries(member);
    }
  }
}
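The bodies of `updateMatchIndex`, `resetMatchIndex` and `resetNextIndex` are not shown in this excerpt. The sketch below is a hypothetical tracker in the spirit of the comment above. A rejection uses the follower's hint to jump `nextIndex` back in one step instead of decrementing it one index at a time. It is a deliberately conservative variant: it leaves `matchIndex` untouched on rejection, and on success it takes the last index carried by the accepted request. Whether Copycat does exactly this is not confirmed here.

```java
// Hypothetical per-follower progress tracker (not Copycat code). The exact bodies of
// updateMatchIndex/resetMatchIndex/resetNextIndex are not shown in the excerpt; this
// variant only moves nextIndex on a rejection and never lowers matchIndex.
final class FollowerProgress {
  long matchIndex = 0; // highest index known to match the leader's log on the follower
  long nextIndex;      // next index the leader will try to send

  FollowerProgress(long leaderLastIndex) {
    this.nextIndex = leaderLastIndex + 1; // optimistic start, as in standard Raft
  }

  /** Accepted append whose last carried entry (prevLogIndex + entry count) is lastIndexInRequest. */
  void onSuccess(long lastIndexInRequest) {
    matchIndex = Math.max(matchIndex, lastIndexInRequest); // ignore stale, reordered responses
    nextIndex = matchIndex + 1;
  }

  /** Rejected append: jump to just after the follower's hint instead of stepping back by one. */
  void onReject(long followerLastIndex, long firstLogIndex) {
    nextIndex = Math.max(firstLogIndex, Math.min(nextIndex - 1, followerLastIndex + 1));
  }
}
```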
The key piece is commitEntries:
/**
 * Checks whether any futures can be completed.
 */
private void commitEntries() {
  // Sort the list of replicas, order by the last index that was replicated
  // to the replica. This will allow us to determine the median index
  // for all known replicated entries across all cluster members.
  List<MemberState> members = context.getClusterState().getActiveMemberStates((m1, m2) -> // active members sorted by matchIndex, descending
    Long.compare(m2.getMatchIndex() != 0 ? m2.getMatchIndex() : 0L, m1.getMatchIndex() != 0 ? m1.getMatchIndex() : 0L));

  // Calculate the current commit index as the median matchIndex.
  long commitIndex = members.get(quorumIndex()).getMatchIndex(); // the highest index replicated on a quorum

  // If the commit index has increased then update the commit index. Note that in order to ensure
  // the leader completeness property holds, we verify that the commit index is greater than or equal to
  // the index of the leader's no-op entry. Update the commit index and trigger commit futures.
  long previousCommitIndex = context.getCommitIndex();
  if (commitIndex > 0 && commitIndex > previousCommitIndex && (leaderIndex > 0 && commitIndex >= leaderIndex)) { // a quorum holds the entries, including the leader's no-op entry
    context.setCommitIndex(commitIndex);
    completeCommits(previousCommitIndex, commitIndex); // complete this round of appends
  }
}
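The "median matchIndex" rule can be checked with a small worked example. This sketch assumes the sorted list holds only the *remote* active members. The leader always has every entry itself, so it needs `quorum - 1` remote acknowledgements, and in a descending list the last of those sits at position `quorum - 2`. This is an assumption about `quorumIndex()`, whose body is not shown. `CommitCalculator` is hypothetical.

```java
import java.util.Arrays;

// Hypothetical sketch (not Copycat code) of commitEntries. Assumption: the leader is not
// in remoteMatchIndexes, quorum = floor(clusterSize / 2) + 1 counting the leader, and the
// committed index is the (quorum - 2)-th largest remote matchIndex (0-based).
final class CommitCalculator {
  static long commitIndex(long[] remoteMatchIndexes, long previousCommitIndex, long leaderIndex) {
    if (remoteMatchIndexes.length == 0) {
      return previousCommitIndex; // single-node case is handled elsewhere (appendEntries fast path)
    }
    int clusterSize = remoteMatchIndexes.length + 1; // + 1 for the leader itself
    int quorum = clusterSize / 2 + 1;
    long[] sorted = remoteMatchIndexes.clone();
    Arrays.sort(sorted); // ascending
    // Position (quorum - 2) in descending order == position (n - 1 - (quorum - 2)) ascending.
    long candidate = sorted[sorted.length - 1 - (quorum - 2)];
    // Only advance, and only once the leader's own no-op entry is covered (leader completeness).
    if (candidate > previousCommitIndex && leaderIndex > 0 && candidate >= leaderIndex) {
      return candidate;
    }
    return previousCommitIndex;
  }
}
```

With five servers and remote matchIndexes `{7, 9, 4, 9}`, the quorum is 3. The second-largest remote value is 9, so index 9 is held by the leader plus two followers and can be committed. If the leader's no-op entry were at index 10, the commit index would stay where it was.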
completeCommits removes the futures for the newly committed indexes from appendFutures and completes them, signalling that those appends have finished:
/**
 * Completes append entries attempts up to the given index.
 */
private void completeCommits(long previousCommitIndex, long commitIndex) {
  for (long i = previousCommitIndex + 1; i <= commitIndex; i++) {
    CompletableFuture<Long> future = appendFutures.remove(i);
    if (future != null) {
      future.complete(i);
    }
  }
}
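The range walk in completeCommits can be tried on its own. In this hypothetical sketch, `CommitCompleter` takes the map as a parameter instead of using a field, and returns how many futures it completed.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch (not Copycat code) of completeCommits: when the commit index moves
// from previousCommitIndex to commitIndex, every registered future in that range is
// removed from the map and completed with its own index.
final class CommitCompleter {
  static int completeCommits(Map<Long, CompletableFuture<Long>> appendFutures,
                             long previousCommitIndex, long commitIndex) {
    int completed = 0;
    for (long i = previousCommitIndex + 1; i <= commitIndex; i++) {
      CompletableFuture<Long> future = appendFutures.remove(i);
      if (future != null) { // not every index has a registered future
        future.complete(i);
        completed++;
      }
    }
    return completed;
  }
}
```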
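How do the receiving servers handle the AppendRequests sent here? The handler is registered when a server-to-server connection is set up: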
public void connectServer(Connection connection) {
  connection.handler(AppendRequest.class, request -> state.append(request));
  // ... (other handlers omitted)
}
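Append requests can therefore only be sent between servers; clients cannot append directly.
append is implemented in every server state. Note that the entry-carrying appends above are sent only to active members.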
ActiveState
@Override
public CompletableFuture<AppendResponse> append(final AppendRequest request) {
  context.checkThread();
  logRequest(request);

  // If the request indicates a term that is greater than the current term then
  // assign that term and leader to the current context and transition to follower.
  boolean transition = updateTermAndLeader(request.term(), request.leader());

  CompletableFuture<AppendResponse> future = CompletableFuture.completedFuture(logResponse(handleAppend(request)));

  // If a transition is required then transition back to the follower state.
  // If the node is already a follower then the transition will be ignored.
  if (transition) {
    context.transition(CopycatServer.State.FOLLOWER);
  }
  return future;
}
This eventually calls:
@Override
protected AppendResponse appendEntries(AppendRequest request) {
  // Get the last entry index or default to the request log index.
  long lastEntryIndex = request.logIndex();
  if (!request.entries().isEmpty()) {
    lastEntryIndex = request.entries().get(request.entries().size() - 1).getIndex();
  }

  // Ensure the commitIndex is not increased beyond the index of the last entry in the request.
  long commitIndex = Math.max(context.getCommitIndex(), Math.min(request.commitIndex(), lastEntryIndex));

  // Iterate through request entries and append them to the log.
  for (Entry entry : request.entries()) {
    // If the entry index is greater than the last log index, skip missing entries.
    if (context.getLog().lastIndex() < entry.getIndex()) {
      context.getLog().skip(entry.getIndex() - context.getLog().lastIndex() - 1).append(entry);
      LOGGER.debug("{} - Appended {} to log at index {}", context.getCluster().member().address(), entry, entry.getIndex());
    } else if (context.getCommitIndex() >= entry.getIndex()) {
      continue;
    } else {
      // Compare the term of the received entry with the matching entry in the log.
      long term = context.getLog().term(entry.getIndex());
      if (term != 0) {
        if (entry.getTerm() != term) {
          // We found an invalid entry in the log. Remove the invalid entry and append the new entry.
          // If appending to the log fails, apply commits and reply false to the append request.
          LOGGER.debug("{} - Appended entry term does not match local log, removing incorrect entries", context.getCluster().member().address());
          context.getLog().truncate(entry.getIndex() - 1).append(entry);
          LOGGER.debug("{} - Appended {} to log at index {}", context.getCluster().member().address(), entry, entry.getIndex());
        }
      } else {
        context.getLog().truncate(entry.getIndex() - 1).append(entry);
        LOGGER.debug("{} - Appended {} to log at index {}", context.getCluster().member().address(), entry, entry.getIndex());
      }
    }
  }

  // If we've made it this far, apply commits and send a successful response.
  LOGGER.debug("{} - Committed entries up to index {}", context.getCluster().member().address(), commitIndex);
  context.setCommitIndex(commitIndex);
  context.setGlobalIndex(request.globalIndex());

  // Apply commits to the local state machine.
  context.getStateMachine().applyAll(context.getCommitIndex()); // apply committed entries

  return AppendResponse.builder()
    .withStatus(Response.Status.OK)
    .withTerm(context.getTerm())
    .withSucceeded(true)
    .withLogIndex(context.getLog().lastIndex())
    .build();
}
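The follower-side loop can be reduced to an in-memory model. `FollowerLog` is hypothetical. The log is a list of terms, and gaps (`skip`) and compaction are not modelled. The `prevLogIndex`/`prevLogTerm` consistency check that runs before this method is assumed to have passed. The sketch keeps the three rules above: committed entries are never overwritten, a term conflict truncates the log from that point, and the commit index never moves past the last entry carried by the request.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory sketch (not Copycat code) of the follower-side append loop.
// terms.get(i - 1) is the term of the entry at index i. Gaps (skip) and compaction are
// not modelled: entries are assumed contiguous and the prevLogIndex check already passed.
final class FollowerLog {
  final List<Long> terms = new ArrayList<>();
  long commitIndex = 0;

  long lastIndex() {
    return terms.size();
  }

  /** entries[k] = {index, term}; returns the follower's new commit index. */
  long append(long[][] entries, long requestLogIndex, long leaderCommitIndex) {
    long lastEntryIndex = entries.length > 0 ? entries[entries.length - 1][0] : requestLogIndex;
    for (long[] e : entries) {
      long index = e[0], term = e[1];
      if (index > lastIndex()) {
        terms.add(term); // new entry: append it
      } else if (index <= commitIndex) {
        continue; // already committed locally: never overwritten
      } else if (terms.get((int) (index - 1)) != term) {
        // Conflicting entry: drop it and everything after it, then append the leader's entry.
        terms.subList((int) (index - 1), terms.size()).clear();
        terms.add(term);
      }
    }
    // Never advance the commit index past the last entry carried by this request.
    commitIndex = Math.max(commitIndex, Math.min(leaderCommitIndex, lastEntryIndex));
    return commitIndex;
  }
}
```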
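The logic appends the entries to the local log and advances the commit index, never beyond the last entry in the request. It then applies the committed entries to the state machine.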
FollowerState
@Override
public CompletableFuture<AppendResponse> append(AppendRequest request) {
  CompletableFuture<AppendResponse> future = super.append(request);

  // Reset the heartbeat timeout.
  resetHeartbeatTimeout();

  // Send AppendEntries requests to passive members if necessary.
  appender.appendEntries(); // replicate to passive members
  return future;
}
Besides resetting the heartbeat timeout, the only addition is replicating to passive members.
Heartbeat
A heartbeat is a special kind of append: an AppendRequest with an empty list of entries.
LeaderAppender
final class LeaderAppender extends AbstractAppender {
  private CompletableFuture<Long> heartbeatFuture;
  private CompletableFuture<Long> nextHeartbeatFuture;
/**
 * Triggers a heartbeat to a majority of the cluster.
 * <p>
 * For followers to which no AppendRequest is currently being sent, a new empty AppendRequest will be
 * created and sent. For followers to which an AppendRequest is already being sent, the appendEntries()
 * call will piggyback on the *next* AppendRequest. Thus, multiple calls to this method will only ever
 * result in a single AppendRequest to each follower at any given time, and the returned future will be
 * shared by all concurrent calls.
 *
 * @return A completable future to be completed the next time a heartbeat is received by a majority of the cluster.
 */
public CompletableFuture<Long> appendEntries() {
  // If there are no other active members in the cluster, simply complete the append operation.
  if (context.getClusterState().getRemoteMemberStates().isEmpty())
    return CompletableFuture.completedFuture(null);

  // If no heartbeat future already exists, that indicates there's no heartbeat currently under way.
  // Create a new heartbeat future and commit to all members in the cluster.
  if (heartbeatFuture == null) {
    CompletableFuture<Long> newHeartbeatFuture = new CompletableFuture<>();
    heartbeatFuture = newHeartbeatFuture;
    heartbeatTime = System.currentTimeMillis();
    for (MemberState member : context.getClusterState().getRemoteMemberStates()) {
      appendEntries(member);
    }
    return newHeartbeatFuture;
  }
  // If a heartbeat future already exists, that indicates there is a heartbeat currently underway.
  // We don't want to allow callers to be completed by a heartbeat that may already almost be done.
  // So, we create the next heartbeat future if necessary and return that. Once the current heartbeat
  // completes the next future will be used to do another heartbeat. This ensures that only one
  // heartbeat can be outstanding at any given point in time.
  else if (nextHeartbeatFuture == null) {
    nextHeartbeatFuture = new CompletableFuture<>();
    return nextHeartbeatFuture;
  } else {
    return nextHeartbeatFuture;
  }
}
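The two-future scheme coalesces heartbeats. At most one round is in flight. Callers who arrive mid-round share the *next* round's future, so they are never completed by a round that started before they asked. Below is a minimal sketch of that idea, including the promotion step that completeHeartbeat performs. `HeartbeatCoalescer` and its `roundsStarted` counter are hypothetical; the counter stands in for sending AppendRequests to all remote members.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch (not Copycat code) of heartbeat coalescing with a current and a
// next future. roundsStarted stands in for sending AppendRequests to all remote members.
final class HeartbeatCoalescer {
  private CompletableFuture<Long> heartbeatFuture;
  private CompletableFuture<Long> nextHeartbeatFuture;
  int roundsStarted = 0;

  CompletableFuture<Long> heartbeat() {
    if (heartbeatFuture == null) { // no round in flight: start one now
      heartbeatFuture = new CompletableFuture<>();
      roundsStarted++;
      return heartbeatFuture;
    }
    if (nextHeartbeatFuture == null) { // a round is in flight: wait for the next one
      nextHeartbeatFuture = new CompletableFuture<>();
    }
    return nextHeartbeatFuture;
  }

  /** Ends the current round (cf. updateHeartbeatTime + completeHeartbeat). */
  void completeRound(Throwable error) {
    if (heartbeatFuture == null) {
      return;
    }
    if (error != null) {
      heartbeatFuture.completeExceptionally(error);
    } else {
      heartbeatFuture.complete(null);
    }
    heartbeatFuture = nextHeartbeatFuture; // promote the waiting future, if any
    nextHeartbeatFuture = null;
    if (heartbeatFuture != null) {
      roundsStarted++; // start the next round immediately for the waiting callers
    }
  }
}
```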
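As the Javadoc explains, heartbeats are sent to every member returned by getRemoteMemberStates(), that is, to all remote members, not only the active ones.
heartbeatFuture and nextHeartbeatFuture are the futures of the current and the next heartbeat, respectively.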
LeaderState
Heartbeats are triggered periodically by a timer:
startAppendTimer
/**
 * Starts sending AppendEntries requests to all cluster members.
 */
private void startAppendTimer() {
  // Set a timer that will be used to periodically synchronize with other nodes
  // in the cluster. This timer acts as a heartbeat to ensure this node remains
  // the leader.
  LOGGER.debug("{} - Starting append timer", context.getCluster().member().address());
  appendTimer = context.getThreadContext().schedule(Duration.ZERO, context.getHeartbeatInterval(), this::appendMembers);
}

/**
 * Sends AppendEntries requests to members of the cluster that haven't heard from the leader in a while.
 */
private void appendMembers() {
  context.checkThread();
  if (isOpen()) {
    appender.appendEntries();
  }
}
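Outside Copycat, the same periodic trigger can be approximated with the JDK's `ScheduledExecutorService`. The timer runs once immediately (like `Duration.ZERO`), then every interval, on a single thread, which echoes the single-threaded context that `context.checkThread()` enforces. `AppendTimer` is hypothetical and replaces Copycat's `ThreadContext.schedule`. As in `appendMembers()`, nothing looks at whether a heartbeat succeeded.

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in (not Copycat code) for
// context.getThreadContext().schedule(Duration.ZERO, interval, this::appendMembers):
// fire immediately, then at a fixed rate, on one thread. Each tick is just counted here.
final class AppendTimer {
  private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
  final AtomicInteger ticks = new AtomicInteger();
  private ScheduledFuture<?> timer;

  void start(Duration heartbeatInterval) {
    timer = executor.scheduleAtFixedRate(ticks::incrementAndGet, 0,
        heartbeatInterval.toMillis(), TimeUnit.MILLISECONDS);
  }

  void stop() {
    timer.cancel(false);
    executor.shutdown();
  }
}
```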
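When are heartbeatFuture and nextHeartbeatFuture completed? LeaderAppender calls updateHeartbeatTime from its response handlers: handleAppendResponse (shown above) and handleConfigureResponse: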
@Override
protected void handleConfigureResponse(MemberState member, ConfigureRequest request, ConfigureResponse response) {
  // Trigger commit futures if necessary.
  updateHeartbeatTime(member, null);

  super.handleConfigureResponse(member, request, response);
}
updateHeartbeatTime
/**
 * Sets a commit time or fails the commit if a quorum of successful responses cannot be achieved.
 */
private void updateHeartbeatTime(MemberState member, Throwable error) {
  if (heartbeatFuture == null) {
    return;
  }

  if (error != null && member.getHeartbeatStartTime() == heartbeatTime) { // the heartbeat to this member failed
    int votingMemberSize = context.getClusterState().getActiveMemberStates().size() + (context.getCluster().member().type() == Member.Type.ACTIVE ? 1 : 0);
    int quorumSize = (int) Math.floor(votingMemberSize / 2) + 1;
    // If a quorum of successful responses cannot be achieved, fail this heartbeat. Ensure that only
    // ACTIVE members are considered. A member could have been transitioned to another state while the
    // heartbeat was being sent.
    if (member.getMember().type() == Member.Type.ACTIVE && ++heartbeatFailures > votingMemberSize - quorumSize) { // more than votingMemberSize - quorumSize failures: a quorum is unreachable
      heartbeatFuture.completeExceptionally(new InternalException("Failed to reach consensus")); // this heartbeat fails
      completeHeartbeat();
    }
  } else {
    member.setHeartbeatTime(System.currentTimeMillis()); // record this member's latest heartbeat time

    // Sort the list of commit times. Use the quorum index to get the last time the majority of the cluster
    // was contacted. If the current heartbeatFuture's time is less than the commit time then trigger the
    // commit future and reset it to the next commit future.
    if (heartbeatTime <= heartbeatTime()) { // a quorum has responded since this heartbeat started
      heartbeatFuture.complete(null); // the heartbeat succeeded: complete heartbeatFuture
      completeHeartbeat();
    }
  }
}

/**
 * Returns the last time a majority of the cluster was contacted.
 * <p>
 * This is calculated by sorting the list of active members and getting the last time the majority of
 * the cluster was contacted based on the index of a majority of the members. So, in a list of 3 ACTIVE
 * members, index 1 (the second member) will be used to determine the commit time in a sorted members list.
 */
private long heartbeatTime() {
  int quorumIndex = quorumIndex();
  if (quorumIndex >= 0) {
    return context.getClusterState().getActiveMemberStates((m1, m2) -> Long.compare(m2.getHeartbeatTime(), m1.getHeartbeatTime())).get(quorumIndex).getHeartbeatTime();
  }
  return System.currentTimeMillis();
}
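Both quorum checks in updateHeartbeatTime reduce to small arithmetic.

- **Success:** the round succeeds once the quorum-th most recent response time is at or after the round's start.
- **Failure:** the round fails once more than `votingMembers - quorumSize` voting members have failed, because a majority is then impossible.

The sketch below makes the same assumption as before about `quorumIndex()`, whose body is not shown: the leader is excluded from the list and counts toward the quorum. `HeartbeatQuorum` is hypothetical.

```java
import java.util.Arrays;

// Hypothetical sketch (not Copycat code) of the two quorum checks in updateHeartbeatTime.
// Assumption: remote times exclude the leader, which counts towards the quorum.
final class HeartbeatQuorum {
  /** Latest time by which a quorum (leader included) had responded. */
  static long quorumHeartbeatTime(long[] remoteHeartbeatTimes) {
    if (remoteHeartbeatTimes.length == 0) {
      return System.currentTimeMillis(); // mirrors the quorumIndex < 0 fallback
    }
    int quorum = (remoteHeartbeatTimes.length + 1) / 2 + 1;
    long[] sorted = remoteHeartbeatTimes.clone();
    Arrays.sort(sorted); // ascending
    return sorted[sorted.length - 1 - (quorum - 2)]; // the (quorum - 2)-th most recent time
  }

  /** True once so many voting members failed that a majority can no longer answer. */
  static boolean heartbeatFailed(int failures, int votingMembers) {
    int quorumSize = votingMembers / 2 + 1;
    return failures > votingMembers - quorumSize;
  }
}
```

In a five-server cluster with remote response times `{100, 300, 200, 50}`, the quorum time is 200. A round that started at 150 has succeeded; one that started at 250 is still waiting. Two failures are tolerated, and the third fails the round.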
Either way, heartbeatFuture gets completed: exceptionally when a quorum can no longer be reached, normally once a quorum has responded.
completeHeartbeat
/**
 * Completes a heartbeat by replacing the current heartbeatFuture with the nextHeartbeatFuture, updating the
 * current heartbeatTime, and starting a new {@link AppendRequest} to all active members.
 */
private void completeHeartbeat() {
  heartbeatFailures = 0;
  heartbeatFuture = nextHeartbeatFuture; // promote nextHeartbeatFuture to heartbeatFuture
  nextHeartbeatFuture = null;
  updateGlobalIndex();
  if (heartbeatFuture != null) {
    heartbeatTime = System.currentTimeMillis(); // start time of the new round
    for (MemberState member : context.getClusterState().getRemoteMemberStates()) {
      appendEntries(member); // start another round
    }
  }
}
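The periodic timer does not care whether a heartbeat succeeds, so appendMembers ignores the returned future.
appendLinearizableQuery, however, can only serve the query after a heartbeat succeeds.
The reason: if a majority does not respond to the leader's heartbeat, a partition may have occurred and another leader may already exist, so a linearizable query can no longer be guaranteed.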
LeaderState
private void appendLinearizableQuery(QueryEntry entry, ServerSessionContext session, CompletableFuture<QueryResponse> future) {
  appender.appendEntries().whenComplete((commitIndex, commitError) -> {
    context.checkThread();
    if (isOpen()) {
      if (commitError == null) {
        entry.acquire();
        sequenceLinearizableQuery(entry, future);
      } else {
        future.complete(logResponse(QueryResponse.builder()
          .withStatus(Response.Status.ERROR)
          .withError(CopycatError.Type.QUERY_ERROR)
          .build()));
      }
    }
    entry.release();
  });
}
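The gating itself is a small future composition. The read runs only if the heartbeat round completes normally. Otherwise the caller gets an error instead of possibly stale data. `LinearizableRead` is hypothetical, and the `Supplier` stands in for `sequenceLinearizableQuery`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical sketch (not Copycat code): evaluate a read only after a majority has
// acknowledged a heartbeat round; if the round fails, answer with an error instead.
final class LinearizableRead {
  static <T> CompletableFuture<T> read(CompletableFuture<Long> heartbeat, Supplier<T> query) {
    CompletableFuture<T> result = new CompletableFuture<>();
    heartbeat.whenComplete((ignored, error) -> {
      if (error == null) {
        result.complete(query.get()); // leadership confirmed by a quorum
      } else {
        result.completeExceptionally(new IllegalStateException("QUERY_ERROR: leadership not confirmed", error));
      }
    });
    return result;
  }
}
```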