  [Nacos] Data Consistency

    Reposted from: https://blog.csdn.net/liyanan21/article/details/89320872

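    Contents

    I. The Raft Algorithm

    II. Raft Source Code in Nacos

    init()

    1. Getting the Raft cluster nodes

    NamingProxy.getServers(): getting the cluster nodes

    NamingProxy.refreshSrvIfNeed(): obtaining node information

    NamingProxy.refreshServerListFromDisk(): reading cluster node information

    2. Raft cluster data recovery

    RaftStore.load()

    3. Raft election

    GlobalExecutor.register(new MasterElection()): registering the scheduled election task

    MasterElection.sendVote(): sending vote requests

    (1) RaftCommands.vote(): handling /v1/ns/raft/vote requests

    (2) PeerSet.decideLeader(): deciding the leader

    4. Raft heartbeat

    GlobalExecutor.register(new HeartBeat()): registering the scheduled heartbeat task

    HeartBeat.sendBeat(): sending heartbeat packets

    (1) RaftCommands.beat(): handling /v1/ns/raft/beat requests

    5. Publishing content through Raft

    Registration entry point

    Persisting instance information

    (1) Service.put()

    (2) RaftCore.signalPublish()

    (3) The /raft/datum and /raft/datum/commit endpoints

    Publish entry point: RaftCommands.publish()

    6. How Raft keeps content consistent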


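    I. The Raft Algorithm

    Raft reaches consensus through an elected leader. Each server in a Raft cluster is either a leader or a follower, and during an election (when the leader is unavailable) it can also be a candidate. The leader is responsible for replicating the log to the followers, and it periodically informs them of its presence by sending heartbeat messages. Each follower has a timeout (typically between 150 and 300 ms) within which it expects the leader's heartbeat; the timeout is reset whenever a heartbeat is received. If no heartbeat arrives in time, the follower changes its state to candidate and starts a leader election.

    For details, see: Raft algorithm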
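    The timeout rule above can be sketched as a small state machine. This is a generic illustration, not Nacos code: the class name RaftTimerSketch is hypothetical, the 150 to 300 ms bounds are the typical values quoted above, and the caller passes the current time explicitly so the behaviour is deterministic.

```java
import java.util.Random;

// Hypothetical sketch of a Raft follower's randomized election timeout (not Nacos code).
class RaftTimerSketch {
    enum State { FOLLOWER, CANDIDATE, LEADER }

    static final long MIN_TIMEOUT_MS = 150;
    static final long MAX_TIMEOUT_MS = 300;

    private final Random random;
    State state = State.FOLLOWER;
    long term = 0;
    private long deadlineMs;

    RaftTimerSketch(long seed, long nowMs) {
        this.random = new Random(seed);
        resetTimeout(nowMs);
    }

    // Pick a new random timeout in [150, 300) ms, counted from nowMs.
    private void resetTimeout(long nowMs) {
        long span = MAX_TIMEOUT_MS - MIN_TIMEOUT_MS;
        deadlineMs = nowMs + MIN_TIMEOUT_MS + (long) (random.nextDouble() * span);
    }

    // A heartbeat from a leader with a current term resets the timeout and keeps us a follower.
    void onHeartbeat(long leaderTerm, long nowMs) {
        if (leaderTerm >= term) {
            term = leaderTerm;
            state = State.FOLLOWER;
            resetTimeout(nowMs);
        }
    }

    // Called periodically; if no heartbeat arrived before the deadline, become a candidate.
    void tick(long nowMs) {
        if (state != State.LEADER && nowMs >= deadlineMs) {
            state = State.CANDIDATE;
            term++;               // a new election starts a new term
            resetTimeout(nowMs);  // retry later if this election does not succeed
        }
    }
}
```

    Randomizing the timeout makes it unlikely that several followers time out at the same instant and split the vote.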

    II. Raft Source Code in Nacos

    When the Nacos server starts, RunningConfig.onApplicationEvent() calls RaftCore.init().

    init()

    public static void init() throws Exception {
     
        Loggers.RAFT.info("initializing Raft sub-system");
     
        // Start the Notifier, which polls Datums and notifies RaftListeners
        executor.submit(notifier);
         
        // Get the Raft cluster nodes and add them to the PeerSet
        peers.add(NamingProxy.getServers());
     
        long start = System.currentTimeMillis();
     
        // Load Datum and term data from disk for recovery
        RaftStore.load();
     
        Loggers.RAFT.info("cache loaded, peer count: {}, datum count: {}, current term: {}",
            peers.size(), datums.size(), peers.getTerm());
     
        // Wait until the Notifier has processed the tasks queued by RaftStore.load()
        while (true) {
            if (notifier.tasks.size() <= 0) {
                break;
            }
            Thread.sleep(1000L);
            System.out.println(notifier.tasks.size());
        }
     
        Loggers.RAFT.info("finish to load data from disk, cost: {} ms.", (System.currentTimeMillis() - start));
     
        GlobalExecutor.register(new MasterElection()); // Leader election
        GlobalExecutor.register1(new HeartBeat()); // Raft heartbeat
        GlobalExecutor.register(new AddressServerUpdater(), GlobalExecutor.ADDRESS_SERVER_UPDATE_INTERVAL_MS);
     
        if (peers.size() > 0) {
            if (lock.tryLock(INIT_LOCK_TIME_SECONDS, TimeUnit.SECONDS)) {
                initialized = true;
                lock.unlock();
            }
        } else {
            throw new Exception("peers is empty.");
        }
     
        Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}",
            GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS);
    }

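    The init() method mainly does the following (steps 5 and 6 are not part of init() itself; they describe what happens once the cluster is running):

    • 1. Gets the Raft cluster nodes: peers.add(NamingProxy.getServers());
    • 2. Recovers Raft cluster data: RaftStore.load();
    • 3. Starts Raft elections: GlobalExecutor.register(new MasterElection());
    • 4. Starts Raft heartbeats: GlobalExecutor.register(new HeartBeat());
    • 5. Publishing content through Raft
    • 6. Keeping content consistent through Raft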

    1. Getting the Raft cluster nodes

    NamingProxy.getServers(): getting the cluster nodes

    • Calls NamingProxy.refreshSrvIfNeed() to obtain the node information
    • Returns List<String> servers

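    NamingProxy.refreshSrvIfNeed(): obtaining node information

    • In standalone mode, the local host's ip:port is used as the Raft node information; otherwise, NamingProxy.refreshServerListFromDisk() (described next) is called to obtain the Raft cluster node information.

    • Once the Raft cluster node information (a list of ip:port entries) has been obtained, NamingProxy's List<String> serverlistFromConfig and List<String> servers fields are updated.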

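    NamingProxy.refreshServerListFromDisk(): reading cluster node information

    Reads the Raft cluster node information, i.e. the ip:port list, from disk or from system environment variables.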
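    The decision made by refreshSrvIfNeed() and refreshServerListFromDisk() can be sketched as follows. This is not Nacos code: the class and method names, the one-ip:port-per-line file format with # comments, and the comma-separated environment-variable fallback are assumptions for illustration, and the file and environment reads are replaced by plain string parameters so the sketch stays self-contained.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of resolving the Raft server list (names and formats are assumptions).
class ServerListSketch {
    // Parse "ip:port" entries, one per line, skipping blank lines and '#' comments.
    static List<String> parseLines(String content) {
        List<String> servers = new ArrayList<>();
        for (String line : content.split("\\r?\\n")) {
            String trimmed = line.trim();
            if (!trimmed.isEmpty() && !trimmed.startsWith("#")) {
                servers.add(trimmed);
            }
        }
        return servers;
    }

    // Standalone mode: the local ip:port is the only Raft node.
    // Cluster mode: use the list read from "disk", else fall back to a comma-separated env value.
    static List<String> resolve(boolean standalone, String localAddress,
                                String diskContent, String envValue) {
        if (standalone) {
            return Collections.singletonList(localAddress);
        }
        List<String> servers = diskContent == null ? new ArrayList<>() : parseLines(diskContent);
        if (servers.isEmpty() && envValue != null) {
            for (String entry : envValue.split(",")) {
                if (!entry.trim().isEmpty()) {
                    servers.add(entry.trim());
                }
            }
        }
        return servers;
    }
}
```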

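    2. Raft cluster data recovery

    When Nacos starts or restarts, it loads Datum and term data from disk to recover its state.

    Call chain: Nacos server starts -> RaftCore.init() -> RaftStore.load().

    RaftStore.load()

    • Loads Datum data from disk:

      Each Datum is put into RaftCore's ConcurrentMap<String, Datum> datums, keyed by the Datum's key;

      The Datum and ApplyAction.CHANGE are wrapped into a Pair and put into the Notifier's tasks queue, so that the relevant RaftListeners are notified;

    • Reads the term (a long value) from META_FILE_NAME, i.e. <user.home>/nacos/raft/meta.properties:

      Calls PeerSet.setTerm(long term) to update the term of every node in the Raft cluster.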
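    Persisting and restoring the term through a properties file can be sketched with java.util.Properties. The key name "term" and the helper names are assumptions, and the sketch uses in-memory readers and writers instead of the real META_FILE_NAME path.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.util.Properties;

// Hypothetical sketch of storing and loading the Raft term in a properties file.
class RaftMetaSketch {
    static final String TERM_KEY = "term"; // assumed key name

    static void storeTerm(Writer writer, long term) {
        Properties meta = new Properties();
        meta.setProperty(TERM_KEY, Long.toString(term));
        try {
            // Writes a comment line, a timestamp comment and "term=<value>".
            meta.store(writer, "raft meta");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns 0 when no term has been persisted yet.
    static long loadTerm(Reader reader) {
        Properties meta = new Properties();
        try {
            meta.load(reader);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String value = meta.getProperty(TERM_KEY);
        return value == null ? 0L : Long.parseLong(value.trim());
    }
}
```

    The loaded value would then be pushed to every peer, which corresponds to the setTerm(long term) step above.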

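    3. Raft election

    GlobalExecutor.register(new MasterElection()): registering the scheduled election task

    Nacos performs Raft elections in the MasterElection task (a Runnable):

    • Counts down the local node's election timeout and, once it expires, resets the election timeout and the heartbeat timeout;
    • Calls MasterElection.sendVote() to request votes.
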
    public class MasterElection implements Runnable {
        @Override
        public void run() {
            try {
                if (!peers.isReady()) {
                    return;
                }
     
                RaftPeer local = peers.local();
                local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;
                if (local.leaderDueMs > 0) {
                    return;
                }
     
                // Reset the election timeout (it is also reset on every heartbeat and every received data packet)
                local.resetLeaderDue();
                local.resetHeartbeatDue();
     
                // Start an election
                sendVote();
            } catch (Exception e) {
                Loggers.RAFT.warn("[RAFT] error while master election {}", e);
            }
        }
    }

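    MasterElection.sendVote(): sending vote requests

    • Reset Raft cluster state:

    leader is set to null, and the voteFor field of every Raft node is set to null;

    • Update the candidate's own data:

    its term is incremented by 1 (the increment creates a term difference with the other nodes, so that not all nodes share the same term and a Leader can be elected);

    its voteFor field is set to itself;

    its state is set to CANDIDATE;

    • The candidate sends an HTTP POST request to /v1/ns/raft/vote on every other Raft node:

    the request parameter is vote: JSON.toJSONString(local)

    • The candidate passes each node's reply (the voter's RaftPeer, including its voteFor) to PeerSet.decideLeader(),

    which makes the RaftPeer that a majority voted for the Leader.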

    public void sendVote() {

        RaftPeer local = peers.get(NetUtils.localServer());
        Loggers.RAFT.info("leader timeout, start voting,leader: {}, term: {}",
            JSON.toJSONString(getLeader()), local.term);

        // Reset Raft cluster state: leader = null, every peer's voteFor = null
        peers.reset();

        // Update the candidate: term + 1, vote for itself, become CANDIDATE
        local.term.incrementAndGet();
        local.voteFor = local.ip;
        local.state = RaftPeer.State.CANDIDATE;

        // POST the candidate's own RaftPeer (parameter "vote") to /v1/ns/raft/vote on every other node
        Map<String, String> params = new HashMap<String, String>(1);
        params.put("vote", JSON.toJSONString(local));
        for (final String server : peers.allServersWithoutMySelf()) {
            final String url = buildURL(server, API_VOTE);
            try {
                HttpClient.asyncHttpPost(url, null, params, new AsyncCompletionHandler<Integer>() {
                    @Override
                    public Integer onCompleted(Response response) throws Exception {
                        if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                            Loggers.RAFT.error("NACOS-RAFT vote failed: {}, url: {}", response.getResponseBody(), url);
                            return 1;
                        }

                        RaftPeer peer = JSON.parseObject(response.getResponseBody(), RaftPeer.class);

                        Loggers.RAFT.info("received approve from peer: {}", JSON.toJSONString(peer));

                        // Hand the voter's reply (its RaftPeer, including voteFor) to PeerSet.decideLeader()
                        peers.decideLeader(peer);

                        return 0;
                    }
                });
            } catch (Exception e) {
                Loggers.RAFT.warn("error while sending vote to server: {}", server);
            }
        }
    }

    (1) RaftCommands.vote(): handling /v1/ns/raft/vote requests

    The HTTP endpoint for vote requests:

    @RestController
    @RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/raft")
    public class RaftController {
     
        ......
     
        @NeedAuth
        @RequestMapping(value = "/vote", method = RequestMethod.POST)
        public JSONObject vote(HttpServletRequest request, HttpServletResponse response) throws Exception {
            // Handle the vote request
            RaftPeer peer = raftCore.receivedVote(
                JSON.parseObject(WebUtils.required(request, "vote"), RaftPeer.class));
     
            return JSON.parseObject(JSON.toJSONString(peer));
        }
     
     
        ......
    }

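    The endpoint calls RaftCore.receivedVote():

    If the candidate's term is less than or equal to the local node's term:

      the vote is rejected, and if the local node has not voted yet, its voteFor is set to itself (in effect: "I am better suited to be leader, so I keep this vote for myself");

    Otherwise:

      this node becomes a Follower and resets its election timeout;

      its voteFor is set to the candidate's ip (in effect: "fine, you get my vote");

      its term is updated to the candidate's term;

    In both cases the local node is returned as the HTTP response.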

    @Component
    public class RaftCore {
     
        ......
     
        public RaftPeer receivedVote(RaftPeer remote) {
            if (!peers.contains(remote)) {
                throw new IllegalStateException("can not find peer: " + remote.ip);
            }
     
            // If the local term is >= the candidate's term, reject the vote and keep (or cast) the vote for itself
            RaftPeer local = peers.get(NetUtils.localServer());
            if (remote.term.get() <= local.term.get()) {
                String msg = "received illegitimate vote" +
                    ", voter-term:" + remote.term + ", votee-term:" + local.term;
     
                Loggers.RAFT.info(msg);
                if (StringUtils.isEmpty(local.voteFor)) {
                    local.voteFor = local.ip;
                }
     
                return local;
            }
     
            local.resetLeaderDue();
     
            // The local term is smaller than the candidate's term: become a follower and vote for the candidate
            local.state = RaftPeer.State.FOLLOWER;
            local.voteFor = remote.ip;
            local.term.set(remote.term.get());
     
            Loggers.RAFT.info("vote {} as leader, term: {}", remote.ip, remote.term);
     
            return local;
        }
    }
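    Leaving out HTTP and the election timers, the voting rules of sendVote() and receivedVote() above can be condensed into an in-memory model. VoteSketch and its Peer class are hypothetical stand-ins for RaftPeer; the "response" is simply the returned peer, and resetting leaderDueMs is omitted.

```java
// Hypothetical in-memory model of the Nacos voting rules described above.
class VoteSketch {
    enum State { FOLLOWER, CANDIDATE, LEADER }

    static class Peer {
        final String ip;
        long term;
        String voteFor;
        State state = State.FOLLOWER;

        Peer(String ip, long term) {
            this.ip = ip;
            this.term = term;
        }

        // Candidate side (sendVote): bump the term, vote for itself, become CANDIDATE.
        void startElection() {
            term++;
            voteFor = ip;
            state = State.CANDIDATE;
        }

        // Voter side (receivedVote): only a candidate with a strictly newer term gets the vote.
        Peer receivedVote(Peer remote) {
            if (remote.term <= term) {
                if (voteFor == null || voteFor.isEmpty()) {
                    voteFor = ip; // keep the vote for itself
                }
                return this;
            }
            state = State.FOLLOWER;
            voteFor = remote.ip;
            term = remote.term; // adopting the term means a second candidate with the same term is rejected
            return this;
        }
    }
}
```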

    (2) PeerSet.decideLeader(): deciding the leader

    @Component
    @DependsOn("serverListManager")
    public class RaftPeerSet implements ServerChangeListener {
     
        ......
     
        public RaftPeer decideLeader(RaftPeer candidate) {
            peers.put(candidate.ip, candidate);
     
            SortedBag ips = new TreeBag();
            int maxApproveCount = 0;
            String maxApprovePeer = null;
            // Iterate over all peers: add each non-empty voteFor to ips and track the most-voted peer and its count
            for (RaftPeer peer : peers.values()) {
                if (StringUtils.isEmpty(peer.voteFor)) {
                    continue;
                }
     
                ips.add(peer.voteFor);
                if (ips.getCount(peer.voteFor) > maxApproveCount) {
                    maxApproveCount = ips.getCount(peer.voteFor);
                    maxApprovePeer = peer.voteFor;
                }
            }
     
            // Make the most-voted peer the leader, but only if it has a majority
            if (maxApproveCount >= majorityCount()) {
                RaftPeer peer = peers.get(maxApprovePeer);
                peer.state = RaftPeer.State.LEADER;
     
                if (!Objects.equals(leader, peer)) {
                    leader = peer;
                    Loggers.RAFT.info("{} has become the LEADER", leader.ip);
                }
            }
     
            return leader;
        }
    }
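    The tally in decideLeader() can be reproduced with a plain HashMap in place of Commons Collections' TreeBag. majorityCount() is assumed here to be n / 2 + 1 for n peers, the class and method names are hypothetical, and the sketch returns null when nobody has a majority instead of returning the previous leader.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical re-implementation of the vote tally in decideLeader().
class DecideLeaderSketch {
    // Assumed majority rule: more than half of all peers.
    static int majorityCount(int peerCount) {
        return peerCount / 2 + 1;
    }

    // votes maps each peer's ip to the ip it voted for (null or empty = no vote yet).
    // Returns the elected leader's ip, or null if no peer has a majority.
    static String decideLeader(Map<String, String> votes) {
        Map<String, Integer> counts = new HashMap<>();
        int maxApproveCount = 0;
        String maxApprovePeer = null;
        for (String voteFor : votes.values()) {
            if (voteFor == null || voteFor.isEmpty()) {
                continue;
            }
            int count = counts.merge(voteFor, 1, Integer::sum);
            if (count > maxApproveCount) {
                maxApproveCount = count;
                maxApprovePeer = voteFor;
            }
        }
        return maxApproveCount >= majorityCount(votes.size()) ? maxApprovePeer : null;
    }
}
```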

    4. Raft heartbeat

    GlobalExecutor.register(new HeartBeat()): registering the scheduled heartbeat task

    • Counts down the heartbeat timeout and resets it when it expires;
    • Calls sendBeat() to send the heartbeat packet.

    public class HeartBeat implements Runnable {
        @Override
        public void run() {
            try {
                if (!peers.isReady()) {
                    return;
                }
     
                RaftPeer local = peers.local();
                // heartbeatDueMs defaults to 5 s and TICK_PERIOD_MS is 500 ms: check every 500 ms, send a heartbeat every 5 s
                local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS;
                if (local.heartbeatDueMs > 0) {
                    return;
                }
     
                // Reset heartbeatDueMs
                local.resetHeartbeatDue();
     
                // Send the heartbeat packet
                sendBeat();
            } catch (Exception e) {
                Loggers.RAFT.warn("[RAFT] error while sending beat {}", e);
            }
        }
    }
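    The countdown in HeartBeat.run() is plain integer arithmetic, and MasterElection.run() applies the same pattern to leaderDueMs. The hypothetical Countdown class below replays it with the values from the comment above (a 5 s due time and a 500 ms TICK_PERIOD_MS), so the action fires on every tenth tick. The fixed reset value is a simplification, since the real reset methods are not shown here.

```java
// Hypothetical sketch of the tick-based countdown used by HeartBeat and MasterElection.
class Countdown {
    static final long TICK_PERIOD_MS = 500; // value taken from the comment in HeartBeat.run()
    private final long resetValueMs;
    private long dueMs;

    Countdown(long resetValueMs) {
        this.resetValueMs = resetValueMs;
        this.dueMs = resetValueMs;
    }

    // Called once per tick; returns true when the action (e.g. sendBeat()) should run.
    boolean tick() {
        dueMs -= TICK_PERIOD_MS;
        if (dueMs > 0) {
            return false;
        }
        dueMs = resetValueMs; // plays the role of resetHeartbeatDue()
        return true;
    }
}
```

    Because the check runs only once per tick, the effective interval is always a multiple of TICK_PERIOD_MS.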

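    HeartBeat.sendBeat(): sending heartbeat packets

    • Resets the Leader's election timeout (the heartbeat timeout was already reset in run(); only the Leader, or a standalone node, sends heartbeats);
    • Sends an HTTP POST request to /v1/ns/raft/beat on every other node; the content is built as follows and gzip-compressed before sending:

    JSONObject packet = new JSONObject();
    packet.put("peer", local);    // local is the Leader's RaftPeer object
    packet.put("datums", array);  // array holds the key and timestamp of every Datum in RaftCore
    Map<String, String> params = new HashMap<String, String>(1);
    params.put("beat", JSON.toJSONString(packet));

    • Takes the HTTP response from each node, i.e. its RaftPeer object, and updates the PeerSet's Map<String, RaftPeer> peers collection (keeping cluster node data consistent).
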
        public void sendBeat() throws IOException, InterruptedException {
            RaftPeer local = peers.local();
            // Only the leader sends heartbeats (unless running in standalone mode)
            if (local.state != RaftPeer.State.LEADER && !STANDALONE_MODE) {
                return;
            }
     
            Loggers.RAFT.info("[RAFT] send beat with {} keys.", datums.size());
     
            // Reset the election timeout (the interval after which a node that receives no packets starts an election)
            local.resetLeaderDue();
     
            // Build the heartbeat packet; local is this Nacos node's info, stored under the key "peer"
            JSONObject packet = new JSONObject();
            packet.put("peer", local);
     
            JSONArray array = new JSONArray();
     
            // Heartbeat-only mode: send the beat without any datum keys
            if (switchDomain.isSendBeatOnly()) {
                Loggers.RAFT.info("[SEND-BEAT-ONLY] {}", String.valueOf(switchDomain.isSendBeatOnly()));
            }
     
            // Otherwise send the relevant keys to the followers inside the heartbeat packet
            if (!switchDomain.isSendBeatOnly()) {
                for (Datum datum : datums.values()) {
                    JSONObject element = new JSONObject();
     
                    // Put the (prefix-stripped) key and its version (timestamp) into element, then add it to array
                    if (KeyBuilder.matchServiceMetaKey(datum.key)) {
                        element.put("key", KeyBuilder.briefServiceMetaKey(datum.key));
                    } else if (KeyBuilder.matchInstanceListKey(datum.key)) {
                        element.put("key", KeyBuilder.briefInstanceListkey(datum.key));
                    }
                    element.put("timestamp", datum.timestamp);
     
                    array.add(element);
                }
            } else {
                Loggers.RAFT.info("[RAFT] send beat only.");
            }
     
            // Put the array of all keys into the packet
            packet.put("datums", array);
             
            // Serialize the packet to a JSON string and put it into params
            Map<String, String> params = new HashMap<String, String>(1);
            params.put("beat", JSON.toJSONString(packet));
     
            String content = JSON.toJSONString(params);
     
            // Compress with gzip
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            GZIPOutputStream gzip = new GZIPOutputStream(out);
            gzip.write(content.getBytes("UTF-8"));
            gzip.close();
     
            byte[] compressedBytes = out.toByteArray();
            // Log the compressed size in bytes; decoding gzip bytes as a UTF-8 String would not measure it
            Loggers.RAFT.info("raw beat data size: {}, size of compressed data: {}",
                content.length(), compressedBytes.length);
     
            // Send the heartbeat packet to all followers
            for (final String server : peers.allServersWithoutMySelf()) {
                try {
                    final String url = buildURL(server, API_BEAT);
                    Loggers.RAFT.info("send beat to server " + server);
                    HttpClient.asyncHttpPostLarge(url, null, compressedBytes, new AsyncCompletionHandler<Integer>() {
                        @Override
                        public Integer onCompleted(Response response) throws Exception {
                            if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                                Loggers.RAFT.error("NACOS-RAFT beat failed: {}, peer: {}",
                                    response.getResponseBody(), server);
                                MetricsMonitor.getLeaderSendBeatFailedException().increment();
                                return 1;
                            }
                            peers.update(JSON.parseObject(response.getResponseBody(), RaftPeer.class));
                            Loggers.RAFT.info("receive beat response from: {}", url);
                            return 0;
                        }
     
                        @Override
                        public void onThrowable(Throwable t) {
                            Loggers.RAFT.error("NACOS-RAFT error while sending heart-beat to peer: {} {}", server, t);
                            MetricsMonitor.getLeaderSendBeatFailedException().increment();
                        }
                    });
                } catch (Exception e) {
                    Loggers.RAFT.error("error while sending heart-beat to peer: {} {}", server, e);
                    MetricsMonitor.getLeaderSendBeatFailedException().increment();
                }
            }
        }
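    sendBeat() gzips the serialized parameters before posting them, and the /beat endpoint reverses this with IoUtils.tryDecompress(). The round trip can be sketched with the JDK's java.util.zip classes. GzipSketch and its methods are hypothetical, and returning non-gzip input unchanged (detected by the gzip magic bytes 0x1f 0x8b) is an assumption about what a "try" decompress does.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical sketch of compressing a heartbeat payload and decompressing it on the receiver.
class GzipSketch {
    static byte[] compress(String content) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(content.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray(); // closing the gzip stream above wrote the trailer
    }

    // Decompress if the data starts with the gzip magic number, otherwise return it unchanged.
    static byte[] tryDecompress(byte[] data) {
        if (data.length < 2 || (data[0] & 0xff) != 0x1f || (data[1] & 0xff) != 0x8b) {
            return data;
        }
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data));
             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            byte[] buffer = new byte[4096];
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
            return out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

    The compressed size is the length of the byte array; decoding gzip bytes as a UTF-8 String and taking its length() does not measure it reliably, because the bytes are not valid UTF-8 text.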

    (1) RaftCommands.beat(): handling /v1/ns/raft/beat requests

    The HTTP endpoint that receives heartbeat packets:

    @RestController
    @RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/raft")
    public class RaftController {
     
        ......
     
        @NeedAuth
        @RequestMapping(value = "/beat", method = RequestMethod.POST)
        public JSONObject beat(HttpServletRequest request, HttpServletResponse response) throws Exception {
            String entity = new String(IoUtils.tryDecompress(request.getInputStream()), "UTF-8");
            String value = URLDecoder.decode(entity, "UTF-8");
            value = URLDecoder.decode(value, "UTF-8");
     
            // Parse the heartbeat packet
            JSONObject json = JSON.parseObject(value);
            JSONObject beat = JSON.parseObject(json.getString("beat"));
     
            // Process the heartbeat and return this node's info as the response
            RaftPeer peer = raftCore.receivedBeat(beat);
            return JSON.parseObject(JSON.toJSONString(peer));
        }
     
        ......
    }

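    RaftCore.receivedBeat(): processing the heartbeat packet

    • Rejects the heartbeat if the sender is not in LEADER state or if its term is older than the local term;
    • If the node receiving the heartbeat is not a Follower, it becomes a Follower and sets its voteFor to the Leader's ip;
    • Resets the local node's heartbeat timeout and election timeout;
    • Calls PeerSet.makeLeader() so that this node updates its Leader (in other words, the Leader uses heartbeats to tell the other nodes who the Leader is);
    • Checks the Datums:

    Iterates over the datums in the request; if the Follower does not have a datumKey, or its copy has an older timestamp, the datumKey is collected;

    Whenever 50 datumKeys have been collected (and once more for any remainder after the last entry), sends a request with those keys to the Leader's /v1/ns/raft/get path to fetch the corresponding latest Datum objects;

    For each returned Datum, the Follower then does roughly what RaftCore.onPublish() does:
      1. Calls RaftStore#write to serialize the Datum as JSON into its cacheFile
      2. Puts the Datum into RaftCore's datums collection, keyed by the datum's key
      3. Resets the local node's election timeout
      4. Updates the local node's term
      5. Persists the local term to the properties file
      6. Calls notifier.addTask(datum, Notifier.ApplyAction.CHANGE) to notify the corresponding RaftListeners

    RaftCore.deleteDatum(String key) removes a stale Datum (one whose key the Leader no longer reports):
      removes the Datum for key from the datums collection;
      RaftStore.delete() deletes the Datum's file on disk;
      notifier.addTask(deleted, Notifier.ApplyAction.DELETE) notifies the corresponding RaftListeners of the DELETE event.

    • Returns the local node's RaftPeer as the HTTP response.
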
    @Component
    public class RaftCore {
     
        ......
     
        public RaftPeer receivedBeat(JSONObject beat) throws Exception {
            final RaftPeer local = peers.local();
            // Parse the info of the node that sent the heartbeat
            final RaftPeer remote = new RaftPeer();
            remote.ip = beat.getJSONObject("peer").getString("ip");
            remote.state = RaftPeer.State.valueOf(beat.getJSONObject("peer").getString("state"));
            remote.term.set(beat.getJSONObject("peer").getLongValue("term"));
            remote.heartbeatDueMs = beat.getJSONObject("peer").getLongValue("heartbeatDueMs");
            remote.leaderDueMs = beat.getJSONObject("peer").getLongValue("leaderDueMs");
            remote.voteFor = beat.getJSONObject("peer").getString("voteFor");
     
            // Reject heartbeats that were not sent by a leader
            if (remote.state != RaftPeer.State.LEADER) {
                Loggers.RAFT.info("[RAFT] invalid state from master, state: {}, remote peer: {}",
                    remote.state, JSON.toJSONString(remote));
                throw new IllegalArgumentException("invalid state from master, state: " + remote.state);
            }
     
            // If the local term is greater than the heartbeat's term, reject the out-of-date heartbeat
            if (local.term.get() > remote.term.get()) {
                Loggers.RAFT.info("[RAFT] out of date beat, beat-from-term: {}, beat-to-term: {}, remote peer: {}, and leaderDueMs: {}"
                    , remote.term.get(), local.term.get(), JSON.toJSONString(remote), local.leaderDueMs);
                throw new IllegalArgumentException("out of date beat, beat-from-term: " + remote.term.get()
                    + ", beat-to-term: " + local.term.get());
            }
     
            // If this node is not a follower, turn it into one
            if (local.state != RaftPeer.State.FOLLOWER) {
                Loggers.RAFT.info("[RAFT] make remote as leader, remote peer: {}", JSON.toJSONString(remote));
                // mk follower
                local.state = RaftPeer.State.FOLLOWER;
                local.voteFor = remote.ip;
            }
     
            final JSONArray beatDatums = beat.getJSONArray("datums");
            // Reset the election timeout and the heartbeat timeout
            local.resetLeaderDue();
            local.resetHeartbeatDue();
     
            // Update leader info: make remote the new leader and refresh the previous leader's peer info
            peers.makeLeader(remote);
     
            // Put every local key into a map with value 0 ("not seen in this beat yet")
            Map<String, Integer> receivedKeysMap = new HashMap<String, Integer>(datums.size());
            for (Map.Entry<String, Datum> entry : datums.entrySet()) {
                receivedKeysMap.put(entry.getKey(), 0);
            }
     
            // Check the received datum list
            List<String> batch = new ArrayList<String>();
            if (!switchDomain.isSendBeatOnly()) {
                int processedCount = 0;
                Loggers.RAFT.info("[RAFT] received beat with {} keys, RaftCore.datums' size is {}, remote server: {}, term: {}, local term: {}",
                    beatDatums.size(), datums.size(), remote.ip, remote.term, local.term);
                for (Object object : beatDatums) {
                    processedCount = processedCount + 1;
     
                    JSONObject entry = (JSONObject) object;
                    String key = entry.getString("key");
                    final String datumKey;
                    // Rebuild the full datumKey (keys are sent with their prefix stripped)
                    if (KeyBuilder.matchServiceMetaKey(key)) {
                        datumKey = KeyBuilder.detailServiceMetaKey(key);
                    } else if (KeyBuilder.matchInstanceListKey(key)) {
                        datumKey = KeyBuilder.detailInstanceListkey(key);
                    } else {
                        // ignore corrupted key:
                        continue;
                    }
     
                    // Version (timestamp) of the received key
                    long timestamp = entry.getLong("timestamp");
     
                    // Mark the received key as seen (1) in the local key map
                    receivedKeysMap.put(datumKey, 1);
     
                    try {
                        // Skip if the key exists locally, the local version is >= the received one, and entries remain to be processed
                        if (datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp && processedCount < beatDatums.size()) {
                            continue;
                        }
     
                        // If the key is missing locally or the local version is older, add it to batch so its data is fetched
                        if (!(datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp)) {
                            batch.add(datumKey);
                        }
     
                        // Only fetch once batch holds at least 50 keys or all entries have been processed
                        if (batch.size() < 50 && processedCount < beatDatums.size()) {
                            continue;
                        }
     
                        String keys = StringUtils.join(batch, ",");
     
                        if (batch.size() <= 0) {
                            continue;
                        }
     
                        Loggers.RAFT.info("get datums from leader: {}, batch size is {}, processedCount is {}, datums' size is {}, RaftCore.datums' size is {}"
                            , getLeader().ip, batch.size(), processedCount, beatDatums.size(), datums.size());
     
                        // Fetch the latest data for the keys in batch from the leader and update the local datum entries
                        String url = buildURL(remote.ip, API_GET) + "?keys=" + URLEncoder.encode(keys, "UTF-8");
                        HttpClient.asyncHttpGet(url, null, null, new AsyncCompletionHandler<Integer>() {
                            @Override
                            public Integer onCompleted(Response response) throws Exception {
                                if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                                    return 1;
                                }
     
                                List<Datum> datumList = JSON.parseObject(response.getResponseBody(), new TypeReference<List<Datum>>() {
                                });
     
                                // Update local data
                                for (Datum datum : datumList) {
                                    OPERATE_LOCK.lock();
                                    try {
                                        Datum oldDatum = getDatum(datum.key);
     
                                        if (oldDatum != null && datum.timestamp.get() <= oldDatum.timestamp.get()) {
                                            Loggers.RAFT.info("[NACOS-RAFT] timestamp is smaller than that of mine, key: {}, remote: {}, local: {}",
                                                datum.key, datum.timestamp, oldDatum.timestamp);
                                            continue;
                                        }
     
                                        raftStore.write(datum);
     
                                        if (KeyBuilder.matchServiceMetaKey(datum.key)) {
                                            Datum<Service> serviceDatum = new Datum<>();
                                            serviceDatum.key = datum.key;
                                            serviceDatum.timestamp.set(datum.timestamp.get());
                                            serviceDatum.value = JSON.parseObject(JSON.toJSONString(datum.value), Service.class);
                                            datum = serviceDatum;
                                        }
     
                                        if (KeyBuilder.matchInstanceListKey(datum.key)) {
                                            Datum<Instances> instancesDatum = new Datum<>();
                                            instancesDatum.key = datum.key;
                                            instancesDatum.timestamp.set(datum.timestamp.get());
                                            instancesDatum.value = JSON.parseObject(JSON.toJSONString(datum.value), Instances.class);
                                            datum = instancesDatum;
                                        }
     
                                        datums.put(datum.key, datum);
                                        notifier.addTask(datum.key, ApplyAction.CHANGE);
     
                                        local.resetLeaderDue();
     
                                        if (local.term.get() + 100 > remote.term.get()) {
                                            getLeader().term.set(remote.term.get());
                                            local.term.set(getLeader().term.get());
                                        } else {
                                            local.term.addAndGet(100);
                                        }
     
                                        raftStore.updateTerm(local.term.get());
     
                                        Loggers.RAFT.info("data updated, key: {}, timestamp: {}, from {}, local term: {}",
                                            datum.key, datum.timestamp, JSON.toJSONString(remote), local.term);
     
                                    } catch (Throwable e) {
                                        Loggers.RAFT.error("[RAFT-BEAT] failed to sync datum from leader, key: {} {}", datum.key, e);
                                    } finally {
                                        OPERATE_LOCK.unlock();
                                    }
                                }
                                TimeUnit.MILLISECONDS.sleep(200);
                                return 0;
                            }
                        });
     
                        batch.clear();
                    } catch (Exception e) {
                        Loggers.RAFT.error("[NACOS-RAFT] failed to handle beat entry, key: {}", datumKey);
                    }
                }
     
                // A key that exists locally but is absent from the beat was deleted by the leader, so delete it locally too
                List<String> deadKeys = new ArrayList<String>();
                for (Map.Entry<String, Integer> entry : receivedKeysMap.entrySet()) {
                    if (entry.getValue() == 0) {
                        deadKeys.add(entry.getKey());
                    }
                }
     
                for (String deadKey : deadKeys) {
                    try {
                        deleteDatum(deadKey);
                    } catch (Exception e) {
                        Loggers.RAFT.error("[NACOS-RAFT] failed to remove entry, key={} {}", deadKey, e);
                    }
                }
            }
     
            return local;
        }
    }
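    The bookkeeping that receivedBeat() does on the datum list (which keys to fetch in batches of 50, and which local keys are dead) is independent of HTTP and can be isolated. BeatDiffSketch is hypothetical: it takes the local key-to-timestamp map and the leader's key-to-timestamp entries (prefixes already restored) and simply returns the fetch batches and dead keys, whereas the real code fetches each batch from /v1/ns/raft/get and calls deleteDatum().

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the key reconciliation done in receivedBeat().
class BeatDiffSketch {
    static final int BATCH_SIZE = 50;

    final List<List<String>> fetchBatches = new ArrayList<>();
    final List<String> deadKeys = new ArrayList<>();

    BeatDiffSketch(Map<String, Long> local, LinkedHashMap<String, Long> fromLeader) {
        Set<String> seen = new HashSet<>();
        List<String> batch = new ArrayList<>();
        for (Map.Entry<String, Long> entry : fromLeader.entrySet()) {
            String key = entry.getKey();
            seen.add(key);
            Long localTimestamp = local.get(key);
            // Fetch keys that are missing locally or whose local copy is older.
            if (localTimestamp == null || localTimestamp < entry.getValue()) {
                batch.add(key);
            }
            if (batch.size() >= BATCH_SIZE) {
                fetchBatches.add(batch);
                batch = new ArrayList<>();
            }
        }
        if (!batch.isEmpty()) {
            fetchBatches.add(batch); // remainder after the last entry
        }
        // Keys the leader no longer reports were deleted on the leader.
        for (String key : local.keySet()) {
            if (!seen.contains(key)) {
                deadKeys.add(key);
            }
        }
    }
}
```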

    5. Publishing content through Raft

    Registration entry point

    The registration HTTP endpoint:

    @RestController
    @RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
    public class InstanceController {
     
        ......
     
        @CanDistro
        @RequestMapping(value = "", method = RequestMethod.POST)
        public String register(HttpServletRequest request) throws Exception {
            // Read namespaceId and serviceName from the request
            String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
            String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
     
            // Run the registration logic
            serviceManager.registerInstance(namespaceId, serviceName, parseInstance(request));
            return "ok";
        }
    }
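    ServiceManager (shown next) keeps every Service in a two-level map keyed first by namespace and then by service name, and rejects an instance that is already registered. A minimal sketch of that layout, assuming instances are identified by an ip:port string and flattening Nacos' Service and cluster objects into a plain Set; RegistrySketch and its methods are hypothetical.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a namespace -> serviceName -> instances registry.
class RegistrySketch {
    // Outer key: namespaceId, inner key: serviceName, value: registered "ip:port" strings.
    private final Map<String, Map<String, Set<String>>> serviceMap = new ConcurrentHashMap<>();

    // Creates the (empty) service on demand; returns false instead of throwing,
    // as Nacos does, when the instance already exists.
    boolean register(String namespaceId, String serviceName, String ipPort) {
        Set<String> instances = serviceMap
                .computeIfAbsent(namespaceId, ns -> new ConcurrentHashMap<>())
                .computeIfAbsent(serviceName, name -> ConcurrentHashMap.newKeySet());
        return instances.add(ipPort);
    }

    int instanceCount(String namespaceId, String serviceName) {
        Map<String, Set<String>> services = serviceMap.get(namespaceId);
        if (services == null || !services.containsKey(serviceName)) {
            return 0;
        }
        return services.get(serviceName).size();
    }
}
```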

    Registering an instance

    @Component
    @DependsOn("nacosApplicationContext")
    public class ServiceManager implements RecordListener<Service> {
     
        ......
     
        private Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
     
     
        ......
     
        // Register a new instance
        public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
            // In AP mode, create an empty service first. All services live in serviceMap, of type Map<String, Map<String, Service>>:
            // the outer key is the namespace and the inner key is the serviceName. Each service maintains a clusterMap holding two sets of instances
            if (ServerMode.AP.name().equals(switchDomain.getServerMode())) {
                createEmptyService(namespaceId, serviceName);
            }
     
            Service service = getService(namespaceId, serviceName);
     
            if (service == null) {
                throw new NacosException(NacosException.INVALID_PARAM,
                    "service not found, namespace: " + namespaceId + ", service: " + serviceName);
            }
     
            // Check whether the instance already exists (compared by ip)
            if (service.allIPs().contains(instance)) {
                throw new NacosException(NacosException.INVALID_PARAM, "instance already exist: " + instance);
            }
     
            // Add the new instance
            addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
        }
     
     
        // Create an empty service
        public void createEmptyService(String namespaceId, String serviceName) throws NacosException {
            Service service = getService(namespaceId, serviceName);
            if (service == null) {
                service = new Service();
                service.setName(serviceName);
                service.setNamespaceId(namespaceId);
                service.setGroupName(Constants.DEFAULT_GROUP);
                // now validate the service. if failed, exception will be thrown
                service.setLastModifiedMillis(System.currentTimeMillis());
                service.recalculateChecksum();
                service.validate();
                putService(service);
                service.init();
                // Listen on the service's ephemeral and persistent instance-list keys so that data changes are synced to it
                consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
                consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
            }
        }
     
        // Add instances to the cache and persist them
        public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {
            String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
     
            Service service = getService(namespaceId, serviceName);
     
            // Build the new instance list: the currently cached instances plus the new ones
            List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
     
            Instances instances = new Instances();
            instances.setInstanceList(instanceList);
     
            // Persist the instance list through the consistency service
            consistencyService.put(key, instances);
        }
     
        // Add instances (delegates to updateIpAddresses() with the ADD action)
        public List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {
            return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);
        }
     
        // Where the merge actually happens: combine stored and current instances, then apply the add/remove action
        public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips) throws NacosException {
            Datum datum = consistencyService.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));
     
            Map<String, Instance> oldInstanceMap = new HashMap<>(16);
            List<Instance> currentIPs = service.allIPs(ephemeral);
            Map<String, Instance> map = new ConcurrentHashMap<>(currentIPs.size());
     
            for (Instance instance : currentIPs) {
                map.put(instance.toIPAddr(), instance);
            }
            if (datum != null) {
                oldInstanceMap = setValid(((Instances) datum.value).getInstanceList(), map);
            }
     
            // use HashMap for deep copy:
            HashMap<String, Instance> instanceMap = new HashMap<>(oldInstanceMap.size());
            instanceMap.putAll(oldInstanceMap);
     
            for (Instance instance : ips) {
                if (!service.getClusterMap().containsKey(instance.getClusterName())) {
                    Cluster cluster = new Cluster(instance.getClusterName());
                    cluster.setService(service);
                    service.getClusterMap().put(instance.getClusterName(), cluster);
                    Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                        instance.getClusterName(), instance.toJSON());
                }
     
                if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
                    instanceMap.remove(instance.getDatumKey());
                } else {
                    instanceMap.put(instance.getDatumKey(), instance);
                }
            }
     
            if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
                throw new IllegalArgumentException("ip list can not be empty, service: " + service.getName() + ", ip list: "
                    + JSON.toJSONString(instanceMap.values()));
            }
     
            return new ArrayList<>(instanceMap.values());
        }
     
        // Merge the stored (old) instance list with the current one: copy health status and last beat time onto the old instances
        private Map<String, Instance> setValid(List<Instance> oldInstances, Map<String, Instance> map) {
            Map<String, Instance> instanceMap = new HashMap<>(oldInstances.size());
            for (Instance instance : oldInstances) {
                Instance instance1 = map.get(instance.toIPAddr());
                if (instance1 != null) {
                    instance.setHealthy(instance1.isHealthy());
                    instance.setLastBeat(instance1.getLastBeat());
                }
                instanceMap.put(instance.getDatumKey(), instance);
            }
            return instanceMap;
        }
     
        ......
    }
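    To make the two-level serviceMap and the copy-then-merge update easier to follow, here is a minimal, single-threaded Java sketch. MiniRegistry and its methods are hypothetical names, not Nacos classes; instances are reduced to "ip:port" strings, and the merged list is stored directly instead of being passed to consistencyService.put() as Nacos does.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, single-threaded illustration; NOT the Nacos ServiceManager.
class MiniRegistry {
    // namespaceId -> (serviceName -> instances, each reduced to an "ip:port" string)
    private final Map<String, Map<String, List<String>>> serviceMap = new ConcurrentHashMap<>();

    // Like createEmptyService(): create the namespace map and service entry only if absent.
    void createEmptyService(String namespaceId, String serviceName) {
        serviceMap.computeIfAbsent(namespaceId, ns -> new ConcurrentHashMap<>())
                  .computeIfAbsent(serviceName, s -> new ArrayList<>());
    }

    List<String> getService(String namespaceId, String serviceName) {
        Map<String, List<String>> services = serviceMap.get(namespaceId);
        return services == null ? null : services.get(serviceName);
    }

    // Like updateIpAddresses(): copy the current list, apply the action, return a NEW list.
    static List<String> updateIpAddresses(List<String> current, boolean remove, String... ips) {
        // Keyed by "ip:port" as a stand-in for getDatumKey(); LinkedHashMap keeps insertion order.
        Map<String, String> instanceMap = new LinkedHashMap<>();
        for (String ip : current) {
            instanceMap.put(ip, ip);
        }
        for (String ip : ips) {
            if (remove) {
                instanceMap.remove(ip);
            } else {
                instanceMap.put(ip, ip);
            }
        }
        if (instanceMap.isEmpty() && !remove) {
            throw new IllegalArgumentException("ip list can not be empty");
        }
        return new ArrayList<>(instanceMap.values());
    }

    // Like registerInstance(): create the service, reject duplicates, then store the merged list.
    void register(String namespaceId, String serviceName, String ipPort) {
        createEmptyService(namespaceId, serviceName);
        List<String> current = getService(namespaceId, serviceName);
        if (current.contains(ipPort)) {
            throw new IllegalStateException("instance already exist: " + ipPort);
        }
        List<String> updated = updateIpAddresses(current, false, ipPort);
        // Nacos would hand this list to consistencyService.put(); the sketch stores it directly.
        serviceMap.get(namespaceId).put(serviceName, updated);
    }
}
```

    The point of copying before merging is that the cached list is never mutated in place; the new list only becomes visible once it has gone through the consistency layer (in the sketch, once it is stored).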

    Persisting instance information

    RaftConsistencyServiceImpl.put() does the persistence work, i.e. the consistencyService.put(key, instances); call shown above.

    (1) Service.put()

    @Service
    public class RaftConsistencyServiceImpl implements PersistentConsistencyService {
     
        ......
     
        @Override
        public void put(String key, Record value) throws NacosException {
            try {
                raftCore.signalPublish(key, value);
            } catch (Exception e) {
                Loggers.RAFT.error("Raft put failed.", e);
                throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value);
            }
        }
    }

    This ultimately calls RaftCore.signalPublish():

    (2) RaftCore.signalPublish()

    @Component
    public class RaftCore {
     
        ......
     
        public void signalPublish(String key, Record value) throws Exception {
            // If this node is not the leader, forward the request to the leader
            if (!isLeader()) {
                JSONObject params = new JSONObject();
                params.put("key", key);
                params.put("value", value);
                Map<String, String> parameters = new HashMap<>(1);
                parameters.put("key", key);
     
                // Call the leader's /raft/datum endpoint
                raftProxy.proxyPostLarge(getLeader().ip, API_PUB, params.toJSONString(), parameters);
                return;
            }
     
            // If this node is the leader, send the datum to all followers
            try {
                OPERATE_LOCK.lock();
                long start = System.currentTimeMillis();
                final Datum datum = new Datum();
                datum.key = key;
                datum.value = value;
                if (getDatum(key) == null) {
                    datum.timestamp.set(1L);
                } else {
                    datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
                }
     
                JSONObject json = new JSONObject();
                json.put("datum", datum);
                json.put("source", peers.local());
     
                // Apply the datum locally first; onPublish() handles persistence
                onPublish(datum, peers.local());
     
                final String content = JSON.toJSONString(json);
     
                final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
                // Send the datum to every follower via the /raft/datum/commit endpoint
                for (final String server : peers.allServersIncludeMyself()) {
                    if (isLeader(server)) {
                        latch.countDown();
                        continue;
                    }
                    final String url = buildURL(server, API_ON_PUB);
                    HttpClient.asyncHttpPostLarge(url, Arrays.asList("key=" + key), content, new AsyncCompletionHandler<Integer>() {
                        @Override
                        public Integer onCompleted(Response response) throws Exception {
                            if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                                Loggers.RAFT.warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
                                    datum.key, server, response.getStatusCode());
                                return 1;
                            }
                            latch.countDown();
                            return 0;
                        }
     
                        @Override
                        public STATE onContentWriteCompleted() {
                            return STATE.CONTINUE;
                        }
                    });
                }
     
                if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
                    // only majority servers return success can we consider this update success
                    Loggers.RAFT.info("data publish failed, caused failed to notify majority, key={}", key);
                    throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
                }
     
                long end = System.currentTimeMillis();
                Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);
            } finally {
                OPERATE_LOCK.unlock();
            }
        }
    }
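    The quorum rule in signalPublish() (the leader counts itself, then waits on a CountDownLatch sized to peers.majorityCount()) can be sketched in isolation as below. This is a simplified illustration, not Nacos code: QuorumPublish is a hypothetical class, followers are simulated by a Predicate instead of HTTP calls to /raft/datum/commit, and majorityCount() assumes the usual floor(n / 2) + 1 majority.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

// Hypothetical illustration of the leader's majority wait in signalPublish(); NOT Nacos code.
class QuorumPublish {
    // Assumed majority rule: floor(n / 2) + 1 nodes.
    static int majorityCount(int clusterSize) {
        return clusterSize / 2 + 1;
    }

    // Returns true if a majority (leader included) acknowledged within the timeout.
    static boolean publish(String leader, List<String> allServers,
                           Predicate<String> followerAccepts, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(majorityCount(allServers.size()));
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, allServers.size()));
        try {
            for (String server : allServers) {
                if (server.equals(leader)) {
                    latch.countDown(); // the leader already applied the datum locally
                    continue;
                }
                // Stand-in for the async POST to the follower's /raft/datum/commit endpoint.
                pool.submit(() -> {
                    if (followerAccepts.test(server)) {
                        latch.countDown(); // only successful acks count toward the majority
                    }
                });
            }
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow(); // stop waiting on stragglers once the outcome is known
        }
    }
}
```

    With three servers, one follower ack plus the leader's own count reaches the majority of 2. If no follower answers, await() times out and the publish fails, which corresponds to the IllegalStateException thrown in the source.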

    (3) The /raft/datum and /raft/datum/commit endpoints

    @RestController
    @RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/raft")
    public class RaftController {
     
        ......
     
        @NeedAuth
        @RequestMapping(value = "/datum", method = RequestMethod.POST)
        public String publish(HttpServletRequest request, HttpServletResponse response) throws Exception {
     
            response.setHeader("Content-Type", "application/json; charset=" + getAcceptEncoding(request));
            response.setHeader("Cache-Control", "no-cache");
            response.setHeader("Content-Encode", "gzip");
     
            String entity = IOUtils.toString(request.getInputStream(), "UTF-8");
            String value = URLDecoder.decode(entity, "UTF-8");
            JSONObject json = JSON.parseObject(value);
     
            // This also goes through RaftConsistencyServiceImpl.put(), where it joins the service-registration path and ends up in signalPublish()
            String key = json.getString("key");
            if (KeyBuilder.matchInstanceListKey(key)) {
                raftConsistencyService.put(key, JSON.parseObject(json.getString("value"), Instances.class));
                return "ok";
            }
     
            if (KeyBuilder.matchSwitchKey(key)) {
                raftConsistencyService.put(key, JSON.parseObject(json.getString("value"), SwitchDomain.class));
                return "ok";
            }
     
            if (KeyBuilder.matchServiceMetaKey(key)) {
                raftConsistencyService.put(key, JSON.parseObject(json.getString("value"), Service.class));
                return "ok";
            }
     
            throw new NacosException(NacosException.INVALID_PARAM, "unknown type publish key: " + key);
        }
     
     
        @NeedAuth
        @RequestMapping(value = "/datum/commit", method = RequestMethod.POST)
        public String onPublish(HttpServletRequest request, HttpServletResponse response) throws Exception {
            response.setHeader("Content-Type", "application/json; charset=" + getAcceptEncoding(request));
            response.setHeader("Cache-Control", "no-cache");
            response.setHeader("Content-Encode", "gzip");
     
            String entity = IOUtils.toString(request.getInputStream(), "UTF-8");
            String value = URLDecoder.decode(entity, "UTF-8");
            JSONObject jsonObject = JSON.parseObject(value);
            String key = "key";
     
            RaftPeer source = JSON.parseObject(jsonObject.getString("source"), RaftPeer.class);
            JSONObject datumJson = jsonObject.getJSONObject("datum");
     
            Datum datum = null;
            if (KeyBuilder.matchInstanceListKey(datumJson.getString(key))) {
                datum = JSON.parseObject(jsonObject.getString("datum"), new TypeReference<Datum<Instances>>() {});
            } else if (KeyBuilder.matchSwitchKey(datumJson.getString(key))) {
                datum = JSON.parseObject(jsonObject.getString("datum"), new TypeReference<Datum<SwitchDomain>>() {});
            } else if (KeyBuilder.matchServiceMetaKey(datumJson.getString(key))) {
                datum = JSON.parseObject(jsonObject.getString("datum"), new TypeReference<Datum<Service>>() {});
            }
     
            // This eventually calls RaftCore.onPublish()
            raftConsistencyService.onPut(datum, source);
            return "ok";
        }
     
        ......
    }
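    Both endpoints pick the record type from the key before deserializing. A hypothetical sketch of that dispatch follows; the prefixes are placeholders, not the real KeyBuilder constants. One detail worth noting: the Notifier further down tests matchServiceMetaKey(key) && !matchSwitchKey(key), which suggests the switch key also matches the service-meta pattern, so the sketch checks the more specific switch key before the generic meta prefix, in the same order publish() uses.

```java
import java.util.Optional;

// Hypothetical sketch of key-type dispatch; the prefixes are placeholders, NOT KeyBuilder's.
class KeyRouter {
    enum RecordType { INSTANCES, SWITCH_DOMAIN, SERVICE_META }

    static final String INSTANCE_LIST_PREFIX = "demo.iplist.";        // placeholder
    static final String SERVICE_META_PREFIX = "demo.meta.";           // placeholder
    static final String SWITCH_KEY = SERVICE_META_PREFIX + "switch";  // placeholder, shares the meta prefix

    static Optional<RecordType> route(String key) {
        if (key.startsWith(INSTANCE_LIST_PREFIX)) {
            return Optional.of(RecordType.INSTANCES);
        }
        // The specific switch key must be tested before the generic meta prefix,
        // otherwise it would be deserialized as service metadata.
        if (key.equals(SWITCH_KEY)) {
            return Optional.of(RecordType.SWITCH_DOMAIN);
        }
        if (key.startsWith(SERVICE_META_PREFIX)) {
            return Optional.of(RecordType.SERVICE_META);
        }
        return Optional.empty(); // the caller reports "unknown type publish key"
    }
}
```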

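    Publish entry point: RaftCommands.publish()

    The method shown here is RaftCore.onPublish(). It applies a datum on the leader (called locally from signalPublish()) and on each follower (reached through /raft/datum/commit).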

    @Component
    public class RaftCore {
     
        ......
     
        public void onPublish(Datum datum, RaftPeer source) throws Exception {
            RaftPeer local = peers.local();
            if (datum.value == null) {
                Loggers.RAFT.warn("received empty datum");
                throw new IllegalStateException("received empty datum");
            }
     
            // Reject the datum if it was not published by the leader
            if (!peers.isLeader(source.ip)) {
                Loggers.RAFT.warn("peer {} tried to publish data but wasn't leader, leader: {}",
                    JSON.toJSONString(source), JSON.toJSONString(getLeader()));
                throw new IllegalStateException("peer(" + source.ip + ") tried to publish " +
                    "data but wasn't leader");
            }
     
            // Reject the datum if the source's term is lower than the local term
            if (source.term.get() < local.term.get()) {
                Loggers.RAFT.warn("out of date publish, pub-term: {}, cur-term: {}",
                    JSON.toJSONString(source), JSON.toJSONString(local));
                throw new IllegalStateException("out of date publish, pub-term:"
                    + source.term.get() + ", cur-term: " + local.term.get());
            }
     
            // Reset the election timeout (the leader is evidently alive)
            local.resetLeaderDue();
     
            // Persist the datum to disk
            // if data should be persistent, usually this is always true:
            if (KeyBuilder.matchPersistentKey(datum.key)) {
                raftStore.write(datum);
            }
     
            // Cache the datum in memory
            datums.put(datum.key, datum);
     
            // Update the term
            if (isLeader()) {
                local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
            } else {
                if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {
                    //set leader term:
                    getLeader().term.set(source.term.get());
                    local.term.set(getLeader().term.get());
                } else {
                    local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
                }
            }
            raftStore.updateTerm(local.term.get());
     
            // Notify listeners that the datum has changed
            notifier.addTask(datum.key, ApplyAction.CHANGE);
     
            Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
        }
    }
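    Stripped of persistence, timers and notifications, the follower-side checks in onPublish() come down to the sketch below. FollowerState and its fields are hypothetical, and TERM_INCREASE = 100 is only an illustrative stand-in for PUBLISH_TERM_INCREASE_COUNT; the term branch copies the follower path of the source as written.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration of the follower path of RaftCore.onPublish(); NOT Nacos code.
class FollowerState {
    // Illustrative stand-in for PUBLISH_TERM_INCREASE_COUNT.
    static final long TERM_INCREASE = 100L;

    final String leaderIp;
    long localTerm;
    long leaderTerm;
    final Map<String, String> datums = new ConcurrentHashMap<>();

    FollowerState(String leaderIp, long term) {
        this.leaderIp = leaderIp;
        this.localTerm = term;
        this.leaderTerm = term;
    }

    void onPublish(String sourceIp, long sourceTerm, String key, String value) {
        if (value == null) {
            throw new IllegalStateException("received empty datum");
        }
        // Only the current leader may publish.
        if (!sourceIp.equals(leaderIp)) {
            throw new IllegalStateException("peer(" + sourceIp + ") tried to publish data but wasn't leader");
        }
        // A publish from an older term is stale.
        if (sourceTerm < localTerm) {
            throw new IllegalStateException("out of date publish, pub-term:" + sourceTerm + ", cur-term: " + localTerm);
        }
        // Nacos also resets the election timeout and writes the datum to disk here.
        datums.put(key, value);
        // Follower branch from the source: if bumping the local term would overshoot the
        // leader's term, adopt the leader's term; otherwise bump by the fixed increment.
        if (localTerm + TERM_INCREASE > sourceTerm) {
            leaderTerm = sourceTerm;
            localTerm = leaderTerm;
        } else {
            localTerm += TERM_INCREASE;
        }
    }
}
```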

    6. How Raft keeps content consistent

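    When Nacos publishes content through Raft, signalPublish() only waits until a majority of nodes (the leader included) have acknowledged the datum, so an individual follower may miss an update. The Raft heartbeat mechanism is what eventually brings every node in line with the leader.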
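    The heartbeat itself is covered in section 4. As a rough illustration of how a follower could use a beat to catch up, the sketch below compares the leader's key-to-timestamp view with the local one. It assumes, as a simplification of Nacos's heartbeat handling, that the beat carries every datum key with its timestamp, that keys newer on the leader (or missing locally) are fetched, and that local keys absent from the beat are deleted. BeatDiff is a hypothetical name and the map is not the Nacos wire format.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical illustration of heartbeat-driven catch-up; the beat is modelled as key -> timestamp.
class BeatDiff {
    final Set<String> toFetch = new TreeSet<>();  // missing locally, or the local copy is older
    final Set<String> toDelete = new TreeSet<>(); // held locally but absent from the leader's beat

    static BeatDiff compare(Map<String, Long> leaderBeat, Map<String, Long> local) {
        BeatDiff diff = new BeatDiff();
        for (Map.Entry<String, Long> e : leaderBeat.entrySet()) {
            Long localTs = local.get(e.getKey());
            if (localTs == null || localTs < e.getValue()) {
                diff.toFetch.add(e.getKey());
            }
        }
        for (String key : local.keySet()) {
            if (!leaderBeat.containsKey(key)) {
                diff.toDelete.add(key);
            }
        }
        return diff;
    }
}
```

    Comparing per-key timestamps (the same datum.timestamp that signalPublish() increments) lets a follower pull only what changed instead of re-copying the whole data set on every beat.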

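    During registration, addInstance() builds the new instance list and hands it to the consistency service. When Raft replicates the datum from the leader to a follower, however, the follower's onPublish() only persists it and stores it in datums; it does not update the service's in-memory instance list directly. That update is done through a listener: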

    At the end of onPublish() there is the line notifier.addTask(datum.key, ApplyAction.CHANGE);, which adds this change to the notifier's task queue. Here is how those tasks are processed:

    @Component
    public class RaftCore {
     
        ......
     
        public class Notifier implements Runnable {
            private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);
            private BlockingQueue<Pair> tasks = new LinkedBlockingQueue<Pair>(1024 * 1024);
     
            // Queue a change task (a CHANGE already pending for the same key is not queued twice)
            public void addTask(String datumKey, ApplyAction action) {
     
                if (services.containsKey(datumKey) && action == ApplyAction.CHANGE) {
                    return;
                }
                if (action == ApplyAction.CHANGE) {
                    services.put(datumKey, StringUtils.EMPTY);
                }
                tasks.add(Pair.with(datumKey, action));
            }
     
            public int getTaskSize() {
                return tasks.size();
            }
     
            // Task-processing loop
            @Override
            public void run() {
                Loggers.RAFT.info("raft notifier started");
     
                while (true) {
                    try {
                        Pair pair = tasks.take();
     
                        if (pair == null) {
                            continue;
                        }
     
                        String datumKey = (String) pair.getValue0();
                        ApplyAction action = (ApplyAction) pair.getValue1();
     
                        // Remove the key from the pending set so later changes can be queued again
                        services.remove(datumKey);
     
                        int count = 0;
     
                        if (listeners.containsKey(KeyBuilder.SERVICE_META_KEY_PREFIX)) {
                            if (KeyBuilder.matchServiceMetaKey(datumKey) && !KeyBuilder.matchSwitchKey(datumKey)) {
                                for (RecordListener listener : listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX)) {
                                    try {
                                        // Call the callback that matches the change type to update the cache
                                        if (action == ApplyAction.CHANGE) {
                                            listener.onChange(datumKey, getDatum(datumKey).value);
                                        }
     
                                        if (action == ApplyAction.DELETE) {
                                            listener.onDelete(datumKey);
                                        }
                                    } catch (Throwable e) {
                                        Loggers.RAFT.error("[NACOS-RAFT] error while notifying listener of key: {} {}", datumKey, e);
                                    }
                                }
                            }
                        }
     
                        if (!listeners.containsKey(datumKey)) {
                            continue;
                        }
     
                        for (RecordListener listener : listeners.get(datumKey)) {
                            count++;
     
                            try {
                                if (action == ApplyAction.CHANGE) {
                                    listener.onChange(datumKey, getDatum(datumKey).value);
                                    continue;
                                }
     
                                if (action == ApplyAction.DELETE) {
                                    listener.onDelete(datumKey);
                                    continue;
                                }
                            } catch (Throwable e) {
                                Loggers.RAFT.error("[NACOS-RAFT] error while notifying listener of key: {} {}", datumKey, e);
                            }
                        }
     
                        if (Loggers.RAFT.isDebugEnabled()) {
                            Loggers.RAFT.debug("[NACOS-RAFT] datum change notified, key: {}, listener count: {}", datumKey, count);
                        }
                    } catch (Throwable e) {
                        Loggers.RAFT.error("[NACOS-RAFT] Error while handling notifying task", e);
                    }
                }
            }
        }
    }
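    The Notifier's de-duplication is the key trick: while a CHANGE for a key is still queued, further CHANGEs for that key are dropped, because the listener reads the latest datum anyway when the task runs. A minimal sketch follows. MiniNotifier is hypothetical, listeners are plain Consumer<String> callbacks rather than RecordListener, and processOne() handles a single task instead of looping on take(). Unlike the source, addTask() uses putIfAbsent() so the check and the put happen atomically.

```java
import java.util.AbstractMap;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical illustration of the Notifier's de-duplicating task queue; NOT Nacos code.
class MiniNotifier {
    enum Action { CHANGE, DELETE }

    private final Map<String, String> pending = new ConcurrentHashMap<>();
    private final BlockingQueue<Map.Entry<String, Action>> tasks = new LinkedBlockingQueue<>();
    private final Map<String, List<Consumer<String>>> listeners = new ConcurrentHashMap<>();

    void listen(String key, Consumer<String> listener) {
        listeners.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(listener);
    }

    // Drop a CHANGE if one is already queued for the key; DELETE is always queued.
    void addTask(String key, Action action) {
        if (action == Action.CHANGE && pending.putIfAbsent(key, "") != null) {
            return;
        }
        tasks.add(new AbstractMap.SimpleImmutableEntry<>(key, action));
    }

    int taskSize() {
        return tasks.size();
    }

    // Handles one queued task; returns false when the queue is empty.
    boolean processOne() {
        Map.Entry<String, Action> task = tasks.poll();
        if (task == null) {
            return false;
        }
        String key = task.getKey();
        pending.remove(key); // from now on a new CHANGE for this key can be queued again
        for (Consumer<String> listener : listeners.getOrDefault(key, Collections.emptyList())) {
            try {
                listener.accept(task.getValue() + ":" + key);
            } catch (RuntimeException e) {
                // One failing listener must not block the others; the source logs and moves on.
            }
        }
        return true;
    }
}
```

    For example, two quick CHANGEs followed by a DELETE on the same key leave only two tasks in the queue (CHANGE, then DELETE), and the listener fires once for each.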
    Original article: https://www.cnblogs.com/itplay/p/11037649.html