zoukankan      html  css  js  c++  java
  • dledger 的 preferredLeader

    DLedger 有一个 preferredLeader 配置,其作用是优先选择某个指定节点作为 leader。它具体是怎么实现的?

    首先,节点需要配置 --preferred-leader-id 参数(可以在节点启动后通过命令行设置);当该节点取得 leader 地位后,它会每秒检查一次,并在优选节点在线、日志落后不多等条件满足时,把 leader 地位移交给优选节点。

    DLedgerServer 的定时任务

    // io.openmessaging.storage.dledger.DLedgerServer#startup
    /**
     * Starts the server's components and schedules the periodic preferred-leader check.
     *
     * <p>Components are started in this order: storage, RPC service, entry pusher (which pushes
     * appended entries to followers), and finally the leader elector.
     */
    public void startup() {
        this.dLedgerStore.startup();
        this.dLedgerRpcService.startup();
        this.dLedgerEntryPusher.startup();
        this.dLedgerLeaderElector.startup();

        // Once per second (first run after one second) see whether leadership should move to the
        // configured preferred leader; checkPreferredLeader itself does nothing unless this node leads.
        final long preferredLeaderCheckPeriodMs = 1000;
        executorService.scheduleAtFixedRate(
            this::checkPreferredLeader,
            preferredLeaderCheckPeriodMs,
            preferredLeaderCheckPeriodMs,
            TimeUnit.MILLISECONDS);
    }
    // io.openmessaging.storage.dledger.DLedgerServer#checkPreferredLeader
    /**
     * Periodic task that hands leadership over to the configured preferred leader when this node
     * is the leader and the preferred leader is eligible.
     *
     * <p>No exception may escape this method: when a task run by
     * {@code ScheduledExecutorService.scheduleAtFixedRate} throws, all of its subsequent executions
     * are suppressed, which would silently disable preferred-leader transfer for good.
     */
    private void checkPreferredLeader() {
        try {
            doCheckPreferredLeader();
        } catch (Throwable t) {
            logger.error("[checkPreferredLeader] unexpected error", t);
        }
    }

    /** Performs one preferred-leader check and, if every precondition holds, one transfer attempt. */
    private void doCheckPreferredLeader() {
        // Only the current leader may hand leadership over.
        if (!memberState.isLeader()) {
            return;
        }
        // Read the config exactly once: it can change at runtime, and the transfer request below
        // must target the same id that the checks here validated.
        String preferredLeaderId = dLedgerConfig.getPreferredLeaderId();
        // Nothing configured, or the current leader already is the preferred one.
        if (preferredLeaderId == null || preferredLeaderId.equals(memberState.getLeaderId())) {
            return;
        }

        if (!memberState.isPeerMember(preferredLeaderId)) {
            logger.warn("preferredLeaderId = {} is not a peer member", preferredLeaderId);
            return;
        }

        // A transfer is already in progress.
        if (memberState.getTransferee() != null) {
            return;
        }

        // Compare by value: "== Boolean.FALSE" misses Boolean instances other than the cached constant.
        if (!memberState.getPeersLiveTable().containsKey(preferredLeaderId)
            || Boolean.FALSE.equals(memberState.getPeersLiveTable().get(preferredLeaderId))) {
            logger.warn("preferredLeaderId = {} is not online", preferredLeaderId);
            return;
        }

        // Use one term snapshot for both the watermark lookup and the request.
        long term = memberState.currTerm();
        long fallBehind = dLedgerStore.getLedgerEndIndex() - dLedgerEntryPusher.getPeerWaterMark(term, preferredLeaderId);
        logger.info("transferee fall behind index : {}", fallBehind);
        // The preferred leader lags too far behind; try again on a later tick.
        if (fallBehind >= dLedgerConfig.getMaxLeadershipTransferWaitIndex()) {
            return;
        }

        LeadershipTransferRequest request = new LeadershipTransferRequest();
        request.setTerm(term);
        request.setTransfereeId(preferredLeaderId);

        try {
            long startTransferTime = System.currentTimeMillis();
            // Blocks this scheduler thread until the leader-side transfer attempt completes.
            LeadershipTransferResponse response = dLedgerLeaderElector.handleLeadershipTransfer(request).get();
            logger.info("transfer finished. request={},response={},cost={}ms", request, response, DLedgerUtils.elapsed(startTransferTime));
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the executing thread can still observe the interruption.
            Thread.currentThread().interrupt();
            logger.error("[checkPreferredLeader] error, request={}", request, e);
        } catch (Throwable t) {
            logger.error("[checkPreferredLeader] error, request={}", request, t);
        }
    }
    
    
    // io.openmessaging.storage.dledger.DLedgerLeaderElector#handleLeadershipTransfer
    /**
     * Leader-side handling of a leadership transfer: records {@code request.getTransfereeId()} as
     * the transferee and sends it a take-leadership request for the current term.
     *
     * <p>While a transferee is recorded, further transfer attempts are answered with
     * LEADER_TRANSFERRING. It must therefore be cleared whenever an attempt ends without a term
     * change, including when the RPC throws or its future completes exceptionally.
     *
     * @param request transfer request; its term must equal this node's current term
     * @return a future completed with the transferee's response, or an already-completed future
     *         carrying INCONSISTENT_TERM, NOT_LEADER, LEADER_TRANSFERRING or EXPIRED_TERM when a
     *         precondition fails
     * @throws Exception as declared; presumably raised by {@code dLedgerRpcService.leadershipTransfer}
     */
    public CompletableFuture<LeadershipTransferResponse> handleLeadershipTransfer(
        LeadershipTransferRequest request) throws Exception {
        logger.info("handleLeadershipTransfer: {}", request);
        synchronized (memberState) {
            if (memberState.currTerm() != request.getTerm()) {
                logger.warn("[BUG] [HandleLeaderTransfer] currTerm={} != request.term={}", memberState.currTerm(), request.getTerm());
                return CompletableFuture.completedFuture(new LeadershipTransferResponse().term(memberState.currTerm()).code(DLedgerResponseCode.INCONSISTENT_TERM.getCode()));
            }

            if (!memberState.isLeader()) {
                // Report this node's own id; request.getLeaderId() is only what the caller claimed.
                logger.warn("[BUG] [HandleLeaderTransfer] selfId={} is not leader", memberState.getSelfId());
                return CompletableFuture.completedFuture(new LeadershipTransferResponse().term(memberState.currTerm()).code(DLedgerResponseCode.NOT_LEADER.getCode()));
            }

            if (memberState.getTransferee() != null) {
                logger.warn("[BUG] [HandleLeaderTransfer] transferee={} is already set", memberState.getTransferee());
                return CompletableFuture.completedFuture(new LeadershipTransferResponse().term(memberState.currTerm()).code(DLedgerResponseCode.LEADER_TRANSFERRING.getCode()));
            }

            memberState.setTransferee(request.getTransfereeId());
        }
        // Message telling the transferee to take over leadership from this node.
        LeadershipTransferRequest takeLeadershipRequest = new LeadershipTransferRequest();
        takeLeadershipRequest.setGroup(memberState.getGroup());
        takeLeadershipRequest.setLeaderId(memberState.getLeaderId());
        takeLeadershipRequest.setLocalId(memberState.getSelfId());
        takeLeadershipRequest.setRemoteId(request.getTransfereeId());
        takeLeadershipRequest.setTerm(request.getTerm());
        takeLeadershipRequest.setTakeLeadershipLedgerIndex(memberState.getLedgerEndIndex());
        takeLeadershipRequest.setTransferId(memberState.getSelfId());
        takeLeadershipRequest.setTransfereeId(request.getTransfereeId());
        // The lock was released above, so the term may already have moved on; do not send a stale request.
        if (memberState.currTerm() != request.getTerm()) {
            logger.warn("[HandleLeaderTransfer] term changed, cur={} , request={}", memberState.currTerm(), request.getTerm());
            return CompletableFuture.completedFuture(new LeadershipTransferResponse().term(memberState.currTerm()).code(DLedgerResponseCode.EXPIRED_TERM.getCode()));
        }

        try {
            // whenComplete runs on normal AND exceptional completion and passes the original
            // result/exception through unchanged. The transferee campaigns at term + 1, so a
            // transferee still set in the same term means the attempt did not go through.
            return dLedgerRpcService.leadershipTransfer(takeLeadershipRequest)
                .whenComplete((response, error) -> resetTransfereeIfTermUnchanged(request.getTerm()));
        } catch (Exception e) {
            // The RPC could not even be issued; release the transferee before propagating.
            resetTransfereeIfTermUnchanged(request.getTerm());
            throw e;
        }
    }

    /** Clears the recorded transferee if the term is still {@code term}, i.e. the transfer did not succeed. */
    private void resetTransfereeIfTermUnchanged(long term) {
        synchronized (memberState) {
            if (memberState.currTerm() == term && memberState.getTransferee() != null) {
                logger.warn("leadershipTransfer failed, set transferee to null");
                memberState.setTransferee(null);
            }
        }
    }

    优选节点准备接管 leader

    // io.openmessaging.storage.dledger.DLedgerServer#handleLeadershipTransfer
    /**
     * Handles a LEADERSHIP_TRANSFER request addressed to this server.
     *
     * <p>If this node is the transferring leader ({@code transferId == selfId}), it validates the
     * request and delegates to the elector's leader-side handling. If it is the transferee
     * ({@code transfereeId == selfId}), it checks that the sender is the known leader and starts
     * taking over leadership. Any other combination is answered with UNEXPECTED_ARGUMENT.
     *
     * @param request the transfer request; remoteId and group must match this node
     * @return a future with the response; precondition failures are returned as a completed
     *         response carrying the corresponding DLedger error code
     * @throws Exception as declared by the delegated elector methods
     */
    public CompletableFuture<LeadershipTransferResponse> handleLeadershipTransfer(LeadershipTransferRequest request) throws Exception {
        try {
            PreConditions.check(memberState.getSelfId().equals(request.getRemoteId()), DLedgerResponseCode.UNKNOWN_MEMBER, "%s != %s", request.getRemoteId(), memberState.getSelfId());
            PreConditions.check(memberState.getGroup().equals(request.getGroup()), DLedgerResponseCode.UNKNOWN_GROUP, "%s != %s", request.getGroup(), memberState.getGroup());
            if (memberState.getSelfId().equals(request.getTransferId())) {
                // It's the leader received the transfer command.
                PreConditions.check(memberState.isPeerMember(request.getTransfereeId()), DLedgerResponseCode.UNKNOWN_MEMBER, "transferee=%s is not a peer member", request.getTransfereeId());
                PreConditions.check(memberState.currTerm() == request.getTerm(), DLedgerResponseCode.INCONSISTENT_TERM, "currTerm(%s) != request.term(%s)", memberState.currTerm(), request.getTerm());
                PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER, "selfId=%s is not leader=%s", memberState.getSelfId(), memberState.getLeaderId());

                // Refuse the transfer if the transferee's log falls too far behind the leader's log end.
                long transfereeFallBehind = dLedgerStore.getLedgerEndIndex() - dLedgerEntryPusher.getPeerWaterMark(request.getTerm(), request.getTransfereeId());
                PreConditions.check(transfereeFallBehind < dLedgerConfig.getMaxLeadershipTransferWaitIndex(),
                    DLedgerResponseCode.FALL_BEHIND_TOO_MUCH, "transferee fall behind too much, diff=%s", transfereeFallBehind);
                return dLedgerLeaderElector.handleLeadershipTransfer(request);
            } else if (memberState.getSelfId().equals(request.getTransfereeId())) {
                // It's the transferee received the take leadership command.
                // A request without a transferId is malformed: answer INCONSISTENT_LEADER instead of
                // throwing a NullPointerException, which the DLedgerException handler would not catch.
                PreConditions.check(request.getTransferId() != null && request.getTransferId().equals(memberState.getLeaderId()),
                    DLedgerResponseCode.INCONSISTENT_LEADER, "transfer=%s is not leader", request.getTransferId());

                return dLedgerLeaderElector.handleTakeLeadership(request);
            } else {
                return CompletableFuture.completedFuture(new LeadershipTransferResponse().term(memberState.currTerm()).code(DLedgerResponseCode.UNEXPECTED_ARGUMENT.getCode()));
            }
        } catch (DLedgerException e) {
            logger.error("[{}][handleLeadershipTransfer] failed", memberState.getSelfId(), e);
            LeadershipTransferResponse response = new LeadershipTransferResponse();
            response.copyBaseInfo(request);
            response.setCode(e.getCode().getCode());
            response.setLeaderId(memberState.getLeaderId());
            return CompletableFuture.completedFuture(response);
        }

    }
    
    follower 切换为 candidate 开始拉票
    // io.openmessaging.storage.dledger.DLedgerLeaderElector#handleTakeLeadership
    /**
     * Transferee-side handling of a take-leadership request: when the request's term matches the
     * current term, this node becomes a candidate for the following term and asks for an
     * immediate term increase.
     *
     * @param request take-leadership request sent by the current leader
     * @return a pending future registered with takeLeadershipTask, or an already-completed
     *         INCONSISTENT_TERM response when the terms differ
     * @throws Exception as declared
     */
    public CompletableFuture<LeadershipTransferResponse> handleTakeLeadership(
        LeadershipTransferRequest request) throws Exception {
        logger.debug("handleTakeLeadership.request={}", request);
        synchronized (memberState) {
            if (memberState.currTerm() == request.getTerm()) {
                // Campaign in the term directly after the one the leader is in.
                long nextTerm = request.getTerm() + 1;
                memberState.setTermToTakeLeadership(nextTerm);
                CompletableFuture<LeadershipTransferResponse> pendingResponse = new CompletableFuture<>();
                // Register the request/response pair; presumably takeLeadershipTask completes
                // pendingResponse once the election outcome is known (verify in TakeLeadershipTask).
                takeLeadershipTask.update(request, pendingResponse);
                // Typically a follower turning into a candidate to start collecting votes.
                changeRoleToCandidate(nextTerm);
                needIncreaseTermImmediately = true;
                return pendingResponse;
            }
            logger.warn("[BUG] [handleTakeLeadership] currTerm={} != request.term={}", memberState.currTerm(), request.getTerm());
            return CompletableFuture.completedFuture(new LeadershipTransferResponse().term(memberState.currTerm()).code(DLedgerResponseCode.INCONSISTENT_TERM.getCode()));
        }
    }

    命令行:

    leadershipTransfer --group zhang --peers n0-localhost:10911;n1-localhost:10912 --leader n0 --term 22 --transfereeId n1

    leader 日志:

    INFO  DLedgerLeaderElector:593 - handleLeadershipTransfer: LeadershipTransferRequest{transferId='n0', transfereeId='n1', takeLeadershipLedgerIndex=0, group='zhang', remoteId='n0', localId='null', code=200, leaderId='null', term=22}
    INFO  DLedgerLeaderElector:169 - [n0] [ChangeRoleToCandidate] from term: 23 and currTerm: 22
    INFO  DLedgerLeaderElector:426 - n0_[INCREASE_TERM] from 22 to 23
    INFO  DLedgerRpcNettyService:360 - LEADERSHIP_TRANSFER FINISHED. Request=RemotingCommand [code=51005, language=JAVA, version=0, opaque=1, flag(B)=0, remark=null, extFields=null, serializeTypeCurrentRPC=JSON], response=LeadershipTransferResponse{group='null', remoteId='null', localId='null', code=200, leaderId='null', term=23}, cost=119ms
    INFO  DLedgerLeaderElector:184 - [n0][ChangeRoleToFollower] from term: 23 leaderId: n1 and currTerm: 23

    优选节点日志:

    INFO  DLedgerLeaderElector:169 - [n1] [ChangeRoleToCandidate] from term: 23 and currTerm: 22
    INFO  DLedgerLeaderElector:403 - n1_[INCREASE_TERM] from 22 to 23
    INFO  DLedgerLeaderElector:434 - [n1][GetVoteResponse] {"code":200,"group":"zhang","leaderId":"n1","remoteId":"n1","term":23,"voteResult":"ACCEPT"}
    INFO  DLedgerLeaderElector:434 - [n1][GetVoteResponse] {"code":200,"group":"zhang","leaderId":"n1","remoteId":"n0","term":22,"voteResult":"REJECT_TERM_NOT_READY"}
    INFO  DLedgerLeaderElector:512 - [n1] [PARSE_VOTE_RESULT] cost=15 term=23 memberNum=2 allNum=2 acceptedNum=1 notReadyTermNum=1 biggerLedgerNum=0 alreadyHasLeader=false maxTerm=-1 result=REVOTE_IMMEDIATELY
    INFO  DLedgerLeaderElector:434 - [n1][GetVoteResponse] {"code":200,"group":"zhang","leaderId":"n1","remoteId":"n1","term":23,"voteResult":"ACCEPT"}
    INFO  DLedgerLeaderElector:434 - [n1][GetVoteResponse] {"code":200,"group":"zhang","leaderId":"n1","remoteId":"n0","term":23,"voteResult":"ACCEPT"}
    INFO  DLedgerLeaderElector:512 - [n1] [PARSE_VOTE_RESULT] cost=5 term=23 memberNum=2 allNum=2 acceptedNum=2 notReadyTermNum=0 biggerLedgerNum=0 alreadyHasLeader=false maxTerm=-1 result=PASSED
    INFO  DLedgerLeaderElector:516 - [n1] [VOTE_RESULT] has been elected to be the leader in term 23
    INFO  DLedgerLeaderElector:670 - TakeLeadershipTask finished. request=LeadershipTransferRequest{transferId='n0', transfereeId='n1', takeLeadershipLedgerIndex=8, group='zhang', remoteId='n1', localId='n0', code=200, leaderId='n0', term=22}, response=LeadershipTransferResponse{group='null', remoteId='null', localId='null', code=200, leaderId='null', term=23}, term=23, role=LEADER
    INFO  DLedgerLeaderElector:157 - [n1] [ChangeRoleToLeader] from term: 23 and currTerm: 23
    INFO  DLedgerEntryPusher:115 - Initialize the pending append map in QuorumAckChecker for term=23
    INFO  DLedgerRpcNettyService:351 - LEADERSHIP_TRANSFER FINISHED. Request=RemotingCommand [code=51005, language=JAVA, version=0, opaque=687, flag(B)=0, remark=null, extFields=null, serializeTypeCurrentRPC=JSON], response=LeadershipTransferResponse{group='null', remoteId='null', localId='null', code=200, leaderId='null', term=23}, cost=107ms
    INFO  DLedgerEntryPusher:104 - Initialize the watermark in QuorumAckChecker for term=23

    如上是最简单的一种情况,总结一下步骤:

    1. leader 通知优选节点开始接管
    2. 优选节点增加任期,切换为 candidate,开始拉票
      2.1 首先优选节点自己给自己投票
      2.2 优选节点向原 leader 拉票,原 leader 发现请求中的任期比自己的大,于是先增加自己的任期,并返回 REJECT_TERM_NOT_READY,拒绝这次拉票请求
    3. 优选节点收到 2 个拉票响应:一个是自己的 ACCEPT,另一个是 REJECT_TERM_NOT_READY,于是立即再次拉票(REVOTE_IMMEDIATELY),抢在原 leader 之前发起选举;第二轮拉票获得 2 票 ACCEPT(PASSED),优选节点成为任期 23 的 leader

    如果优选节点的日志落后于 leader,它不一定能成为 leader:可能需要多轮选举才能选出 leader,而且胜出的不一定是优选节点。

    从代码来看,建议在 DLedger 写入压力较小时切换 leader:此时优选节点的日志落后较少,更容易满足 maxLeadershipTransferWaitIndex 的限制,也更有可能在选举中胜出。

  • 相关阅读:
    leetcode刷题笔记303题 区域和检索
    leetcode刷题笔记301题 删除无效的括号
    20201208日报
    20201118日报
    20201117日报
    20201116日报
    20201115日报
    20201114日报
    20201113日报
    20201112日报
  • 原文地址:https://www.cnblogs.com/allenwas3/p/12505335.html
Copyright © 2011-2022 走看看