9. SOFAJRaft Source Code Analysis: How Does a Follower Quickly Catch Up with the Leader's Log via Snapshot?

    Preface

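    The snapshot mechanism was introduced mainly to solve two problems:

    1. How a node newly added to a JRaft group can quickly catch up with the latest data
    2. How a Raft node that restarts after a failure can efficiently recover to the latest data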

    Snapshot Source Code Analysis

    Generating a Snapshot File on a Raft Node

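    To enable SOFAJRaft's snapshot mechanism, set the "snapshotUri" property of the NodeOptions configuration class in your client code (this is the directory in which snapshot files are stored). Once this property is set, a timer task named "JRaft-SnapshotTimer" is started by default and takes snapshots automatically. The interval is set by the "snapshotIntervalSecs" property of NodeOptions and defaults to 3600 seconds. The timer is started as follows: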

    NodeImpl#init

    this.snapshotTimer = new RepeatedTimer("JRaft-SnapshotTimer", this.options.getSnapshotIntervalSecs() * 1000) {
    
        @Override
        protected void onTrigger() {
            handleSnapshotTimeout();
        }
    };
    
    private void handleSnapshotTimeout() {
        this.writeLock.lock();
        try {
            if (!this.state.isActive()) {
                return;
            }
        } finally {
            this.writeLock.unlock();
        }
        // do_snapshot in another thread to avoid blocking the timer thread.
        // i.e. doSnapshot is invoked asynchronously
        Utils.runInThread(() -> doSnapshot(null));
    }
    
    private void doSnapshot(final Closure done) {
        if (this.snapshotExecutor != null) {
            this.snapshotExecutor.doSnapshot(done);
        } else {
            if (done != null) {
                final Status status = new Status(RaftError.EINVAL, "Snapshot is not supported");
                Utils.runClosureInThread(done, status);
            }
        }
    }
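handleSnapshotTimeout only checks the node state under the write lock and then hands the real work to another thread, so a slow snapshot never blocks the timer thread. The same pattern can be sketched with plain JDK executors. This is a minimal sketch, assuming a ScheduledExecutorService in place of JRaft's RepeatedTimer; the class SnapshotTimerSketch and its methods are hypothetical, and its doSnapshot merely counts invocations:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch (not JRaft's RepeatedTimer): a periodic trigger that checks the
// node state under a lock and hands the actual snapshot work to a separate worker thread.
class SnapshotTimerSketch {
    private final ReentrantLock writeLock = new ReentrantLock();
    private boolean active = true; // guarded by writeLock
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    final AtomicInteger snapshotsTaken = new AtomicInteger();

    // intervalMs plays the role of snapshotIntervalSecs * 1000.
    void start(long intervalMs) {
        timer.scheduleAtFixedRate(this::handleSnapshotTimeout, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    void handleSnapshotTimeout() {
        writeLock.lock();
        try {
            if (!active) {
                return; // node no longer active: skip this round
            }
        } finally {
            writeLock.unlock();
        }
        // Offload the potentially slow snapshot so the timer thread is never blocked.
        worker.execute(this::doSnapshot);
    }

    private void doSnapshot() {
        snapshotsTaken.incrementAndGet(); // placeholder for the real snapshot work
    }

    void deactivate() {
        writeLock.lock();
        try {
            active = false;
        } finally {
            writeLock.unlock();
        }
    }

    // Stops the timer and waits (up to 1 s) for already-queued snapshot work to finish.
    void shutdown() {
        timer.shutdownNow();
        worker.shutdown();
        try {
            worker.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

As in handleSnapshotTimeout, the lock protects only the state check; the snapshot itself runs on the worker thread without holding it.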
    

    Ultimately, this delegates to the snapshot executor's doSnapshot method, which we look at next.

    SnapshotExecutorImpl#doSnapshot

    public void doSnapshot(final Closure done) {
        boolean doUnlock = true;
        this.lock.lock();
        try {
            // The executor is stopping
            if (this.stopped) {
                Utils.runClosureInThread(done, new Status(RaftError.EPERM, "Is stopped."));
                return;
            }
            // A snapshot is currently being downloaded (installed from the Leader)
            if (this.downloadingSnapshot.get() != null) {
                Utils.runClosureInThread(done, new Status(RaftError.EBUSY, "Is loading another snapshot."));
                return;
            }
            // Another snapshot is currently being saved
            if (this.savingSnapshot) {
                Utils.runClosureInThread(done, new Status(RaftError.EBUSY, "Is saving another snapshot."));
                return;
            }
            // Compare the index last applied by the state machine with the last log index in the latest snapshot.
            // If they are equal, no new data has been applied, so another snapshot would be pointless
            if (this.fsmCaller.getLastAppliedIndex() == this.lastSnapshotIndex) {
                // There might be false positive as the getLastAppliedIndex() is being
                // updated. But it's fine since we will do next snapshot saving in a
                // predictable time.
                doUnlock = false;
    
                this.lock.unlock();
                this.logManager.clearBufferedLogs();
                Utils.runClosureInThread(done);
                return;
            }
            // Create a snapshot writer used to write the data
            final SnapshotWriter writer = this.snapshotStorage.create();
            if (writer == null) {
                Utils.runClosureInThread(done, new Status(RaftError.EIO, "Fail to create writer."));
                reportError(RaftError.EIO.getNumber(), "Fail to create snapshot writer.");
                return;
            }
            this.savingSnapshot = true;
            // Wraps the callback and the snapshot writer
            final SaveSnapshotDone saveSnapshotDone = new SaveSnapshotDone(writer, done, null);
            // Hand off to the state machine caller to save the snapshot
            if (!this.fsmCaller.onSnapshotSave(saveSnapshotDone)) {
                Utils.runClosureInThread(done, new Status(RaftError.EHOSTDOWN, "The raft node is down."));
                return;
            }
            this.runningJobs.incrementAndGet();
        } finally {
            if (doUnlock) {
                this.lock.unlock();
            }
        }
    }
    

    doSnapshot first performs several checks and then calls the state machine caller's onSnapshotSave method to save the snapshot.
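The order of these checks matters: a stopped executor, a snapshot download in progress, or another save in progress all reject the request, and a save is skipped when nothing has been applied since the last snapshot. A minimal sketch of that decision as a pure function; the class, its Decision enum, and the method are hypothetical, and the error names in the comments are the ones used in the code above:

```java
// Hypothetical sketch of the pre-checks in SnapshotExecutorImpl#doSnapshot,
// expressed as a pure decision function. Not part of the JRaft API.
class SnapshotGuardSketch {
    enum Decision { REJECT_STOPPED, REJECT_DOWNLOADING, REJECT_SAVING, SKIP_NO_NEW_DATA, SAVE }

    static Decision decide(boolean stopped, boolean downloading, boolean saving,
                           long lastAppliedIndex, long lastSnapshotIndex) {
        if (stopped) {
            return Decision.REJECT_STOPPED;     // EPERM "Is stopped."
        }
        if (downloading) {
            return Decision.REJECT_DOWNLOADING; // EBUSY: installing a snapshot from the Leader
        }
        if (saving) {
            return Decision.REJECT_SAVING;      // EBUSY: another save is in progress
        }
        if (lastAppliedIndex == lastSnapshotIndex) {
            return Decision.SKIP_NO_NEW_DATA;   // nothing applied since the last snapshot
        }
        return Decision.SAVE;
    }
}
```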

    FSMCallerImpl#onSnapshotSave

    public boolean onSnapshotSave(final SaveSnapshotClosure done) {
        // Publish an event to be handled by ApplyTaskHandler
        return enqueueTask((task, sequence) -> {
            task.type = TaskType.SNAPSHOT_SAVE;
            task.done = done;
        });
    }
    

    FSMCallerImpl#onSnapshotSave publishes the event to a Disruptor queue, where it is handled by ApplyTaskHandler.
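Routing snapshot saves and loads through one queue with a single consumer, together with other state-machine events such as committed log entries, means they are handled strictly in order and never concurrently with each other. A minimal sketch of that idea, assuming a java.util.concurrent BlockingQueue and one thread in place of the Disruptor ring buffer and ApplyTaskHandler; TaskQueueSketch and the labels it records are hypothetical:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch: a bounded queue drained by one consumer thread stands in for
// the Disruptor ring buffer and ApplyTaskHandler used by FSMCallerImpl.
class TaskQueueSketch {
    enum TaskType { SNAPSHOT_SAVE, SNAPSHOT_LOAD, SHUTDOWN }

    private final BlockingQueue<TaskType> queue = new ArrayBlockingQueue<>(16);
    final List<String> handled = Collections.synchronizedList(new ArrayList<>());
    private final Thread consumer = new Thread(this::runLoop, "apply-task-handler");

    void start() {
        consumer.start();
    }

    // In this sketch, producers get false instead of blocking when the queue is full.
    boolean enqueue(TaskType type) {
        return queue.offer(type);
    }

    // Single consumer: tasks are handled strictly in submission order.
    private void runLoop() {
        try {
            while (true) {
                TaskType type = queue.take();
                switch (type) {
                    case SNAPSHOT_SAVE:
                        handled.add("doSnapshotSave");
                        break;
                    case SNAPSHOT_LOAD:
                        handled.add("doSnapshotLoad");
                        break;
                    case SHUTDOWN:
                        return;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    void stopAndJoin() {
        try {
            queue.put(TaskType.SHUTDOWN);
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```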

    Eventually, the SNAPSHOT_SAVE task is dispatched to doSnapshotSave:

    private void doSnapshotSave(final SaveSnapshotClosure done) {
        Requires.requireNonNull(done, "SaveSnapshotClosure is null");
        // Put the last applied index and term into metaBuilder
        final long lastAppliedIndex = this.lastAppliedIndex.get();
        final RaftOutter.SnapshotMeta.Builder metaBuilder = RaftOutter.SnapshotMeta.newBuilder() //
            .setLastIncludedIndex(lastAppliedIndex) //
            .setLastIncludedTerm(this.lastAppliedTerm);
        // Put the current configuration into metaBuilder
        final ConfigurationEntry confEntry = this.logManager.getConfiguration(lastAppliedIndex);
        if (confEntry == null || confEntry.isEmpty()) {
            LOG.error("Empty conf entry for lastAppliedIndex={}", lastAppliedIndex);
            Utils.runClosureInThread(done, new Status(RaftError.EINVAL, "Empty conf entry for lastAppliedIndex=%s",
                lastAppliedIndex));
            return;
        }
        for (final PeerId peer : confEntry.getConf()) {
            metaBuilder.addPeers(peer.toString());
        }
        if (confEntry.getOldConf() != null) {
            for (final PeerId peer : confEntry.getOldConf()) {
                metaBuilder.addOldPeers(peer.toString());
            }
        }
        // Pass the metadata to done, which returns the snapshot writer
        final SnapshotWriter writer = done.start(metaBuilder.build());
        if (writer == null) {
            done.run(new Status(RaftError.EINVAL, "snapshot_storage create SnapshotWriter failed"));
            return;
        }
        // Let the user's state machine generate the snapshot
        this.fsm.onSnapshotSave(writer, done);
    }
    

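    doSnapshotSave puts the last applied index and term, plus the current configuration (the peers and, if present, the old peers), into metaBuilder, then calls the onSnapshotSave method of the user's state machine. To see how this is used, take the official Counter example: https://www.sofastack.tech/projects/sofa-jraft/counter-example/ .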

    CounterStateMachine#onSnapshotSave

    public void onSnapshotSave(final SnapshotWriter writer, final Closure done) {
        final long currVal = this.value.get();
        // Write the data to disk asynchronously
        Utils.runInThread(() -> {
            final CounterSnapshotFile snapshot = new CounterSnapshotFile(writer.getPath() + File.separator + "data");
            if (snapshot.save(currVal)) {
                if (writer.addFile("data")) {
                    done.run(Status.OK());
                } else {
                    done.run(new Status(RaftError.EIO, "Fail to add file to writer"));
                }
            } else {
                done.run(new Status(RaftError.EIO, "Fail to save counter snapshot %s", snapshot.getPath()));
            }
        });
    }
    

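    This method reads the current value, writes it to a file, registers the file with the writer via addFile, and once the snapshot file is saved calls the passed-in closure's run(status) to tell the caller whether the save succeeded or failed.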
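The Counter example stores just one long per snapshot. A minimal sketch of a helper in the spirit of CounterSnapshotFile, assuming java.nio.file rather than whatever the example uses internally; the class name LongSnapshotFileSketch is hypothetical, and the write-to-temp-then-rename step is an extra safety measure not shown in the example. In the example itself the path would be writer.getPath() + File.separator + "data", followed by writer.addFile("data"):

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical stand-in for the Counter example's CounterSnapshotFile:
// persists a single long as text in one file (e.g. <snapshot dir>/data).
class LongSnapshotFileSketch {
    private final Path path;

    LongSnapshotFileSketch(Path path) {
        this.path = path;
    }

    // Writes to a sibling temp file and then renames it over the target, so a crash
    // mid-write never leaves a truncated data file (on POSIX file systems the atomic
    // rename replaces an existing target).
    boolean save(long value) {
        Path tmp = path.resolveSibling(path.getFileName() + ".tmp");
        try {
            Files.write(tmp, Long.toString(value).getBytes(StandardCharsets.UTF_8));
            Files.move(tmp, path, StandardCopyOption.ATOMIC_MOVE);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    long load() throws IOException {
        String text = new String(Files.readAllBytes(path), StandardCharsets.UTF_8).trim();
        return Long.parseLong(text);
    }
}
```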

    Since the closure passed in here is a SaveSnapshotDone instance, SaveSnapshotDone#run is called:

    SaveSnapshotDone

    public void run(final Status status) {
        Utils.runInThread(() -> continueRun(status));
    }
    
    void continueRun(final Status st) {
        // Validate the index, record the snapshot's index and term, and mark the save as finished
        final int ret = onSnapshotSaveDone(st, this.meta, this.writer);
        if (ret != 0 && st.isOk()) {
            st.setError(ret, "node call onSnapshotSaveDone failed");
        }
        if (this.done != null) {
            Utils.runClosureInThread(this.done, st);
        }
    }
    

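    run calls continueRun asynchronously, which in turn calls onSnapshotSaveDone. That method validates the index, records the snapshot's index and term, and marks the snapshot save as complete.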
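Most of what onSnapshotSaveDone does is bookkeeping. A minimal sketch, modeled loosely on SnapshotExecutorImpl; the class, its fields, the integer result codes, and the rule that a snapshot not newer than the last one is discarded as stale are assumptions of this sketch rather than a transcript of the JRaft source:

```java
// Hypothetical sketch, modeled loosely on SnapshotExecutorImpl#onSnapshotSaveDone:
// the bookkeeping performed once the state machine reports that a save finished.
class SnapshotSaveBookkeeping {
    static final int OK = 0;
    static final int FAILED = -1; // illustrative codes, not JRaft's RaftError numbers
    static final int STALE = -2;

    long lastSnapshotIndex;
    long lastSnapshotTerm;
    boolean savingSnapshot = true;

    int onSnapshotSaveDone(boolean statusOk, long metaIndex, long metaTerm) {
        int ret = statusOk ? OK : FAILED;
        // A snapshot that does not go beyond the current one (e.g. because a newer
        // snapshot was installed from the Leader meanwhile) is discarded.
        if (ret == OK && metaIndex <= lastSnapshotIndex) {
            ret = STALE;
        }
        if (ret == OK) {
            lastSnapshotIndex = metaIndex;
            lastSnapshotTerm = metaTerm;
            // A real implementation would now let the log manager drop entries covered by the snapshot.
        }
        savingSnapshot = false; // the next doSnapshot may run, whatever the outcome
        return ret;
    }
}
```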

    Installing a Snapshot

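    When sending log entries to a Follower, JRaft checks whether it needs to send a snapshot instead, so that the Follower can quickly catch up with the Leader's log without replaying very old entries. This both reduces network traffic and makes log replication more efficient.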

    Replicator#sendEntries

    private boolean sendEntries(final long nextSendingIndex) {
        final AppendEntriesRequest.Builder rb = AppendEntriesRequest.newBuilder();
        // Fill in the common request fields (term, group, prevLogIndex/prevLogTerm, ...) into rb
        if (!fillCommonFields(rb, nextSendingIndex - 1, false)) {
            // unlock id in installSnapshot
            installSnapshot();
            return false;
        }
        // ... (omitted)
    }
    

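    fillCommonFields returns false when the term of the entry at nextSendingIndex - 1 can no longer be found in the Leader's log, i.e. that entry has already been compacted away after a snapshot. In that case installSnapshot is called to send an RPC to the Follower.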
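The decision reduces to one question: does the Leader still know the term of the entry just before nextSendingIndex? A minimal sketch, simplified from the prevLogTerm lookup described above; ReplicationChoiceSketch and its fields are hypothetical, and the sketch assumes the Leader still knows the term of the last index covered by its snapshot:

```java
// Hypothetical sketch of the Leader-side choice between AppendEntries and InstallSnapshot,
// simplified from the prevLogTerm lookup in Replicator#fillCommonFields.
class ReplicationChoiceSketch {
    enum Action { APPEND_ENTRIES, INSTALL_SNAPSHOT }

    private final long firstLogIndex;     // oldest entry still kept in the Leader's log
    private final long lastLogIndex;      // newest entry in the Leader's log
    private final long snapshotLastIndex; // last index covered by the Leader's latest snapshot

    ReplicationChoiceSketch(long firstLogIndex, long lastLogIndex, long snapshotLastIndex) {
        this.firstLogIndex = firstLogIndex;
        this.lastLogIndex = lastLogIndex;
        this.snapshotLastIndex = snapshotLastIndex;
    }

    // The term of an index is known if it is 0 (empty-log sentinel), the snapshot
    // boundary, or an entry that has not been compacted away yet.
    private boolean termKnown(long index) {
        return index == 0
            || index == snapshotLastIndex
            || (index >= firstLogIndex && index <= lastLogIndex);
    }

    Action choose(long nextSendingIndex) {
        long prevLogIndex = nextSendingIndex - 1;
        return termKnown(prevLogIndex) ? Action.APPEND_ENTRIES : Action.INSTALL_SNAPSHOT;
    }
}
```

For example, a Leader that compacted entries 1..100 into a snapshot and still keeps 101..150 can keep sending entries to a Follower whose nextIndex is 101 or later, but must install the snapshot on a Follower whose nextIndex is 50.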

    Replicator#installSnapshot

    void installSnapshot() {
        // A snapshot is already being installed
        if (this.state == State.Snapshot) {
            LOG.warn("Replicator {} is installing snapshot, ignore the new request.", this.options.getPeerId());
            this.id.unlock();
            return;
        }
        boolean doUnlock = true;
        try {
            Requires.requireTrue(this.reader == null,
                "Replicator %s already has a snapshot reader, current state is %s", this.options.getPeerId(),
                this.state);
            // Open a SnapshotReader
            this.reader = this.options.getSnapshotStorage().open();
            // If snapshot storage is not enabled, report an error and return
            if (this.reader == null) {
                final NodeImpl node = this.options.getNode();
                final RaftException error = new RaftException(EnumOutter.ErrorType.ERROR_TYPE_SNAPSHOT);
                error.setStatus(new Status(RaftError.EIO, "Fail to open snapshot"));
                this.id.unlock();
                doUnlock = false;
                node.onError(error);
                return;
            }
            // Generate a URI from which other nodes can copy the snapshot
            final String uri = this.reader.generateURIForCopy();
            if (uri == null) {
                final NodeImpl node = this.options.getNode();
                final RaftException error = new RaftException(EnumOutter.ErrorType.ERROR_TYPE_SNAPSHOT);
                error.setStatus(new Status(RaftError.EIO, "Fail to generate uri for snapshot reader"));
                releaseReader();
                this.id.unlock();
                doUnlock = false;
                node.onError(error);
                return;
            }
            // Load the snapshot metadata from file
            final RaftOutter.SnapshotMeta meta = this.reader.load();
            if (meta == null) {
                final String snapshotPath = this.reader.getPath();
                final NodeImpl node = this.options.getNode();
                final RaftException error = new RaftException(EnumOutter.ErrorType.ERROR_TYPE_SNAPSHOT);
                error.setStatus(new Status(RaftError.EIO, "Fail to load meta from %s", snapshotPath));
                releaseReader();
                this.id.unlock();
                doUnlock = false;
                node.onError(error);
                return;
            }
            // Build the request
            final InstallSnapshotRequest.Builder rb = InstallSnapshotRequest.newBuilder();
            rb.setTerm(this.options.getTerm());
            rb.setGroupId(this.options.getGroupId());
            rb.setServerId(this.options.getServerId().toString());
            rb.setPeerId(this.options.getPeerId().toString());
            rb.setMeta(meta);
            rb.setUri(uri);
    
            this.statInfo.runningState = RunningState.INSTALLING_SNAPSHOT;
            this.statInfo.lastLogIncluded = meta.getLastIncludedIndex();
            this.statInfo.lastTermIncluded = meta.getLastIncludedTerm();
    
            final InstallSnapshotRequest request = rb.build();
            this.state = State.Snapshot;
            // noinspection NonAtomicOperationOnVolatileField
            this.installSnapshotCounter++;
            final long monotonicSendTimeMs = Utils.monotonicMs();
            final int stateVersion = this.version;
            final int seq = getAndIncrementReqSeq();
            // Send the InstallSnapshotRequest
            final Future<Message> rpcFuture = this.rpcService.installSnapshot(this.options.getPeerId().getEndpoint(),
                request, new RpcResponseClosureAdapter<InstallSnapshotResponse>() {
    
                    @Override
                    public void run(final Status status) {
                        onRpcReturned(Replicator.this.id, RequestType.Snapshot, status, request, getResponse(), seq,
                            stateVersion, monotonicSendTimeMs);
                    }
                });
            addInflight(RequestType.Snapshot, this.nextIndex, 0, 0, seq, rpcFuture);
        } finally {
            if (doUnlock) {
                this.id.unlock();
            }
        }
    }
    

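    Before the InstallSnapshotRequest is sent, several checks are performed:

    1. Whether the user set the "snapshotUri" property of NodeOptions; if not, snapshots are disabled and the returned reader is null
    2. Whether a URI for fetching the snapshot can be generated
    3. Whether the snapshot metadata can be loaded from file

    If all of these checks pass, an InstallSnapshotRequest is sent to the Follower, where it is handled by InstallSnapshotRequestProcessor and finally dispatched to NodeImpl#handleInstallSnapshot, which contains the actual logic.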
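The three checks and the request they guard can be sketched as follows. This is a minimal sketch, assuming a hypothetical SnapshotSource in place of SnapshotReader (a null source means no snapshotUri is configured) and a plain Result object in place of the protobuf InstallSnapshotRequest; the error messages are modeled on the ones above:

```java
// Hypothetical sketch of the three pre-checks in Replicator#installSnapshot.
// SnapshotSource stands in for JRaft's SnapshotReader; all names here are illustrative.
class InstallSnapshotPrecheckSketch {
    static final class SnapshotSource {
        final String copyUri; // null if no URI for copying can be generated
        final long[] meta;    // {lastIncludedIndex, lastIncludedTerm}, or null if unreadable

        SnapshotSource(String copyUri, long[] meta) {
            this.copyUri = copyUri;
            this.meta = meta;
        }
    }

    static final class Result {
        final String error; // null when the request can be sent
        final long term;
        final String uri;
        final long lastIncludedIndex;
        final long lastIncludedTerm;

        Result(String error, long term, String uri, long lastIncludedIndex, long lastIncludedTerm) {
            this.error = error;
            this.term = term;
            this.uri = uri;
            this.lastIncludedIndex = lastIncludedIndex;
            this.lastIncludedTerm = lastIncludedTerm;
        }
    }

    // `source` is null when no snapshot storage is configured (no snapshotUri).
    static Result prepare(SnapshotSource source, long leaderTerm) {
        if (source == null) {
            return new Result("Fail to open snapshot", 0, null, 0, 0);
        }
        if (source.copyUri == null) {
            return new Result("Fail to generate uri for snapshot reader", 0, null, 0, 0);
        }
        if (source.meta == null) {
            return new Result("Fail to load meta", 0, null, 0, 0);
        }
        // All checks passed: the request carries the Leader's term, the copy URI and the meta.
        return new Result(null, leaderTerm, source.copyUri, source.meta[0], source.meta[1]);
    }
}
```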

    NodeImpl#handleInstallSnapshot

    public Message handleInstallSnapshot(final InstallSnapshotRequest request, final RpcRequestClosure done) {
        // If there is no snapshot executor, return an error response: snapshots are not supported
        if (this.snapshotExecutor == null) {
            return RpcResponseFactory.newResponse(RaftError.EINVAL, "Not supported snapshot");
        }
        // Parse the serverId carried by the request into a PeerId
        final PeerId serverId = new PeerId();
        if (!serverId.parse(request.getServerId())) {
            LOG.warn("Node {} ignore InstallSnapshotRequest from {} bad server id.", getNodeId(),
             request.getServerId());
            return RpcResponseFactory.newResponse(RaftError.EINVAL, "Parse serverId failed: %s", request.getServerId());
        }
    
        this.writeLock.lock();
        try {
            // Check the current node's state
            if (!this.state.isActive()) {
                LOG.warn("Node {} ignore InstallSnapshotRequest as it is not in active state {}.", getNodeId(),
                        this.state);
                return RpcResponseFactory.newResponse(RaftError.EINVAL, "Node %s:%s is not in active state, state %s.",
                        this.groupId, this.serverId, this.state.name());
            }
            // Compare the term in the request with this node's term to check that the request is not stale
            if (request.getTerm() < this.currTerm) {
                LOG.warn("Node {} ignore stale InstallSnapshotRequest from {}, term={}, currTerm={}.", getNodeId(),
                        request.getPeerId(), request.getTerm(), this.currTerm);
                return InstallSnapshotResponse.newBuilder() //
                        .setTerm(this.currTerm) //
                        .setSuccess(false) //
                        .build();
            }
            // Check the sender's leadership (may step down and update leaderId)
            checkStepDown(request.getTerm(), serverId);
    
            if (!serverId.equals(this.leaderId)) {
                LOG.error("Another peer {} declares that it is the leader at term {} which was occupied by leader {}.",
                        serverId, this.currTerm, this.leaderId);
                // Increase the term by 1 and make both leaders step down to minimize the
                // loss of split brain
                stepDown(request.getTerm() + 1, false, new Status(RaftError.ELEADERCONFLICT,
                        "More than one leader in the same term."));
                return InstallSnapshotResponse.newBuilder() //
                        .setTerm(request.getTerm() + 1) //
                        .setSuccess(false) //
                        .build();
            }
    
        } finally {
            this.writeLock.unlock();
        }
        final long startMs = Utils.monotonicMs();
        try {
            if (LOG.isInfoEnabled()) {
                LOG.info(
                        "Node {} received InstallSnapshotRequest from {}, lastIncludedLogIndex={}, " +
                         "lastIncludedLogTerm={}, lastLogId={}.",
                        getNodeId(), request.getServerId(), request.getMeta().getLastIncludedIndex(), request.getMeta()
                                .getLastIncludedTerm(), this.logManager.getLastLogId(false));
            }
            // Install the snapshot
            this.snapshotExecutor.installSnapshot(request, InstallSnapshotResponse.newBuilder(), done);
            return null;
        } finally {
            this.metrics.recordLatency("install-snapshot", Utils.monotonicMs() - startMs);
        }
    }
    

    After this series of checks, the method calls the snapshot executor's installSnapshot to install the snapshot.
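The core of these checks is standard Raft term handling: reject a sender with an older term, adopt a newer term, and treat two different leaders in the same term as a conflict. A minimal sketch, assuming string peer IDs and ignoring node state and locking; InstallSnapshotTermCheckSketch and its Reply type are hypothetical:

```java
// Hypothetical sketch of the term and leader checks a Follower performs in
// NodeImpl#handleInstallSnapshot before it starts downloading a snapshot.
class InstallSnapshotTermCheckSketch {
    static final class Reply {
        final boolean accepted;
        final long term;

        Reply(boolean accepted, long term) {
            this.accepted = accepted;
            this.term = term;
        }
    }

    long currTerm;
    String leaderId; // null when no leader is known in currTerm

    InstallSnapshotTermCheckSketch(long currTerm, String leaderId) {
        this.currTerm = currTerm;
        this.leaderId = leaderId;
    }

    Reply check(long requestTerm, String serverId) {
        if (requestTerm < currTerm) {
            return new Reply(false, currTerm); // stale sender: reply with our newer term
        }
        if (requestTerm > currTerm) {
            currTerm = requestTerm; // step down to the higher term
            leaderId = null;        // leader of the new term not yet known
        }
        if (leaderId == null) {
            leaderId = serverId;    // follow the sender
        }
        if (!serverId.equals(leaderId)) {
            // Two peers claim leadership in the same term: bump the term so both step down.
            currTerm = requestTerm + 1;
            leaderId = null;
            return new Reply(false, currTerm);
        }
        return new Reply(true, currTerm);
    }
}
```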

    SnapshotExecutorImpl#installSnapshot

    public void installSnapshot(final InstallSnapshotRequest request, final InstallSnapshotResponse.Builder response,
                                final RpcRequestClosure done) {
        final SnapshotMeta meta = request.getMeta();
        // Create a task object for downloading the snapshot
        final DownloadingSnapshot ds = new DownloadingSnapshot(request, response, done);
        //DON'T access request, response, and done after this point
        //as the retry snapshot will replace this one.
        // Register the download task
        if (!registerDownloadingSnapshot(ds)) {
            LOG.warn("Fail to register downloading snapshot");
            // This RPC will be responded by the previous session
            return;
        }
        Requires.requireNonNull(this.curCopier, "curCopier");
        try {
            // Block until the copy job finishes
            this.curCopier.join();
        } catch (final InterruptedException e) {
            // Restore the interrupt flag: an interruption means a newer snapshot is being received and this older download was stopped
            Thread.currentThread().interrupt();
            LOG.warn("Install snapshot copy job was canceled.");
            return;
        }
        // Load the downloaded snapshot files
        loadDownloadingSnapshot(ds, meta);
    }
    

    This method registers the download via registerDownloadingSnapshot, blocks in join until the download finishes, and then calls loadDownloadingSnapshot to load the downloaded files.
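The important property here is that a newer InstallSnapshot request can supersede an older one, and the older request must then not load its files. A minimal sketch of that rule with an AtomicReference; the class, the Download type, and the simplified "strictly newer index wins" registration rule are assumptions, since the real registerDownloadingSnapshot handles more cases:

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: only the most recently registered download may be loaded into the
// state machine; an older download that finishes later is dropped, as in the
// "ds != this.downloadingSnapshot.get()" check of loadDownloadingSnapshot.
class DownloadSupersedeSketch {
    static final class Download {
        final long lastIncludedIndex;

        Download(long lastIncludedIndex) {
            this.lastIncludedIndex = lastIncludedIndex;
        }
    }

    final AtomicReference<Download> downloading = new AtomicReference<>();

    // Simplified rule (an assumption, not JRaft's full logic): a request replaces the
    // current download only if its snapshot is strictly newer.
    boolean register(Download ds) {
        while (true) {
            Download cur = downloading.get();
            if (cur != null && cur.lastIncludedIndex >= ds.lastIncludedIndex) {
                return false;
            }
            if (downloading.compareAndSet(cur, ds)) {
                return true;
            }
        }
    }

    // True if ds is still the current download; it is then cleared and may be loaded.
    boolean tryLoad(Download ds) {
        return downloading.compareAndSet(ds, null);
    }
}
```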

    SnapshotExecutorImpl#loadDownloadingSnapshot

    void loadDownloadingSnapshot(final DownloadingSnapshot ds, final SnapshotMeta meta) {
        SnapshotReader reader;
        this.lock.lock();
        try {
            // If ds is no longer the current download task, a newer snapshot is being received
            if (ds != this.downloadingSnapshot.get()) {
                //It is interrupted and response by other request,just return
                return;
            }
            Requires.requireNonNull(this.curCopier, "curCopier");
            reader = this.curCopier.getReader();
            // Check whether the copier finished successfully
            if (!this.curCopier.isOk()) {
                if (this.curCopier.getCode() == RaftError.EIO.getNumber()) {
                    reportError(this.curCopier.getCode(), this.curCopier.getErrorMsg());
                }
                Utils.closeQuietly(reader);
                ds.done.run(this.curCopier);
                Utils.closeQuietly(this.curCopier);
                this.curCopier = null;
                this.downloadingSnapshot.set(null);
                this.runningJobs.countDown();
                return;
            }
            Utils.closeQuietly(this.curCopier);
            this.curCopier = null;
            // Check whether the reader is valid
            if (reader == null || !reader.isOk()) {
                Utils.closeQuietly(reader);
                this.downloadingSnapshot.set(null);
                ds.done.sendResponse(RpcResponseFactory.newResponse(RaftError.EINTERNAL,
                    "Fail to copy snapshot from %s", ds.request.getUri()));
                this.runningJobs.countDown();
                return;
            }
            this.loadingSnapshot = true;
            this.loadingSnapshotMeta = meta;
        } finally {
            this.lock.unlock();
        }
        // The snapshot was downloaded successfully; hand it to the state machine for installation
        final InstallSnapshotDone installSnapshotDone = new InstallSnapshotDone(reader);
        // Submit a snapshot-load event to the state machine caller
        if (!this.fsmCaller.onSnapshotLoad(installSnapshotDone)) {
            LOG.warn("Fail to  call fsm onSnapshotLoad");
            installSnapshotDone.run(new Status(RaftError.EHOSTDOWN, "This raft node is down"));
        }
    }
    

    After these checks, the state machine caller's onSnapshotLoad method is called to install the snapshot.

    FSMCallerImpl#onSnapshotLoad

    public boolean onSnapshotLoad(final LoadSnapshotClosure done) {
        return enqueueTask((task, sequence) -> {
            task.type = TaskType.SNAPSHOT_LOAD;
            task.done = done;
        });
    }
    

    onSnapshotLoad publishes a task of type TaskType.SNAPSHOT_LOAD to the Disruptor queue; ApplyTaskHandler processes it and eventually calls doSnapshotLoad.

    FSMCallerImpl#doSnapshotLoad

    private void doSnapshotLoad(final LoadSnapshotClosure done) {
        // ... (omitted)
        if (!this.fsm.onSnapshotLoad(reader)) {
            done.run(new Status(-1, "StateMachine onSnapshotLoad failed"));
            final RaftException e = new RaftException(EnumOutter.ErrorType.ERROR_TYPE_STATE_MACHINE,
                RaftError.ESTATEMACHINE, "StateMachine onSnapshotLoad failed");
            setError(e);
            return;
        }
        // ... (omitted)
        done.run(Status.OK());
    }
    

    doSnapshotLoad finally calls the onSnapshotLoad method implemented by the user's state machine. Again we use CounterStateMachine as the example:

    CounterStateMachine#onSnapshotLoad

    public boolean onSnapshotLoad(final SnapshotReader reader) {
        if (isLeader()) {
            LOG.warn("Leader is not supposed to load snapshot");
            return false;
        }
        if (reader.getFileMeta("data") == null) {
            LOG.error("Fail to find data file in {}", reader.getPath());
            return false;
        }
        final CounterSnapshotFile snapshot = new CounterSnapshotFile(reader.getPath() + File.separator + "data");
        try {
            this.value.set(snapshot.load());
            return true;
        } catch (final IOException e) {
            LOG.error("Fail to load snapshot from {}", snapshot.getPath());
            return false;
        }
    
    }
    

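    onSnapshotLoad refuses to run on a Leader and checks that the data file exists; it then reads the value from the file and sets it into value, at which point the data has been loaded.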
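The same load logic can be exercised without JRaft by reading a plain directory. A minimal sketch, assuming the snapshot directory contains a file named data holding the value as text; CounterLoadSketch is hypothetical, and its Files.exists check stands in for reader.getFileMeta("data"):

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of a counter state machine's snapshot-load path, mirroring
// CounterStateMachine#onSnapshotLoad but reading a plain directory via java.nio
// instead of going through JRaft's SnapshotReader.
class CounterLoadSketch {
    final AtomicLong value = new AtomicLong();
    volatile boolean leader;

    boolean onSnapshotLoad(Path snapshotDir) {
        if (leader) {
            return false; // a Leader is not supposed to load a snapshot
        }
        Path data = snapshotDir.resolve("data");
        if (!Files.exists(data)) {
            return false; // plays the role of reader.getFileMeta("data") == null
        }
        try {
            String text = new String(Files.readAllBytes(data), StandardCharsets.UTF_8).trim();
            value.set(Long.parseLong(text));
            return true;
        } catch (IOException | NumberFormatException e) {
            return false;
        }
    }
}
```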

    Original article: https://www.cnblogs.com/luozhiyun/p/12115353.html