  • Nacos Source Code Analysis (3): Heartbeat Design

    Heartbeats are part of Nacos's health checking. They are how the server confirms that a client is still alive.

    The source code is analyzed in two parts:

    1. The client sends heartbeats

    2. The server receives client heartbeats

    1. The client sends heartbeats

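    Part two analyzed how the client registers an instance. There we saw that, if the instance is ephemeral, the client initializes the heartbeat info and schedules the heartbeat task: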

        @Override
        public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
            String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
            // Check whether the instance is ephemeral (true by default)
            if (instance.isEphemeral()) {
                // Build the heartbeat info and register the heartbeat task
                BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
                beatReactor.addBeatInfo(groupedServiceName, beatInfo);
            }
            serverProxy.registerService(groupedServiceName, groupName, instance);
        }
        /**
         * Add beat information.
         *
         * @param serviceName service name
         * @param beatInfo    beat information
         */
        public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
            NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
            String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
            BeatInfo existBeat = null;
            //fix #1733
            if ((existBeat = dom2Beat.remove(key)) != null) {
                existBeat.setStopped(true);
            }
            dom2Beat.put(key, beatInfo);
            executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
        }
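    The "fix #1733" step keeps at most one live heartbeat per instance. Registering the same instance again replaces its BeatInfo and stops the old one. Below is a minimal sketch of that idea. SimpleBeatInfo, BeatRegistry and the "service#ip#port" key format are hypothetical stand-ins, not the Nacos classes. The sketch uses the return value of Map.put instead of the original remove-then-put; for a single caller both leave the new beat in the map and the old one stopped.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, trimmed-down stand-in for BeatInfo.
class SimpleBeatInfo {
    final String ip;
    final int port;
    // volatile: written by the registering thread, read by the beat thread
    volatile boolean stopped;

    SimpleBeatInfo(String ip, int port) {
        this.ip = ip;
        this.port = port;
    }
}

// Hypothetical registry mirroring dom2Beat: one live beat per service + ip + port.
class BeatRegistry {
    private final Map<String, SimpleBeatInfo> dom2Beat = new ConcurrentHashMap<>();

    // Assumed key format, for illustration only.
    static String buildKey(String serviceName, String ip, int port) {
        return serviceName + "#" + ip + "#" + port;
    }

    // Stores the new beat and stops the one it replaces (if any); returns the replaced beat.
    SimpleBeatInfo add(String serviceName, SimpleBeatInfo beat) {
        SimpleBeatInfo previous = dom2Beat.put(buildKey(serviceName, beat.ip, beat.port), beat);
        if (previous != null) {
            previous.stopped = true;
        }
        return previous;
    }

    int size() {
        return dom2Beat.size();
    }
}
```

    Stopping the old BeatInfo matters because its BeatTask is still scheduled. On its next run the task sees isStopped() and returns before rescheduling, so the old chain dies out.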
            

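    A scheduled thread pool runs the heartbeat task periodically (every 5 s by default). Note that schedule() fires only once. Each BeatTask schedules the next one when it finishes, using the interval returned by the server. Here is the source of BeatTask: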

        class BeatTask implements Runnable {
    
            BeatInfo beatInfo;
    
            public BeatTask(BeatInfo beatInfo) {
                this.beatInfo = beatInfo;
            }
    
            @Override
            public void run() {
                if (beatInfo.isStopped()) {
                    return;
                }
                long nextTime = beatInfo.getPeriod();
                try {
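                    // HTTP request to the Nacos server's /instance/beat endpoint.
                    // Arg 1: the heartbeat info; arg 2: whether this is a lightweight beat (false on the first beat).
                    // Note: Nacos client beats come in two kinds: a full beat that reports the instance details
                    // (sent first), and a lightweight beat whose only purpose is to keep the instance alive.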
                    JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
                    long interval = result.get("clientBeatInterval").asInt();
                    boolean lightBeatEnabled = false;
                    if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
                        lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
                    }
                    // After the first (full) beat the server returns lightBeatEnabled=true, so subsequent beats are lightweight
                    BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
                    if (interval > 0) {
                        nextTime = interval;
                    }
                    int code = NamingResponseCode.OK;
                    if (result.has(CommonParams.CODE)) {
                        code = result.get(CommonParams.CODE).asInt();
                    }
                    // The server does not know this instance: rebuild it from the beat info and re-register it
                    if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
                        Instance instance = new Instance();
                        instance.setPort(beatInfo.getPort());
                        instance.setIp(beatInfo.getIp());
                        instance.setWeight(beatInfo.getWeight());
                        instance.setMetadata(beatInfo.getMetadata());
                        instance.setClusterName(beatInfo.getCluster());
                        instance.setServiceName(beatInfo.getServiceName());
                        instance.setInstanceId(instance.getInstanceId());
                        instance.setEphemeral(true);
                        try {
                            serverProxy.registerService(beatInfo.getServiceName(),
                                    NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
                        } catch (Exception ignore) {
                        }
                    }
                } catch (NacosException ex) {
                    NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
                            JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());
    
                }
                // Schedule the next heartbeat task
                executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
            }
        }
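    BeatTask does not use scheduleAtFixedRate. Each run schedules the next one, with the delay the server just returned. Below is a minimal sketch of that self-rescheduling pattern. It assumes a hypothetical LongSupplier in place of serverProxy.sendBeat, which returns the next interval in milliseconds.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;

// Sketch of BeatTask's self-rescheduling loop (not the Nacos class).
class SelfReschedulingBeat implements Runnable {
    private final ScheduledExecutorService executor;
    // Hypothetical stand-in for serverProxy.sendBeat: next interval in ms (<= 0 keeps the default).
    private final LongSupplier sendBeat;
    private final long defaultPeriodMs;
    private volatile boolean stopped;
    final AtomicInteger beatsSent = new AtomicInteger();

    SelfReschedulingBeat(ScheduledExecutorService executor, LongSupplier sendBeat, long defaultPeriodMs) {
        this.executor = executor;
        this.sendBeat = sendBeat;
        this.defaultPeriodMs = defaultPeriodMs;
    }

    void stop() {
        stopped = true;
    }

    @Override
    public void run() {
        if (stopped) {
            return; // a stopped beat simply does not reschedule itself
        }
        long next = defaultPeriodMs;
        try {
            long interval = sendBeat.getAsLong();
            if (interval > 0) {
                next = interval; // the server dictates the next delay
            }
            beatsSent.incrementAndGet();
        } catch (RuntimeException e) {
            // a failed beat must not break the chain; fall through and reschedule
        }
        executor.schedule(this, next, TimeUnit.MILLISECONDS);
    }
}
```

    The next delay is computed on every run. So if the server changes clientBeatInterval, clients pick it up on their next beat. A failed beat is still rescheduled, as BeatTask does after logging a NacosException.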

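    The client sends the heartbeat with a PUT request to the server's /nacos/v1/ns/instance/beat endpoint.

    The first beat carries the instance's details to the server. Once the server has responded, later beats become lightweight and no longer upload those details, as the following code shows: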

        /**
         * Send beat.
         *
         * @param beatInfo         beat info
         * @param lightBeatEnabled light beat
         * @return beat result
         * @throws NacosException nacos exception
         */
        public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {
    
            if (NAMING_LOGGER.isDebugEnabled()) {
                NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());
            }
            Map<String, String> params = new HashMap<String, String>(8);
            Map<String, String> bodyMap = new HashMap<String, String>(2);
            if (!lightBeatEnabled) {
                // For a full (non-lightweight) beat, attach the beat info: ip, port, metadata, etc.
                bodyMap.put("beat", JacksonUtils.toJson(beatInfo));
            }
            params.put(CommonParams.NAMESPACE_ID, namespaceId);
            params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
            params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());
            params.put("ip", beatInfo.getIp());
            params.put("port", String.valueOf(beatInfo.getPort()));
            String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);
            return JacksonUtils.toObj(result);
        }
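    The rule in sendBeat can be isolated as a small pure function. The query parameters always identify the instance, while the "beat" body field is added only for a full beat. Below is a sketch with a hypothetical BeatRequest class. The parameter names are assumed to match the CommonParams constants.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of sendBeat's request-building rule; BeatRequest is hypothetical and the
// parameter names are assumed to match the CommonParams constants.
class BeatRequest {
    final Map<String, String> params = new HashMap<>();
    final Map<String, String> body = new HashMap<>();

    static BeatRequest build(String namespaceId, String serviceName, String cluster,
                             String ip, int port, String beatJson, boolean lightBeatEnabled) {
        BeatRequest request = new BeatRequest();
        if (!lightBeatEnabled) {
            // Full beat: ship the serialized instance info (ip, port, metadata, ...).
            request.body.put("beat", beatJson);
        }
        // These always identify the instance, for full and light beats alike.
        request.params.put("namespaceId", namespaceId);
        request.params.put("serviceName", serviceName);
        request.params.put("clusterName", cluster);
        request.params.put("ip", ip);
        request.params.put("port", String.valueOf(port));
        return request;
    }
}
```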

    That covers when and how the client sends heartbeats.

    2. The server receives heartbeats

    The server-side entry point is the following handler:

        /**
         * Create a beat for instance.
         *
         * @param request http request
         * @return detail information of instance
         * @throws Exception any error during handle
         */
        @CanDistro
        @PutMapping("/beat")
        @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
        public ObjectNode beat(HttpServletRequest request) throws Exception {
    
            ObjectNode result = JacksonUtils.createEmptyJsonNode();
            result.put("clientBeatInterval", switchDomain.getClientBeatInterval());
    
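            // Use the "beat" parameter to tell a full beat (sent with registration info) from a lightweight keep-alive beat.
            // Compare with the client side:
            /*  if (!lightBeatEnabled) {
                // For a full (non-lightweight) beat, attach the beat info: ip, port, metadata, etc.
                bodyMap.put("beat", JacksonUtils.toJson(beatInfo));
            }*/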
            String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
            RsInfo clientBeat = null;
            // A full beat carries the "beat" parameter
            if (StringUtils.isNotBlank(beat)) {
                // Deserialize the beat info
                clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
            }
            String clusterName = WebUtils
                    .optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
            String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
            int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
            if (clientBeat != null) {
                if (StringUtils.isNotBlank(clientBeat.getCluster())) {
                    clusterName = clientBeat.getCluster();
                } else {
                    // fix #2533
                    clientBeat.setCluster(clusterName);
                }
                ip = clientBeat.getIp();
                port = clientBeat.getPort();
            }
            String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
            String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
            Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
            // Look up the instance for this beat in the two-level service map by namespace, service name, cluster name, ip and port
            Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
    
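            // The server cannot find the instance
            if (instance == null) {
                // ... and this is a lightweight beat
                if (clientBeat == null) {
                    // The instance is already gone: return RESOURCE_NOT_FOUND so the client re-registers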
                    result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
                    return result;
                }
    
                Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "
                        + "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);
                // A full beat (e.g. the first one) carries the whole instance, so the server re-runs the registration logic
                instance = new Instance();
                instance.setPort(clientBeat.getPort());
                instance.setIp(clientBeat.getIp());
                instance.setWeight(clientBeat.getWeight());
                instance.setMetadata(clientBeat.getMetadata());
                instance.setClusterName(clusterName);
                instance.setServiceName(serviceName);
                instance.setInstanceId(instance.getInstanceId());
                instance.setEphemeral(clientBeat.isEphemeral());
                // Registration logic
                serviceManager.registerInstance(namespaceId, serviceName, instance);
            }
            // Fetch the service again (it exists now, even if the instance was just re-registered)
            Service service = serviceManager.getService(namespaceId, serviceName);
    
            if (service == null) {
                throw new NacosException(NacosException.SERVER_ERROR,
                        "service not found: " + serviceName + "@" + namespaceId);
            }
            if (clientBeat == null) {
                clientBeat = new RsInfo();
                clientBeat.setIp(ip);
                clientBeat.setPort(port);
                clientBeat.setCluster(clusterName);
            }
            // Process the client beat, e.g. refresh the instance's last-beat time
            service.processClientBeat(clientBeat);
    
            result.put(CommonParams.CODE, NamingResponseCode.OK);
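            // Return the beat interval; the client schedules its next beat with it
            result.put("clientBeatInterval", instance.getInstanceHeartBeatInterval());
            // Return the light-beat flag: subsequent beats are lightweight and no longer report the full instance info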
            result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
            return result;
        }
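    Stripped of HTTP and serialization, the handler's decision reduces to three cases:

    - Unknown instance with a light beat: answer RESOURCE_NOT_FOUND so the client re-registers.
    - Unknown instance with a full beat: re-register from the beat data.
    - Known instance: refresh it.

    Below is a minimal in-memory sketch. MiniBeatServer, its nested namespace -> service -> instance map and the cluster#ip#port key are hypothetical. Unlike Nacos, it refreshes the last-beat time synchronously.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Minimal in-memory sketch of the beat handler's branches; not the Nacos classes.
class MiniBeatServer {
    enum Code { OK, RESOURCE_NOT_FOUND }

    // namespaceId -> serviceName -> "cluster#ip#port" -> last beat time (ms)
    private final Map<String, Map<String, Map<String, Long>>> registry = new ConcurrentHashMap<>();

    private Map<String, Long> instancesOf(String namespaceId, String serviceName) {
        return registry.computeIfAbsent(namespaceId, k -> new ConcurrentHashMap<>())
                .computeIfAbsent(serviceName, k -> new ConcurrentHashMap<>());
    }

    private static String instanceKey(String cluster, String ip, int port) {
        return cluster + "#" + ip + "#" + port;
    }

    // fullBeat == true means the request carried the "beat" parameter (a non-light beat).
    Code beat(String namespaceId, String serviceName, String cluster, String ip, int port,
              boolean fullBeat, long nowMs) {
        Map<String, Long> instances = instancesOf(namespaceId, serviceName);
        String key = instanceKey(cluster, ip, port);
        if (!instances.containsKey(key) && !fullBeat) {
            // Unknown instance and no data to rebuild it: ask the client to re-register.
            return Code.RESOURCE_NOT_FOUND;
        }
        // Known instance, or a full beat that allows re-registration ("data compensation"):
        // register if needed and refresh the last-beat time.
        instances.put(key, nowMs);
        return Code.OK;
    }

    Long lastBeat(String namespaceId, String serviceName, String cluster, String ip, int port) {
        return instancesOf(namespaceId, serviceName).get(instanceKey(cluster, ip, port));
    }
}
```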

    On the server, the beat is handled by service.processClientBeat(clientBeat), which processes it asynchronously:

        /**
         * Process client beat.
         *
         * @param rsInfo metrics info of server
         */
        public void processClientBeat(final RsInfo rsInfo) {
            ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
            clientBeatProcessor.setService(this);
            clientBeatProcessor.setRsInfo(rsInfo);
            HealthCheckReactor.scheduleNow(clientBeatProcessor);
        }
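    The point of scheduleNow is that the request thread only enqueues the work and can return the HTTP response immediately. Below is a sketch of that handoff using a plain ScheduledExecutorService. AsyncBeatProcessing is a hypothetical class, and the sketch does not reflect HealthCheckReactor's actual pool configuration, which the source does not show.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Sketch of the "respond now, process later" handoff; AsyncBeatProcessing is hypothetical.
class AsyncBeatProcessing {
    private final ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();

    // Analogue of HealthCheckReactor.scheduleNow: run the task on the pool with zero delay.
    ScheduledFuture<?> scheduleNow(Runnable task) {
        return pool.schedule(task, 0, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        pool.shutdown();
    }
}
```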

    Now look at the task itself, ClientBeatProcessor:

    /**
     * Thread to update ephemeral instance triggered by client beat.
     *
     * @author nkorange
     */
    public class ClientBeatProcessor implements Runnable {
    
        public static final long CLIENT_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15);
    
        private RsInfo rsInfo;
    
        private Service service;
    
        @JsonIgnore
        public PushService getPushService() {
            return ApplicationUtils.getBean(PushService.class);
        }
    
        public RsInfo getRsInfo() {
            return rsInfo;
        }
    
        public void setRsInfo(RsInfo rsInfo) {
            this.rsInfo = rsInfo;
        }
    
        public Service getService() {
            return service;
        }
    
        public void setService(Service service) {
            this.service = service;
        }
    
        @Override
        public void run() {
            Service service = this.service;
            if (Loggers.EVT_LOG.isDebugEnabled()) {
                Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
            }
    
            String ip = rsInfo.getIp();
            String clusterName = rsInfo.getCluster();
            int port = rsInfo.getPort();
            Cluster cluster = service.getClusterMap().get(clusterName);
            List<Instance> instances = cluster.allIPs(true);
    
            for (Instance instance : instances) {
                if (instance.getIp().equals(ip) && instance.getPort() == port) {
                    if (Loggers.EVT_LOG.isDebugEnabled()) {
                        Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
                    }
                    // Refresh the last-beat time
                    instance.setLastBeat(System.currentTimeMillis());
                    if (!instance.isMarked()) {
                        if (!instance.isHealthy()) {
                            instance.setHealthy(true);
                            Loggers.EVT_LOG
                                    .info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
                                            cluster.getService().getName(), ip, port, cluster.getName(),
                                            UtilsAndCommons.LOCALHOST_SITE);
                            // The health status changed: publish a service-change event
                            getPushService().serviceChanged(service);
                        }
                    }
                }
            }
        }
    }
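    The core of run() does two things:

    - It refreshes lastBeat for the matching ip:port.
    - It flips an unhealthy, unmarked instance back to healthy, notifying subscribers only on that transition.

    Below is a sketch with hypothetical SimpleInstance and BeatRefresher classes. The Runnable callback stands in for getPushService().serviceChanged(service).

```java
import java.util.List;

// Hypothetical, trimmed-down stand-in for a Nacos Instance.
class SimpleInstance {
    final String ip;
    final int port;
    final boolean marked; // as in the original, marked instances are skipped for health changes
    volatile long lastBeat;
    volatile boolean healthy;

    SimpleInstance(String ip, int port, boolean healthy, boolean marked) {
        this.ip = ip;
        this.port = port;
        this.healthy = healthy;
        this.marked = marked;
    }
}

// Sketch of the core of ClientBeatProcessor#run (not the Nacos class).
class BeatRefresher {
    // Returns true if an unhealthy -> healthy transition was published via onChange.
    static boolean refresh(List<SimpleInstance> instances, String ip, int port, long nowMs, Runnable onChange) {
        boolean changed = false;
        for (SimpleInstance instance : instances) {
            if (instance.ip.equals(ip) && instance.port == port) {
                instance.lastBeat = nowMs; // every beat refreshes the timestamp
                if (!instance.marked && !instance.healthy) {
                    instance.healthy = true;
                    onChange.run(); // stands in for getPushService().serviceChanged(service)
                    changed = true;
                }
            }
        }
        return changed;
    }
}
```

    Every beat refreshes lastBeat, but the push fires only once per unhealthy -> healthy transition. Steady-state beats therefore do not generate change events.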
  • Original article: https://www.cnblogs.com/july-sunny/p/15068607.html