zoukankan      html  css  js  c++  java
  • Nacos 集群选举原理

    集群选举问题:

      Nacos支持集群模式,很显然。而一旦涉及到集群,就涉及到主从,那么nacos是一种什么样的机制来实现的集群呢?

      Nacos的集群类似于zookeeper, 它分为leader角色和follower角色, 那么从这个角色的名字可以看出来,这个集群存在选举的机制。 因为如果自己不具备选举功能,角色的命名可能就是master/slave了.

    选举算法 :

      Nacos集群采用 raft 算法来实现,它是相对zookeeper的选举算法较为简单的一种。选举算法的核心在 RaftCore 中,包括数据的处理和数据同步。

      raft 算法演示地址 :http://thesecretlivesofdata.com/raft/

      在Raft中,节点有三种角色:

    1. Leader:负责接收客户端的请求
    2. Candidate:用于选举Leader的一种角色(竞选状态)
    3. Follower:负责响应来自Leader或者Candidate的请求

      选举分为两个节点

    1. 服务启动的时候
    2. leader挂了的时候

      所有节点启动的时候,都是follower状态。 如果在一段时间内如果没有收到leader的心跳(可能是没有leader,也可能是leader挂了),那么follower会变成Candidate。然后发起选举,选举之前,会增加 term,这个 term 和 zookeeper 中的 epoch 的道理是一样的。

      follower会投自己一票,并且给其他节点发送票据vote,等到其他节点回复在这个过程中,可能出现几种情况

    1. 收到过半的票数通过,则成为leader
    2. 被告知其他节点已经成为leader,则自己切换为follower
    3. 一段时间内没有收到过半的投票,则重新发起选举

      约束条件在任一term中,单个节点最多只能投一票

    选举的几种情况 :

    1. 第一种情况,赢得选举之后,leader会给所有节点发送消息,避免其他节点触发新的选举

    2. 第二种情况,比如有三个节点A B C。A B同时发起选举,而A的选举消息先到达C,C给A投了一票,当B的消息到达C时,已经不能满足上面提到的约束条件,即C不会给B投票,而A和B显然都不会给对方投票。A胜出之后,会给B,C发心跳消息,节点B发现节点A的term不低于自己的term,知道有已经有Leader了,于是转换成follower

    3. 第三种情况, 没有任何节点获得majority投票,可能是平票的情况。加入总共有四个节点(A/B/C/D),Node C、Node D同时成为了candidate,但Node A投了NodeD一票,NodeB投了Node C一票,这就出现了平票 split vote的情况。这个时候大家都在等啊等,直到超时后重新发起选举。如果出现平票的情况,那么就延长了系统不可用的时间,因此raft引入了  randomizedelection timeouts来尽量避免平票情况.

    源码分析 :

      RaftCore 初始化 :Raft选举算法,是在RaftCore这个类中实现的。

    /**
    * Init raft core.
    *
    * @throws Exception any exception during init
    */
    @PostConstruct
    public void init() throws Exception {
            
            Loggers.RAFT.info("initializing Raft sub-system");
    
         //开启一个notifier监听,这个线程中会遍历listeners,根据ApplyAction执行相应的逻辑
            executor.submit(notifier);
            
            final long start = System.currentTimeMillis();
    
         //遍历/nacos/data/naming/data/文件件,也就是从磁盘中加载Datum到内存,用来做数据恢复。(数据同步采用2pc协议,leader收到请求会写写入到磁盘日志,然后再进行数据同步)
            raftStore.loadDatums(notifier, datums);
    
         //从/nacos_home/data/naming/meta.properties文件中读取term,term表示当前的时钟周期。
            setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L));
            
            Loggers.RAFT.info("cache loaded, datum count: {}, current term: {}", datums.size(), peers.getTerm());
            
            while (true) {
                if (notifier.tasks.size() <= 0) {
                    break;
                }
                Thread.sleep(1000L);
            }
            
            initialized = true;
            
            Loggers.RAFT.info("finish to load data from disk, cost: {} ms.", (System.currentTimeMillis() - start));
            //开启定时任务,每500ms执行一次,用来判断是否需要发起leader选举,每500ms发起一次心跳
            GlobalExecutor.registerMasterElection(new MasterElection());
            GlobalExecutor.registerHeartbeat(new HeartBeat());
            
            Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}",
                    GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS);
    }

      这里我们重点关注 MasterElection 选举:

    public class MasterElection implements Runnable {
            
            @Override
            public void run() {
                try {
                    //判断本机是否具准备就绪
                    if (!peers.isReady()) {
                        return;
                    }
                    //获取本机的节点信息
                    RaftPeer local = peers.local();
    
              //leader选举触发间隔时间,第一次进来,会生成(0~15000毫秒)之间的一个随机数-500.
              // //后面由于500ms调度一次,所以每次该线程被调起,会将该leaderDueMs减去TICK_PERIOD_MS(500ms),直到小于0的时候会触发选举
              //后面每次收到一次leader的心跳就会重置leaderDueMs = 15s+(随机0-5s)
                    local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;
                    //当间隔时间>0,直接返回,等到下一次500ms后再调用
                    if (local.leaderDueMs > 0) {
                        return;
                    }
                    //重新设置本地的leaderDueMs
                    // reset timeout
                    local.resetLeaderDue();
                    local.resetHeartbeatDue();//设置心跳间隔5s
                    //发起投票
                    sendVote();
                } catch (Exception e) {
                    Loggers.RAFT.warn("[RAFT] error while master election {}", e);
                }
                
            }
            //发送票据数据
            private void sendVote() {
                
                RaftPeer local = peers.get(NetUtils.localServer());
                Loggers.RAFT.info("leader timeout, start voting,leader: {}, term: {}", JacksonUtils.toJson(getLeader()),
                        local.term);
                //重置peer
                peers.reset();
                //每一次投票,都累加一次term,表示当前投票的轮数
                local.term.incrementAndGet();
           //选自己,此时peers中有一个votefor就是自己
                local.voteFor = local.ip;
           //本地server状态设置为CANDIDATE
                local.state = RaftPeer.State.CANDIDATE;
                
                Map<String, String> params = new HashMap<>(1);
                params.put("vote", JacksonUtils.toJson(local));//设置参数
            //遍历除了本机ip之外的其他节点,把自己的票据发送给所有节点
    
                for (final String server : peers.allServersWithoutMySelf()) {
                    final String url = buildUrl(server, API_VOTE);//API_VOTE接口路径:/raft/vote
                    try {
                        HttpClient.asyncHttpPost(url, null, params, new AsyncCompletionHandler<Integer>() {
                            @Override
                            public Integer onCompleted(Response response) throws Exception {
                                if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                                    Loggers.RAFT
                                            .error("NACOS-RAFT vote failed: {}, url: {}", response.getResponseBody(), url);
                                    return 1;
                                }
                                //获取其他server的响应
                                RaftPeer peer = JacksonUtils.toObj(response.getResponseBody(), RaftPeer.class);
                                
                                Loggers.RAFT.info("received approve from peer: {}", JacksonUtils.toJson(peer));
                                //计算leader
                                peers.decideLeader(peer);
                                
                                return 0;
                            }
                        });
                    } catch (Exception e) {
                        Loggers.RAFT.warn("error while sending vote to server: {}", server);
                    }
                }
            }
        }

      RaftController : 我们先来看一下,其他节点收到投票请求后,如何处理呢?在没有看代码之前,不难猜测到,他应该要做票据的判断,到底是不是赞同你作为leader。

    @PostMapping("/vote")
    public JsonNode vote(HttpServletRequest request, HttpServletResponse response) throws Exception {
      //接受到投票请求
      RaftPeer peer = raftCore.receivedVote(JacksonUtils.toObj(WebUtils.required(request, "vote"), RaftPeer.class));
      //返回结果
      return JacksonUtils.transferToJsonNode(peer);
    }

      处理逻辑非常简单。

    • 判断收到的请求的term是不是过期的数据,如果是,则认为对方的这个票据无效,直接告诉发送这个票据的节点,你应该选择当前收到请求的节点。

    • 否则,当前收到请求的节点会自动接受对方的票据,并把自己设置成follower

    public synchronized RaftPeer receivedVote(RaftPeer remote) {
            if (!peers.contains(remote)) {
                throw new IllegalStateException("can not find peer: " + remote.ip);
            }
            //得到本机节点信息
            RaftPeer local = peers.get(NetUtils.localServer());
        //判断周期是否过期,如果收到的票据是过期状态
        if (remote.term.get() <= local.term.get()) {
                String msg = "received illegitimate vote" + ", voter-term:" + remote.term + ", votee-term:" + local.term;
                
                Loggers.RAFT.info(msg);
          //如果voteFor为空,表示在此之前没有收到其他节点的票据。则把remote节点的票据设置到自己的节点上
                if (StringUtils.isEmpty(local.voteFor)) {
                    local.voteFor = local.ip;
                }
                
                return local;
            }
            //如果上面if不成立,说明remote机器率先发起的投票,那么就认同他的投票
         //重置选举间隔时间
    
            local.resetLeaderDue();
            //设置为follower
            local.state = RaftPeer.State.FOLLOWER;
            local.voteFor = remote.ip;
            local.term.set(remote.term.get());//同步term
            
            Loggers.RAFT.info("vote {} as leader, term: {}", remote.ip, remote.term);
            
            return local;
        }

      peers.decideLeader(peer) 表示用来决策谁能成为leader

    public RaftPeer decideLeader(RaftPeer candidate) {
        //假设3个节点:A,B,C。local节点为A,假设A,B,C第一轮同时发起选举请求
        //第一轮:处理B,C节点返回结果:peers{"ip_a":"candidate_a","ip_b":"candidate_b","ip_C":"candidate_C"}
            peers.put(candidate.ip, candidate);
            
            SortedBag ips = new TreeBag();
            int maxApproveCount = 0;
            String maxApprovePeer = null;
         /**第一轮投票结果:
          * 第一次for循环是a自己的投票:
          * maxApproveCount = 1,maxApprovePeer = A
        * 第二次for循环是B服务器返回的投票,该投票投向B:数据同步addInstance
         * 比如我们在注册服务时,调用addInstance之后,最后会调用 consistencyService.put(key,instances); 这个方法,来实现数据一致性的同步。      * if (ips.getCount(peer.voteFor) > maxApproveCount) 条件不成立,maxApproveCount = 1,maxApprovePeer = A     *      * 第三次for循环是C服务器返回的投票,该投票投向C:      * if (ips.getCount(peer.voteFor) > maxApproveCount) 条件不成立,maxApproveCount = 1,maxApprovePeer = A     
    */     for (RaftPeer peer : peers.values()) { if (StringUtils.isEmpty(peer.voteFor)) { continue; } ips.add(peer.voteFor); if (ips.getCount(peer.voteFor) > maxApproveCount) { maxApproveCount = ips.getCount(peer.voteFor); maxApprovePeer = peer.voteFor; } }     //majorityCount():2(假设3个节点)     //第一轮:maxApproveCount = 1 if条件不成立,返回leader,此时leader为null,没有选举成功     if (maxApproveCount >= majorityCount()) { //找到得票最多的那个peer RaftPeer peer = peers.get(maxApprovePeer); //设置这个peer为leader peer.state = RaftPeer.State.LEADER; if (!Objects.equals(leader, peer)) { leader = peer; ApplicationUtils.publishEvent(new LeaderElectFinishedEvent(this, leader, local())); Loggers.RAFT.info("{} has become the LEADER", leader.ip); } } return leader; }

       通过上面的分析想必大家应该很清楚 Nacos 的集群选举实现了 。如果还有不明白的小伙伴可以对着 Raft 的演示地址进行理解。

    数据同步:

      在注册服务时,调用addInstance之后,最后会调用 consistencyService.put(key,instances); 这个方法,来实现数据一致性的同步。

    public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
                throws NacosException {
            
            String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
            
            Service service = getService(namespaceId, serviceName);
            
            synchronized (service) {
                List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
                
                Instances instances = new Instances();
                instances.setInstanceList(instanceList);
                
                consistencyService.put(key, instances);
            }
        }

       调用 consistencyService.put 用来发布类容,也就是实现数据的一致性同步。

    @Override
    public void put(String key, Record value) throws NacosException {
            mapConsistencyService(key).put(key, value);
    }
    private ConsistencyService mapConsistencyService(String key) {
            return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
    }

       这里会走后面 persistentConsistencyService ,由于  public class RaftConsistencyServiceImpl implements PersistentConsistencyService 所以这里走 RaftConsistencyServiceImpl  :

    @Override
    public void put(String key, Record value) throws NacosException {
            try {
                raftCore.signalPublish(key, value);
            } catch (Exception e) {
                Loggers.RAFT.error("Raft put failed.", e);
                throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value,
                        e);
            }
    }
    public void signalPublish(String key, Record value) throws Exception {
        //如果自己不是leader,则找到leader节点,把数据转发到leader节点
        if (!isLeader()) {
            ObjectNode params = JacksonUtils.createEmptyJsonNode();
            params.put("key", key);
            params.replace("value", JacksonUtils.transferToJsonNode(value));
            Map<String, String> parameters = new HashMap<>(1);
            parameters.put("key", key);
            
            final RaftPeer leader = getLeader();
            //代理转发
            raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters);
            return;
        }
        //如果自己是leader,则向所有节点发送onPublish请求。这个所有节点包含自己。
        try {
            OPERATE_LOCK.lock();//先上个锁
            final long start = System.currentTimeMillis();
            final Datum datum = new Datum();
            datum.key = key;
            datum.value = value;
            if (getDatum(key) == null) {
                datum.timestamp.set(1L);
            } else {
                datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
            }
            
            ObjectNode json = JacksonUtils.createEmptyJsonNode();
            json.replace("datum", JacksonUtils.transferToJsonNode(datum));
            json.replace("source", JacksonUtils.transferToJsonNode(peers.local()));
            //onPublish可以当做是一次心跳了,更新选举检查时间,然后一个重点就是term增加100了
            //当然还是就是更新内容了,先写文件,再更新内存缓存。(也就是先记录本地日志)
            onPublish(datum, peers.local());//发送数据到所有节点
            
            final String content = json.toString();
            
            final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
            //遍历所有节点,发送事务提交请求,把记录在本地日志中的数据进行提交
            for (final String server : peers.allServersIncludeMyself()) {
                if (isLeader(server)) {//再次判断是否leader
                    latch.countDown();
                    continue;
                }//构建发布同步接口地址
                final String url = buildUrl(server, API_ON_PUB);
                HttpClient.asyncHttpPostLarge(url, Arrays.asList("key=" + key), content,
                        new AsyncCompletionHandler<Integer>() {
                            @Override
                            public Integer onCompleted(Response response) throws Exception {
                                if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                                    Loggers.RAFT
                                            .warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
                                                    datum.key, server, response.getStatusCode());
                                    return 1;
                                }
                                latch.countDown();
                                return 0;
                            }
                            
                            @Override
                            public STATE onContentWriteCompleted() {
                                return STATE.CONTINUE;
                            }
                        });
                
            }
            // .....
    }

      其中  onPublish(datum, peers.local()); 发送数据到所有节点:

    public void onPublish(Datum datum, RaftPeer source) throws Exception {
            RaftPeer local = peers.local(); // 本地数据信息
            if (datum.value == null) { //这个value就是服务注册的服务,为空报错
                Loggers.RAFT.warn("received empty datum");
                throw new IllegalStateException("received empty datum");
            }
            // 判断是否事leader 不是报错
            if (!peers.isLeader(source.ip)) {
                Loggers.RAFT
                        .warn("peer {} tried to publish data but wasn't leader, leader: {}", JacksonUtils.toJson(source),
                                JacksonUtils.toJson(getLeader()));
                throw new IllegalStateException("peer(" + source.ip + ") tried to publish " + "data but wasn't leader");
            }
            // 判断 term
            if (source.term.get() < local.term.get()) {
                Loggers.RAFT.warn("out of date publish, pub-term: {}, cur-term: {}", JacksonUtils.toJson(source),
                        JacksonUtils.toJson(local));
                throw new IllegalStateException(
                        "out of date publish, pub-term:" + source.term.get() + ", cur-term: " + local.term.get());
            }
            //重置事件
            local.resetLeaderDue();
            
            // if data should be persisted, usually this is true:
            if (KeyBuilder.matchPersistentKey(datum.key)) {
                raftStore.write(datum);// 发送持久化数据,完成数据同步
            }
            
            datums.put(datum.key, datum);
            
            if (isLeader()) {//如果是leader ,则增加 term
                local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
            } else {
                if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {
                    //set leader term:
                    getLeader().term.set(source.term.get());
                    local.term.set(getLeader().term.get());
                } else {
                    local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
                }
            }
          //更新/nacos_home/data/naming/meta.properties文件
            raftStore.updateTerm(local.term.get());
            
            notifier.addTask(datum.key, DataOperation.CHANGE);
            
            Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
        }

      通过以上的操作就完成了数据的同步。

  • 相关阅读:
    MVC3、如何应用EntityFramework 连接MySql 数据库 Kevin
    DEV EXPRESS Summary Footer 不显示 Kevin
    装饰模式 Kevin
    Dev 控件 GridControl 控件 二次绑定数据源的问题。 Kevin
    System.InvalidOperationException 异常 Kevin
    LINQ to XML Kevin
    代理模式——代码版“吊丝的故事” Kevin
    VS2012 中的设备 面板 Kevin
    maven 学习笔记(三)创建一个较复杂的 eclipse+android+maven 工程
    maven 学习笔记(一)eclipse+android+maven
  • 原文地址:https://www.cnblogs.com/wuzhenzhao/p/13641155.html
Copyright © 2011-2022 走看看