  • A look at how Raft is implemented, using LogCabin

    0. The project

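    LogCabin is a project started by the author of Raft. Using it as a reference, this post looks at how the mechanisms described in the Raft paper are implemented.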

    1. A candidate starts an election

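     The server first increments its term and then enters the CANDIDATE state. It also forgets the current leader (leaderId = 0) and votes for itself (votedFor = serverId).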

    void
    RaftConsensus::startNewElection()
    {
        ……
        ++currentTerm;
        state = State::CANDIDATE;
        leaderId = 0;
        votedFor = serverId;
        ……
    }
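To make the state change concrete, here is a minimal C++ sketch of just these four assignments. The ElectionState struct is hypothetical: only its field names are borrowed from the excerpt, and it leaves out persistence, the election timer and the RPCs. Every call starts a fresh term, so a failed election still consumes a term number.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical, simplified election state; not LogCabin code.
enum class State { FOLLOWER, CANDIDATE, LEADER };

struct ElectionState {
    uint64_t serverId;
    uint64_t currentTerm = 0;
    State state = State::FOLLOWER;
    uint64_t leaderId = 0;  // 0 means "no known leader"
    uint64_t votedFor = 0;  // 0 means "no vote cast in currentTerm"

    explicit ElectionState(uint64_t id) : serverId(id) {}

    // Same steps as startNewElection(): new term, candidate state,
    // forget the old leader, vote for ourselves.
    void startNewElection() {
        ++currentTerm;
        state = State::CANDIDATE;
        leaderId = 0;
        votedFor = serverId;
    }
};
```

For example, a server with serverId 3 that calls startNewElection() twice ends up in term 2, in the CANDIDATE state, with votedFor equal to 3.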

    2. Sending vote requests to the other servers

    void
    RaftConsensus::requestVote(std::unique_lock<Mutex>& lockGuard, Peer& peer)
    {
        ……
        Protocol::Raft::RequestVote::Response response;
        VERBOSE("requestVote start");
        TimePoint start = Clock::now();
        uint64_t epoch = currentEpoch;
        Peer::CallStatus status = peer.callRPC(
                    Protocol::Raft::OpCode::REQUEST_VOTE,
                    request, response,
                    lockGuard);
        ……
    }
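requestVote() takes a single Peer, so it runs once per peer, and the candidate then has to decide whether it has collected enough votes. The sketch below shows that bookkeeping under stated assumptions: VoteRequest, VoteResponse and VoteTally are hypothetical types (the request fields follow the RequestVote arguments in the Raft paper, not LogCabin's protobuf message), the cluster is a single fixed configuration, and a reply carrying a newer term means the candidate must step down.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <set>

// Hypothetical request with the fields the Raft paper lists for RequestVote.
struct VoteRequest {
    uint64_t serverId;      // candidate requesting the vote
    uint64_t term;          // candidate's term
    uint64_t lastLogIndex;  // index of candidate's last log entry
    uint64_t lastLogTerm;   // term of candidate's last log entry
};

struct VoteResponse {
    uint64_t term;
    bool granted;
};

// Hypothetical tally kept by a candidate; clusterSize includes itself.
class VoteTally {
public:
    VoteTally(uint64_t selfId, uint64_t term, std::size_t clusterSize)
        : term_(term), clusterSize_(clusterSize) {
        voters_.insert(selfId);  // the candidate votes for itself
    }

    // Returns false if the reply has a newer term (candidate must step down).
    bool record(uint64_t peerId, const VoteResponse& r) {
        if (r.term > term_)
            return false;
        if (r.term == term_ && r.granted)
            voters_.insert(peerId);  // a set ignores duplicate replies
        return true;
    }

    // Strict majority of the whole cluster.
    bool won() const { return voters_.size() * 2 > clusterSize_; }

private:
    uint64_t term_;
    std::size_t clusterSize_;
    std::set<uint64_t> voters_;
};
```

In a five-server cluster the candidate's own vote plus two granted replies is enough; a duplicated reply from the same peer is not counted twice.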

    3. How a server handles a vote request

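    Once a server votes for a candidate, it updates both its currentTerm and its votedFor field. So when several candidates' election timers fire at about the same time and they send RequestVote requests with the same term, each follower grants its vote only to the request it receives first: by then votedFor is already set, so the votedFor == 0 check fails for every other candidate. A server therefore never votes for two different servers in the same term.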
    logcabin-master/Server/RaftConsensus.cc
    void
    RaftConsensus::handleRequestVote(
            const Protocol::Raft::RequestVote::Request& request,
            Protocol::Raft::RequestVote::Response& response)
    {
        std::lock_guard<Mutex> lockGuard(mutex);
        assert(!exiting);

        // If the caller has a less complete log, we can't give it our vote.
        uint64_t lastLogIndex = log->getLastLogIndex();
        uint64_t lastLogTerm = getLastLogTerm();
        bool logIsOk = (request.last_log_term() > lastLogTerm ||
                        (request.last_log_term() == lastLogTerm &&
                         request.last_log_index() >= lastLogIndex));

        if (withholdVotesUntil > Clock::now()) {
            NOTICE("Rejecting RequestVote for term %lu from server %lu, since "
                   "this server (which is in term %lu) recently heard from a "
                   "leader (%lu). Should server %lu be shut down?",
                   request.term(), request.server_id(), currentTerm,
                   leaderId, request.server_id());
            response.set_term(currentTerm);
            response.set_granted(false);
            response.set_log_ok(logIsOk);
            return;
        }
        // Note: terms only move forward. A request with a higher term makes
        // this server adopt that term and step down; every new election uses
        // a new term, even if the election in an earlier term failed.
        if (request.term() > currentTerm) {
            NOTICE("Received RequestVote request from server %lu in term %lu "
                   "(this server's term was %lu)",
                   request.server_id(), request.term(), currentTerm);
            stepDown(request.term());
        }

        // At this point, if leaderId != 0, we could tell the caller to step down.
        // However, this is just an optimization that does not affect correctness
        // or really even efficiency, so it's not worth the trouble.

        if (request.term() == currentTerm) {
            if (logIsOk && votedFor == 0) {
                // Give caller our vote
                NOTICE("Voting for %lu in term %lu",
                       request.server_id(), currentTerm);
                stepDown(currentTerm);
                setElectionTimer();
                votedFor = request.server_id();
                updateLogMetadata();
                printElectionState();
            }
        }

        // Fill in response.
        response.set_term(currentTerm);
        // don't strictly need the first condition
        response.set_granted(request.term() == currentTerm &&
                             votedFor == request.server_id());
        response.set_log_ok(logIsOk);
    }
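The one-vote-per-term argument can be checked with a small in-memory model. Voter is a hypothetical type that keeps only the state handleRequestVote() reads (currentTerm, votedFor, and the last log index and term); it omits withholdVotesUntil, timers and persistence. It assumes that moving to a newer term clears votedFor, which LogCabin does inside stepDown().

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical voter-side model of handleRequestVote(); not LogCabin code.
struct Voter {
    uint64_t currentTerm = 0;
    uint64_t votedFor = 0;  // 0 = no vote cast in currentTerm
    uint64_t lastLogIndex = 0;
    uint64_t lastLogTerm = 0;

    // Returns true if the vote is granted.
    bool handleRequestVote(uint64_t candidateId, uint64_t term,
                           uint64_t candLastLogIndex,
                           uint64_t candLastLogTerm) {
        // Same "at least as up-to-date" test as logIsOk in the excerpt.
        bool logIsOk = candLastLogTerm > lastLogTerm ||
                       (candLastLogTerm == lastLogTerm &&
                        candLastLogIndex >= lastLogIndex);
        // A newer term is adopted and clears any previous vote.
        if (term > currentTerm) {
            currentTerm = term;
            votedFor = 0;
        }
        // At most one candidate per term can be recorded in votedFor.
        if (term == currentTerm && logIsOk && votedFor == 0)
            votedFor = candidateId;
        return term == currentTerm && votedFor == candidateId;
    }
};
```

Two candidates asking for votes in term 1 get different answers: the first is granted, the second is refused, and re-sending the first request is still granted, just as the final set_granted() expression allows.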

    4. Propagating the new leader's identity

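    The new leader appends an entry of type EntryType::NOOP and replicates it; receiving this entry from the leader lets every server update its leaderId.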
    void
    RaftConsensus::becomeLeader()
    {
        assert(state == State::CANDIDATE);
        NOTICE("Now leader for term %lu (appending no-op at index %lu)",
               currentTerm,
               log->getLastLogIndex() + 1);
        state = State::LEADER;
        leaderId = serverId;
        printElectionState();
        startElectionAt = TimePoint::max();
        withholdVotesUntil = TimePoint::max();

        // Our local cluster time clock has been ticking ever since we got the last
        // log entry/snapshot. Set the clock back to when that happened, since we
        // don't really want to count that time (the cluster probably had no leader
        // for most of it).
        clusterClock.newEpoch(clusterClock.clusterTimeAtEpoch);

        // The ordering is pretty important here: First set nextIndex and
        // matchIndex for ourselves and each follower, then append the no op.
        // Otherwise we'll set our localServer's last agree index too high.
        configuration->forEach(&Server::beginLeadership);

        // Append a new entry so that commitment is not delayed indefinitely.
        // Otherwise, if the leader never gets anything to append, it will never
        // return to read-only operations (it can't prove that its committed index
        // is up-to-date).
        Log::Entry entry;
        entry.set_term(currentTerm);
        entry.set_type(Protocol::Raft::EntryType::NOOP);
        entry.set_cluster_time(clusterClock.leaderStamp());
        append({&entry});

        // Outstanding RequestVote RPCs are no longer needed.
        interruptAll();
    }
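The ordering that the becomeLeader() comment insists on (initialize per-follower progress first, append the no-op second) can be sketched as follows. Leader and PeerProgress are hypothetical types; the initial values nextIndex = last log index + 1 and matchIndex = 0 come from the Raft paper, and the sketch does not model LogCabin's local-server bookkeeping that the code comment is specifically about.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <vector>

// Hypothetical per-follower replication progress.
struct PeerProgress {
    uint64_t nextIndex = 1;
    uint64_t matchIndex = 0;
};

// Hypothetical leader state; logTerms[i] is the term of entry i + 1.
struct Leader {
    std::vector<uint64_t> logTerms;
    std::map<uint64_t, PeerProgress> peers;  // keyed by server id
    uint64_t currentTerm = 0;

    uint64_t lastLogIndex() const { return logTerms.size(); }

    void becomeLeader(const std::vector<uint64_t>& peerIds) {
        // 1. Progress is computed from the log as it was before the no-op.
        for (uint64_t id : peerIds) {
            PeerProgress p;
            p.nextIndex = lastLogIndex() + 1;
            p.matchIndex = 0;
            peers[id] = p;
        }
        // 2. Then append a no-op entry of the new term.
        logTerms.push_back(currentTerm);
    }
};
```

Because progress is computed before the no-op is appended, each follower's nextIndex points at the no-op itself, so it is the first entry the leader tries to send.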

    5. The leader asks each follower to append log entries

    void
    RaftConsensus::appendEntries(std::unique_lock<Mutex>& lockGuard,
                                 Peer& peer)
    {
        ……
        if (response.success()) {
            if (peer.matchIndex > prevLogIndex + numEntries) {
                // Revisit this warning if we pipeline AppendEntries RPCs for
                // performance.
                WARNING("matchIndex should monotonically increase within a "
                        "term, since servers don't forget entries. But it "
                        "didn't.");
            } else {
                peer.matchIndex = prevLogIndex + numEntries;
                advanceCommitIndex();
            }
            peer.nextIndex = peer.matchIndex + 1;
            peer.suppressBulkData = false;

            if (!peer.isCaughtUp_ &&
                peer.thisCatchUpIterationGoalId <= peer.matchIndex) {
                Clock::duration duration =
                    Clock::now() - peer.thisCatchUpIterationStart;
                uint64_t thisCatchUpIterationMs =
                    uint64_t(std::chrono::duration_cast<
                                 std::chrono::milliseconds>(duration).count());
                if (labs(int64_t(peer.lastCatchUpIterationMs -
                                 thisCatchUpIterationMs)) * 1000L * 1000L <
                    std::chrono::nanoseconds(ELECTION_TIMEOUT).count()) {
                    peer.isCaughtUp_ = true;
                    stateChanged.notify_all();
                } else {
                    peer.lastCatchUpIterationMs = thisCatchUpIterationMs;
                    peer.thisCatchUpIterationStart = Clock::now();
                    peer.thisCatchUpIterationGoalId = log->getLastLogIndex();
                }
            }
        } else {
            if (peer.nextIndex > 1)
                --peer.nextIndex;
            // A server that hasn't been around for a while might have a much
            // shorter log than ours. The AppendEntries reply contains the
            // index of its last log entry, and there's no reason for us to
            // set nextIndex to be more than 1 past that (that would leave a
            // gap, so it will always be rejected).
            if (response.has_last_log_index() &&
                peer.nextIndex > response.last_log_index() + 1) {
                peer.nextIndex = response.last_log_index() + 1;
            }
        }
        ……
    }
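The nextIndex/matchIndex rules in this excerpt can be isolated into one function. onAppendEntriesResponse() and PeerProgress are hypothetical names; the function reproduces only the index arithmetic shown above and leaves out advanceCommitIndex(), the WARNING, and the catch-up timing used for new servers.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical per-follower progress kept by the leader.
struct PeerProgress {
    uint64_t nextIndex;
    uint64_t matchIndex;
};

// prevLogIndex/numEntries describe the request that was sent;
// hasLastLogIndex/followerLastLogIndex describe the follower's reply.
void onAppendEntriesResponse(PeerProgress& peer, bool success,
                             uint64_t prevLogIndex, uint64_t numEntries,
                             bool hasLastLogIndex,
                             uint64_t followerLastLogIndex) {
    if (success) {
        // matchIndex must not move backwards within a term.
        if (peer.matchIndex <= prevLogIndex + numEntries)
            peer.matchIndex = prevLogIndex + numEntries;
        peer.nextIndex = peer.matchIndex + 1;
    } else {
        // Back off by one entry and retry...
        if (peer.nextIndex > 1)
            --peer.nextIndex;
        // ...but never past the end of the follower's log: a gap would
        // always be rejected.
        if (hasLastLogIndex && peer.nextIndex > followerLastLogIndex + 1)
            peer.nextIndex = followerLastLogIndex + 1;
    }
}
```

For a follower that rejoins with only 10 entries while the leader's nextIndex for it is 101, a single rejection carrying last_log_index = 10 moves nextIndex straight to 11 instead of walking back one entry per round trip.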

    6. A follower handles a log update

    void
    RaftConsensus::handleAppendEntries(
            const Protocol::Raft::AppendEntries::Request& request,
            Protocol::Raft::AppendEntries::Response& response)
    {
        ……
        // If the caller's term is stale, just return our term to it.
        if (request.term() < currentTerm) {
            VERBOSE("Caller(%lu) is stale. Our term is %lu, theirs is %lu",
                    request.server_id(), currentTerm, request.term());
            return; // response was set to a rejection above
        }
        ……
    }
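The excerpt shows only the first check a follower performs. The remaining receiver rules come from the Raft paper: the follower must already hold an entry at prevLogIndex whose term is prevLogTerm, and an existing entry that conflicts with a new one is deleted together with everything after it. FollowerLog is a hypothetical sketch of those rules (terms only, no payloads, no commitIndex handling), not LogCabin's handleAppendEntries().

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical follower log: terms[i] is the term of entry i + 1.
struct FollowerLog {
    uint64_t currentTerm = 0;
    std::vector<uint64_t> terms;

    // Returns true on success; receiver rules from the Raft paper.
    bool handleAppendEntries(uint64_t term, uint64_t prevLogIndex,
                             uint64_t prevLogTerm,
                             const std::vector<uint64_t>& entryTerms) {
        if (term < currentTerm)
            return false;  // stale leader: reject
        currentTerm = term;
        // Our log must contain prevLogIndex with a matching term.
        if (prevLogIndex > terms.size())
            return false;
        if (prevLogIndex > 0 && terms[prevLogIndex - 1] != prevLogTerm)
            return false;
        // Append new entries, truncating at the first conflict.
        for (std::size_t i = 0; i < entryTerms.size(); ++i) {
            std::size_t index = prevLogIndex + 1 + i;  // 1-based
            if (index <= terms.size()) {
                if (terms[index - 1] == entryTerms[i])
                    continue;             // already have this entry
                terms.resize(index - 1);  // drop the conflicting suffix
            }
            terms.push_back(entryTerms[i]);
        }
        return true;
    }
};
```

With a local log of terms {1, 1, 2}, a request with prevLogIndex = 2, prevLogTerm = 1 and entries of terms {3, 3} replaces the third entry and leaves {1, 1, 3, 3}.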

    7. Advancing the commit index

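    Once a majority of servers have stored an entry, advanceCommitIndex() raises commitIndex to that entry's index, meaning every entry up to that index is committed. The leader calls it from appendEntries() whenever a follower's matchIndex increases.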
    void
    RaftConsensus::appendEntries(std::unique_lock<Mutex>& lockGuard,
                                 Peer& peer)
    {
        ……
        advanceCommitIndex();
        ……
    }
    void
    RaftConsensus::advanceCommitIndex()
    {
        if (state != State::LEADER) {
            // getMatchIndex is undefined unless we're leader
            WARNING("advanceCommitIndex called as %s",
                    Core::StringUtil::toString(state).c_str());
            return;
        }

        // calculate the largest entry ID stored on a quorum of servers
        uint64_t newCommitIndex =
            configuration->quorumMin(&Server::getMatchIndex);
        if (commitIndex >= newCommitIndex)
            return;
        ……
    }
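quorumMin(&Server::getMatchIndex) yields the largest index that a majority of servers have stored. For a single, fixed configuration this is an order statistic of the match indexes (leader included), as sketched below; LogCabin's configuration also handles membership changes, which this sketch ignores. The extra term check in the hypothetical advanceCommitIndex() is the Raft paper's rule that a leader commits by counting replicas only for entries from its own term; the excerpt elides the rest of LogCabin's function, so it is not shown how it applies this rule.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <functional>
#include <vector>

// Hypothetical single-configuration quorumMin: the largest N such that a
// majority of servers have matchIndex >= N.
uint64_t quorumMin(std::vector<uint64_t> matchIndexes) {
    if (matchIndexes.empty())
        return 0;
    // Sorted descending, the value at position n/2 is held by at least
    // n/2 + 1 servers, i.e. a majority.
    std::sort(matchIndexes.begin(), matchIndexes.end(),
              std::greater<uint64_t>());
    return matchIndexes[matchIndexes.size() / 2];
}

// Hypothetical commit rule; logTerms[i] is the term of entry i + 1.
uint64_t advanceCommitIndex(uint64_t commitIndex,
                            const std::vector<uint64_t>& matchIndexes,
                            const std::vector<uint64_t>& logTerms,
                            uint64_t currentTerm) {
    uint64_t newCommitIndex = quorumMin(matchIndexes);
    if (commitIndex >= newCommitIndex)
        return commitIndex;
    // Only entries from the current term are committed by counting.
    if (logTerms[newCommitIndex - 1] != currentTerm)
        return commitIndex;
    return newCommitIndex;
}
```

With match indexes {7, 7, 2, 1, 1} in a five-server cluster, three servers hold index 2 or higher, so quorumMin returns 2.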

    8. Applying the log

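    A dedicated thread applies committed log entries to the state machine one at a time. It fetches each entry with RaftConsensus::getNextEntry(), which blocks until the next entry has been committed.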
    logcabin-master/Server/StateMachine.cc
    void
    StateMachine::applyThreadMain()
    {
        Core::ThreadId::setName("StateMachine");
        try {
            while (true) {
                RaftConsensus::Entry entry = consensus->getNextEntry(lastApplied);
                std::lock_guard<Core::Mutex> lockGuard(mutex);
                switch (entry.type) {
                    case RaftConsensus::Entry::SKIP:
                        break;
                    case RaftConsensus::Entry::DATA:
                        apply(entry);
                        break;
                    case RaftConsensus::Entry::SNAPSHOT:
                        NOTICE("Loading snapshot through entry %lu into state "
                               "machine", entry.index);
                        loadSnapshot(*entry.snapshotReader);
                        NOTICE("Done loading snapshot");
                        break;
                }
                expireSessions(entry.clusterTime);
                lastApplied = entry.index;
                entriesApplied.notify_all();
                if (shouldTakeSnapshot(lastApplied) &&
                    maySnapshotAt <= Clock::now()) {
                    snapshotSuggested.notify_all();
                }
            }
        } catch (const Core::Util::ThreadInterruptedException&) {
            NOTICE("exiting");
            std::lock_guard<Core::Mutex> lockGuard(mutex);
            exiting = true;
            entriesApplied.notify_all();
            snapshotSuggested.notify_all();
            snapshotStarted.notify_all();
            snapshotCompleted.notify_all();
            killSnapshotProcess(Core::HoldingMutex(lockGuard), SIGTERM);
        }
    }

    RaftConsensus::Entry
    RaftConsensus::getNextEntry(uint64_t lastIndex) const
    {
        std::unique_lock<Mutex> lockGuard(mutex);
        uint64_t nextIndex = lastIndex + 1;
        while (true) {
            if (exiting)
                throw Core::Util::ThreadInterruptedException();
            if (commitIndex >= nextIndex) {
                RaftConsensus::Entry entry;

                // Make the state machine load a snapshot if we don't have the next
                // entry it needs in the log.
                if (log->getLogStartIndex() > nextIndex) {
                    entry.type = Entry::SNAPSHOT;
                    // For well-behaved state machines, we expect 'snapshotReader'
                    // to contain a SnapshotFile::Reader that we can return
                    // directly to the state machine. In the case that a State
                    // Machine asks for the snapshot again, we have to build a new
                    // SnapshotFile::Reader again.
                    entry.snapshotReader = std::move(snapshotReader);
                    if (!entry.snapshotReader) {
                        WARNING("State machine asked for same snapshot twice; "
                                "this shouldn't happen in normal operation. "
                                "Having to re-read it from disk.");
                        // readSnapshot() shouldn't have any side effects since the
                        // snapshot should have already been read, so const_cast
                        // should be ok (though ugly).
                        const_cast<RaftConsensus*>(this)->readSnapshot();
                        entry.snapshotReader = std::move(snapshotReader);
                    }
                    entry.index = lastSnapshotIndex;
                    entry.clusterTime = lastSnapshotClusterTime;
                } else {
                    // not a snapshot
                    const Log::Entry& logEntry = log->getEntry(nextIndex);
                    entry.index = nextIndex;
                    if (logEntry.type() == Protocol::Raft::EntryType::DATA) {
                        entry.type = Entry::DATA;
                        const std::string& s = logEntry.data();
                        entry.command = Core::Buffer(
                            memcpy(new char[s.length()], s.data(), s.length()),
                            s.length(),
                            Core::Buffer::deleteArrayFn<char>);
                    } else {
                        entry.type = Entry::SKIP;
                    }
                    entry.clusterTime = logEntry.cluster_time();
                }
                return entry;
            }
            stateChanged.wait(lockGuard);
        }
    }
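getNextEntry() waits on stateChanged until commitIndex reaches the entry the state machine needs, and the apply thread loops on it. CommitQueue is a hypothetical, much-reduced sketch of that hand-off using std::condition_variable: it has no snapshots and no cluster time, and it returns false on shutdown where LogCabin throws ThreadInterruptedException.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical model of the consensus/state-machine hand-off.
class CommitQueue {
public:
    enum class Type { SKIP, DATA };
    struct Entry {
        uint64_t index;
        Type type;
        std::string command;
    };

    // Consensus side: append an entry to the in-memory log.
    void append(Type type, std::string command) {
        std::lock_guard<std::mutex> lock(mutex_);
        log_.push_back(Entry{static_cast<uint64_t>(log_.size() + 1), type,
                             std::move(command)});
    }

    // Consensus side: raise commitIndex (never past the log) and wake waiters.
    void commit(uint64_t index) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (index > commitIndex_ && index <= log_.size())
            commitIndex_ = index;
        changed_.notify_all();
    }

    void shutdown() {
        std::lock_guard<std::mutex> lock(mutex_);
        exiting_ = true;
        changed_.notify_all();
    }

    // State-machine side: block until entry lastApplied + 1 is committed.
    // Returns false once shutdown() has been called.
    bool getNextEntry(uint64_t lastApplied, Entry& out) {
        std::unique_lock<std::mutex> lock(mutex_);
        uint64_t next = lastApplied + 1;
        changed_.wait(lock, [&] { return exiting_ || commitIndex_ >= next; });
        if (exiting_)
            return false;
        out = log_[next - 1];
        return true;
    }

private:
    std::mutex mutex_;
    std::condition_variable changed_;
    std::vector<Entry> log_;
    uint64_t commitIndex_ = 0;
    bool exiting_ = false;
};
```

An apply thread would loop the way applyThreadMain() does: call getNextEntry(lastApplied, e), apply e if it is DATA and skip it otherwise, then set lastApplied = e.index, until getNextEntry() returns false.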

  • Original article: https://www.cnblogs.com/tsecer/p/10684523.html