Nacos心跳是健康检查的一部分,心跳是服务端确认客户端是否存活的关键
这里将源码拆为两部分:
1.客户端发送心跳
2.服务端接受客户端心跳
一.客户端发送心跳:
在第二部分客户端注册实例的源码分析时,提到判断如果实例是临时实例,就会初始化心跳信息和发送心跳的线程任务,如下
@Override public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException { String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName); //判断实例是否是临时节点,默认是true if (instance.isEphemeral()) { //初始化心跳相关的信息 BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance); beatReactor.addBeatInfo(groupedServiceName, beatInfo); } serverProxy.registerService(groupedServiceName, groupName, instance); }
/** * Add beat information. * * @param serviceName service name * @param beatInfo beat information */ public void addBeatInfo(String serviceName, BeatInfo beatInfo) { NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo); String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()); BeatInfo existBeat = null; //fix #1733 if ((existBeat = dom2Beat.remove(key)) != null) { existBeat.setStopped(true); } dom2Beat.put(key, beatInfo); executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
很显然,可以使用可调度的线程池去周期的执行(默认5s)心跳发送任务,这里可以看到BeatTask线程任务的源码
class BeatTask implements Runnable { BeatInfo beatInfo; public BeatTask(BeatInfo beatInfo) { this.beatInfo = beatInfo; } @Override public void run() { if (beatInfo.isStopped()) { return; } long nextTime = beatInfo.getPeriod(); try { //http 请求nacos 服务端 /instance/beat接口 //请求 参数1:心跳信息;参数2:是否是轻量级心跳(首次注册这里为false); //备注: 这里false(nacos客户端心跳分为两种,一种是注册时心跳,目的是首次注册是上报,第2种是轻量级心跳,目的是保持链接) JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled); long interval = result.get("clientBeatInterval").asInt(); boolean lightBeatEnabled = false; if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) { lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean(); } //首次注册时心跳,服务端会返回参数lightBeatEnabled=true,标注下次心跳为轻量级心跳 BeatReactor.this.lightBeatEnabled = lightBeatEnabled; if (interval > 0) { nextTime = interval; } int code = NamingResponseCode.OK; if (result.has(CommonParams.CODE)) { code = result.get(CommonParams.CODE).asInt(); } //实例未找到,则重新构造实例,发送心跳 if (code == NamingResponseCode.RESOURCE_NOT_FOUND) { Instance instance = new Instance(); instance.setPort(beatInfo.getPort()); instance.setIp(beatInfo.getIp()); instance.setWeight(beatInfo.getWeight()); instance.setMetadata(beatInfo.getMetadata()); instance.setClusterName(beatInfo.getCluster()); instance.setServiceName(beatInfo.getServiceName()); instance.setInstanceId(instance.getInstanceId()); instance.setEphemeral(true); try { serverProxy.registerService(beatInfo.getServiceName(), NamingUtils.getGroupName(beatInfo.getServiceName()), instance); } catch (Exception ignore) { } } } catch (NacosException ex) { NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}", JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg()); } //构造下一次心跳任务 executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS); } }
显然这里请求服务端nacos/v1/instance/beat接口去发送心跳,
其中第一次心跳会携带实例的相关信息传递到服务端,服务端接受并且相应后,会变为轻量级心跳,不会上传实例的具体信息,见如下代码
/** * Send beat. * * @param beatInfo beat info * @param lightBeatEnabled light beat * @return beat result * @throws NacosException nacos exception */ public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException { if (NAMING_LOGGER.isDebugEnabled()) { NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString()); } Map<String, String> params = new HashMap<String, String>(8); Map<String, String> bodyMap = new HashMap<String, String>(2); if (!lightBeatEnabled) { //如果心跳不是轻量级心跳,则会携带心跳信息,例如,ip/端口/元数据信息,等等 bodyMap.put("beat", JacksonUtils.toJson(beatInfo)); } params.put(CommonParams.NAMESPACE_ID, namespaceId); params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName()); params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster()); params.put("ip", beatInfo.getIp()); params.put("port", String.valueOf(beatInfo.getPort())); String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT); return JacksonUtils.toObj(result); }
以上就是客户端发送心跳的时机,
二.服务端接收心跳
代码入口 如下
/** * Create a beat for instance. * * @param request http request * @return detail information of instance * @throws Exception any error during handle */ @CanDistro @PutMapping("/beat") @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE) public ObjectNode beat(HttpServletRequest request) throws Exception { ObjectNode result = JacksonUtils.createEmptyJsonNode(); result.put("clientBeatInterval", switchDomain.getClientBeatInterval()); //根据beat信息判断是否是注册时上报的心跳,还是轻量级心跳(存活心跳) //参考客户端心跳 /* if (!lightBeatEnabled) { //如果心跳不是轻量级心跳,则会携带心跳信息,例如,ip/端口/元数据信息,等等 bodyMap.put("beat", JacksonUtils.toJson(beatInfo)); }*/ String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY); RsInfo clientBeat = null; //判断是否是注册心跳 if (StringUtils.isNotBlank(beat)) { //反序列化拿到心跳信息 clientBeat = JacksonUtils.toObj(beat, RsInfo.class); } String clusterName = WebUtils .optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME); String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY); int port = Integer.parseInt(WebUtils.optional(request, "port", "0")); if (clientBeat != null) { if (StringUtils.isNotBlank(clientBeat.getCluster())) { clusterName = clientBeat.getCluster(); } else { // fix #2533 clientBeat.setCluster(clusterName); } ip = clientBeat.getIp(); port = clientBeat.getPort(); } String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName); //通过 名称空间,服务名,集群名称,ip,端口在双层Map中查找当前心跳对应的客户端实例 Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port); //如果nacos服务端未找到实例 if (instance == null) { //且不是注册心跳 if (clientBeat == null) { //服务已经下线了 result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND); return result; } Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, " + "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName); //客户端注册心跳,也就本次心跳是第一次心跳,服务端会重新执行一次服务注册的逻辑 instance = new Instance(); instance.setPort(clientBeat.getPort()); instance.setIp(clientBeat.getIp()); instance.setWeight(clientBeat.getWeight()); instance.setMetadata(clientBeat.getMetadata()); instance.setClusterName(clusterName); instance.setServiceName(serviceName); instance.setInstanceId(instance.getInstanceId()); instance.setEphemeral(clientBeat.isEphemeral()); //服务注册的逻辑 serviceManager.registerInstance(namespaceId, serviceName, instance); } //服务注册后再次获取服务 Service service = serviceManager.getService(namespaceId, serviceName); if (service == null) { throw new NacosException(NacosException.SERVER_ERROR, "service not found: " + serviceName + "@" + namespaceId); } if (clientBeat == null) { clientBeat = new RsInfo(); clientBeat.setIp(ip); clientBeat.setPort(port); clientBeat.setCluster(clusterName); } //收到客户端心跳后,需要处理客户端心跳,例如更新最后一次心跳时间等参数 service.processClientBeat(clientBeat); result.put(CommonParams.CODE, NamingResponseCode.OK); //返回心跳间隔时间,客户端根据这个间隔时间,执行下一次心跳 result.put("clientBeatInterval", instance.getInstanceHeartBeatInterval()); //返回轻量级心跳标识,表示下一次是轻量级心跳,不用上报全量的实例信息了 result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled()); return result; }
服务端这里对心跳的处理service.processClientBeat(clientBeat);
这里使用异步线程处理
/** * Process client beat. * * @param rsInfo metrics info of server */ public void processClientBeat(final RsInfo rsInfo) { ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor(); clientBeatProcessor.setService(this); clientBeatProcessor.setRsInfo(rsInfo); HealthCheckReactor.scheduleNow(clientBeatProcessor); }
这里看一下线程任务ClientBeatProcessor
** * Thread to update ephemeral instance triggered by client beat. * * @author nkorange */ public class ClientBeatProcessor implements Runnable { public static final long CLIENT_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15); private RsInfo rsInfo; private Service service; @JsonIgnore public PushService getPushService() { return ApplicationUtils.getBean(PushService.class); } public RsInfo getRsInfo() { return rsInfo; } public void setRsInfo(RsInfo rsInfo) { this.rsInfo = rsInfo; } public Service getService() { return service; } public void setService(Service service) { this.service = service; } @Override public void run() { Service service = this.service; if (Loggers.EVT_LOG.isDebugEnabled()) { Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString()); } String ip = rsInfo.getIp(); String clusterName = rsInfo.getCluster(); int port = rsInfo.getPort(); Cluster cluster = service.getClusterMap().get(clusterName); List<Instance> instances = cluster.allIPs(true); for (Instance instance : instances) { if (instance.getIp().equals(ip) && instance.getPort() == port) { if (Loggers.EVT_LOG.isDebugEnabled()) { Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString()); } //更新最后一次心跳的时间 instance.setLastBeat(System.currentTimeMillis()); if (!instance.isMarked()) { if (!instance.isHealthy()) { instance.setHealthy(true); Loggers.EVT_LOG .info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok", cluster.getService().getName(), ip, port, cluster.getName(), UtilsAndCommons.LOCALHOST_SITE); //服务如果有改变,发布服务变更事件 getPushService().serviceChanged(service); } } } } } }