I. Start the nacos-example project from the Nacos source
1. Add a JVM startup parameter, or hard-code the nacos-server IP and port directly in the main method
Properties properties = new Properties();
properties.setProperty("serverAddr", "127.0.0.1:8848");
properties.setProperty("namespace", "public");
@Override
public void registerInstance(String serviceName, String groupName, String ip, int port, String clusterName)
        throws NacosException {
    Instance instance = new Instance();
    instance.setIp(ip);
    instance.setPort(port);
    instance.setWeight(1.0);
    instance.setClusterName(clusterName);
    // Arguments: service name, group name (defaults to DEFAULT_GROUP), instance object
    registerInstance(serviceName, groupName, instance);
}
2. Start the nacos-example project from the Nacos source
II. Client-side instance registration: source analysis
0. Use the following line as the entry point and trace the source
naming.registerInstance("nacos.test.3", "11.11.11.11", 8888);
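1. First, the client information (IP, port, weight, cluster name, and so on) is wrapped in an Instance object, and registerInstance is called
2. The method then checks whether the instance is ephemeral. If it is, the heartbeat information (for example the heartbeat interval) has to be initialized
Notes:
In version 1.0.0 Nacos added an ephemeral field at the instance level. It indicates whether a registered instance is ephemeral or persistent.
Ephemeral instance: not persisted on the Nacos server. It must report heartbeats to stay alive; if no heartbeat arrives for a period of time, the Nacos server removes it. If the instance starts reporting heartbeats again after being removed, it is registered again.
Persistent instance: persisted by the Nacos server. Even if the client process that registered it is gone, the instance is not deleted from the server; its health status is only set to unhealthy.
A single service may contain both ephemeral and persistent instances. When all instance processes of that service are gone, the ephemeral instances are removed from the service and the persistent ones stay.
The ephemeral flag of the instance selects the health check mode: true corresponds to the client mode, false to the server mode.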
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    // Check whether the instance is ephemeral (the default is true)
    if (instance.isEphemeral()) {
        // Initialize the heartbeat information
        BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
        beatReactor.addBeatInfo(groupedServiceName, beatInfo);
    }
    serverProxy.registerService(groupedServiceName, groupName, instance);
}
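The grouped service name built here follows the format groupName@@serviceName, as the Javadoc of buildBeatInfo states. The following is a minimal sketch of that naming scheme using only the JDK. The class GroupedNames and its methods are hypothetical stand-ins, not the real NamingUtils, and the fallback to DEFAULT_GROUP for a name without a separator is an assumption.

```java
// Hypothetical stand-in for NamingUtils; only the "@@" format is taken from the source.
final class GroupedNames {
    static final String SEPARATOR = "@@";
    static final String DEFAULT_GROUP = "DEFAULT_GROUP";

    // Builds "groupName@@serviceName".
    static String grouped(String serviceName, String groupName) {
        return groupName + SEPARATOR + serviceName;
    }

    // Returns the group part, or DEFAULT_GROUP (assumption) when no separator is present.
    static String groupOf(String groupedName) {
        int idx = groupedName.indexOf(SEPARATOR);
        return idx < 0 ? DEFAULT_GROUP : groupedName.substring(0, idx);
    }

    // Returns the plain service name without the group prefix.
    static String serviceOf(String groupedName) {
        int idx = groupedName.indexOf(SEPARATOR);
        return idx < 0 ? groupedName : groupedName.substring(idx + SEPARATOR.length());
    }
}
```

Carrying the group inside the name lets later steps, such as the re-registration in the beat task, recover the group from the service name alone.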
Heartbeat information:
/**
 * Build new beat information.
 *
 * @param groupedServiceName service name with group name, format: ${groupName}@@${serviceName}
 * @param instance instance
 * @return new beat information
 */
public BeatInfo buildBeatInfo(String groupedServiceName, Instance instance) {
    BeatInfo beatInfo = new BeatInfo();
    // Service name
    beatInfo.setServiceName(groupedServiceName);
    // IP
    beatInfo.setIp(instance.getIp());
    // Port
    beatInfo.setPort(instance.getPort());
    // Cluster name
    beatInfo.setCluster(instance.getClusterName());
    // Weight
    beatInfo.setWeight(instance.getWeight());
    // Metadata
    beatInfo.setMetadata(instance.getMetadata());
    // Scheduled flag
    beatInfo.setScheduled(false);
    // Heartbeat interval, 5 s by default
    beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
    return beatInfo;
}
Adding the heartbeat information: the heartbeat task of this instance is submitted to a thread pool. Here is the source of the heartbeat task, BeatTask
class BeatTask implements Runnable {

    BeatInfo beatInfo;

    public BeatTask(BeatInfo beatInfo) {
        this.beatInfo = beatInfo;
    }

    @Override
    public void run() {
        if (beatInfo.isStopped()) {
            return;
        }
        long nextTime = beatInfo.getPeriod();
        try {
            // HTTP request to the /instance/beat endpoint of the Nacos server.
            // Argument 1: the heartbeat info; argument 2: whether this is a lightweight beat (false for the first beat).
            // Note: the client sends two kinds of beat. The full beat is reported when the instance first registers;
            // the lightweight beat only keeps the instance alive.
            JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
            long interval = result.get("clientBeatInterval").asInt();
            boolean lightBeatEnabled = false;
            if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
                lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
            }
            // After the first beat the server returns lightBeatEnabled=true, so the following beats are lightweight
            BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
            if (interval > 0) {
                nextTime = interval;
            }
            int code = NamingResponseCode.OK;
            if (result.has(CommonParams.CODE)) {
                code = result.get(CommonParams.CODE).asInt();
            }
            // Instance not found on the server: rebuild the instance and register it again
            if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
                Instance instance = new Instance();
                instance.setPort(beatInfo.getPort());
                instance.setIp(beatInfo.getIp());
                instance.setWeight(beatInfo.getWeight());
                instance.setMetadata(beatInfo.getMetadata());
                instance.setClusterName(beatInfo.getCluster());
                instance.setServiceName(beatInfo.getServiceName());
                instance.setInstanceId(instance.getInstanceId());
                instance.setEphemeral(true);
                try {
                    serverProxy.registerService(beatInfo.getServiceName(),
                            NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
                } catch (Exception ignore) {
                }
            }
        } catch (NacosException ex) {
            NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
                    JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());
        }
        // Schedule the next heartbeat task
        executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
    }
}
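BeatTask is not a fixed-rate job: every run schedules the next one, which lets the server change the interval through clientBeatInterval. The sketch below isolates that pattern with the JDK only. SelfReschedulingBeat and its sendBeat supplier are hypothetical names; the supplier stands in for serverProxy.sendBeat and returns the interval suggested by the server in milliseconds. Unlike the original, the sketch reuses the same task object instead of creating a new BeatTask.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical sketch of a task that reschedules itself after each run.
final class SelfReschedulingBeat implements Runnable {
    private final ScheduledExecutorService executor;
    private final LongSupplier sendBeat;      // stand-in for the real HTTP beat call
    private final long defaultPeriodMillis;   // used when the server suggests nothing
    private volatile boolean stopped;

    SelfReschedulingBeat(ScheduledExecutorService executor, LongSupplier sendBeat, long defaultPeriodMillis) {
        this.executor = executor;
        this.sendBeat = sendBeat;
        this.defaultPeriodMillis = defaultPeriodMillis;
    }

    @Override
    public void run() {
        if (stopped) {
            return;
        }
        long next = defaultPeriodMillis;
        try {
            long suggested = sendBeat.getAsLong();
            if (suggested > 0) {
                next = suggested;             // the server-side interval wins
            }
        } catch (RuntimeException e) {
            // A failed beat must not end the chain; keep the default period.
        }
        if (!stopped) {
            executor.schedule(this, next, TimeUnit.MILLISECONDS);
        }
    }

    void stop() {
        stopped = true;
    }
}
```

A caller would start the chain once with executor.schedule(beat, 0, TimeUnit.MILLISECONDS) and end it with beat.stop().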
Back to the main line, and the last step of client-side registration: serverProxy.registerService(groupedServiceName, groupName, instance);
/**
 * register a instance to service with specified instance properties.
 *
 * @param serviceName name of service
 * @param groupName group of service
 * @param instance instance to register
 * @throws NacosException nacos exception
 */
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId,
            serviceName, instance);
    final Map<String, String> params = new HashMap<String, String>(16);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put(CommonParams.GROUP_NAME, groupName);
    params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
    params.put("ip", instance.getIp());
    params.put("port", String.valueOf(instance.getPort()));
    params.put("weight", String.valueOf(instance.getWeight()));
    params.put("enable", String.valueOf(instance.isEnabled()));
    params.put("healthy", String.valueOf(instance.isHealthy()));
    params.put("ephemeral", String.valueOf(instance.isEphemeral()));
    params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
    // Call the v1/ns/instance endpoint of the server to register the instance
    reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
}
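To make the request more concrete, here is a rough sketch of how such a parameter map could be turned into URL-encoded key=value pairs. The source does not show how reqApi transmits the parameters (query string or form body), so this only illustrates the encoding. FormBody is a hypothetical helper, and the literal key "serviceName" used in the usage note is an assumption about the value of CommonParams.SERVICE_NAME.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper: joins parameters as URL-encoded key=value pairs.
final class FormBody {
    static String encode(Map<String, String> params) {
        StringJoiner joiner = new StringJoiner("&");
        for (Map.Entry<String, String> e : params.entrySet()) {
            if (e.getValue() == null) {
                continue;                     // skip parameters that were never set
            }
            joiner.add(enc(e.getKey()) + "=" + enc(e.getValue()));
        }
        return joiner.toString();
    }

    private static String enc(String s) {
        try {
            return URLEncoder.encode(s, "UTF-8");
        } catch (UnsupportedEncodingException ex) {
            throw new IllegalStateException(ex);  // UTF-8 is always available
        }
    }
}
```

With an insertion-ordered map holding serviceName=DEFAULT_GROUP@@nacos.test.3, ip=11.11.11.11 and port=8888, the two "@" characters of the grouped name are percent-encoded as %40 while the dots stay as they are.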
III. Server-side instance registration: source analysis
1. Start with the server-side entry point for instance registration, the v1/ns/instance endpoint
The source is in the nacos-naming project
/**
 * Register new instance.
 *
 * @param request http request
 * @return 'ok' if success
 * @throws Exception any error during register
 */
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
    final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    final String namespaceId = WebUtils
            .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    final Instance instance = parseInstance(request);
    serviceManager.registerInstance(namespaceId, serviceName, instance);
    return "ok";
}
2. The server registers the instance: serviceManager.registerInstance(namespaceId, serviceName, instance)
/**
 * Register an instance to a service in AP mode.
 *
 * <p>This method creates service or cluster silently if they don't exist.
 *
 * @param namespaceId id of namespace
 * @param serviceName service name
 * @param instance instance to register
 * @throws Exception any error occurred in the process
 */
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    // Create an empty service: when an instance registers for the first time,
    // the health check and other service-level parameters have to be initialized.
    // Note: the server manages instances through services; service : instance = 1 : N
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());
    // Get the service from the cache map
    Service service = getService(namespaceId, serviceName);
    // Throw if the service does not exist
    if (service == null) {
        throw new NacosException(NacosException.INVALID_PARAM,
                "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    }
    // Add the instance
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
3. Next, look at the method that creates the empty service, createEmptyService()
Tracing the source leads to createServiceIfAbsent
/**
 * Create service if not exist.
 *
 * @param namespaceId namespace
 * @param serviceName service name
 * @param local whether create service by local
 * @param cluster cluster
 * @throws NacosException nacos exception
 */
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
        throws NacosException {
    // Get the service from the cache map
    Service service = getService(namespaceId, serviceName);
    // The service does not exist (typically on first registration), so it has to be initialized
    if (service == null) {
        Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
        service = new Service();
        // Service name
        service.setName(serviceName);
        // Namespace: namespaceId
        service.setNamespaceId(namespaceId);
        // Group name, for example DEFAULT_GROUP
        service.setGroupName(NamingUtils.getGroupName(serviceName));
        // now validate the service. if failed, exception will be thrown
        // Time of the last modification of the service
        service.setLastModifiedMillis(System.currentTimeMillis());
        // Compute the MD5 checksum of the service
        service.recalculateChecksum();
        if (cluster != null) {
            cluster.setService(service);
            service.getClusterMap().put(cluster.getName(), cluster);
        }
        service.validate();
        // Add the service and initialize it; these are two separate steps inside
        putServiceAndInit(service);
        if (!local) {
            addOrReplaceService(service);
        }
    }
}
4. Next, step into putServiceAndInit
private void putServiceAndInit(Service service) throws NacosException {
    // Add the service
    putService(service);
    // Initialize the service
    service.init();
    consistencyService
            .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
    consistencyService
            .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
    Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}
Adding the service: putService(service);
/**
 * Put service into manager.
 *
 * @param service service
 */
public void putService(Service service) {
    // serviceMap is a two-level map that caches the service information:
    // private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
    // Map(namespace, Map(group::serviceName, Service)).
    // Outer map: key = namespace, value = inner map
    // Inner map: key = group name + service name, value = the service
    // First look up the namespace id in the cache map; if it is missing, initialize it
    if (!serviceMap.containsKey(service.getNamespaceId())) {
        synchronized (putServiceLock) {
            if (!serviceMap.containsKey(service.getNamespaceId())) {
                serviceMap.put(service.getNamespaceId(), new ConcurrentHashMap<>(16));
            }
        }
    }
    // Then get the inner map of the namespace and add the service to it
    serviceMap.get(service.getNamespaceId()).put(service.getName(), service);
}
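The same two-level cache can be written without an explicit lock. The sketch below is an alternative formulation, not what Nacos does: it replaces the double-checked locking with ConcurrentHashMap.computeIfAbsent, which creates the inner map atomically. ServiceRegistrySketch and its generic parameter are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical two-level cache: namespace -> (grouped service name -> service).
final class ServiceRegistrySketch<S> {
    private final Map<String, Map<String, S>> serviceMap = new ConcurrentHashMap<>();

    void put(String namespaceId, String groupedServiceName, S service) {
        // computeIfAbsent creates the inner map at most once per namespace.
        serviceMap.computeIfAbsent(namespaceId, ns -> new ConcurrentHashMap<>(16))
                .put(groupedServiceName, service);
    }

    S get(String namespaceId, String groupedServiceName) {
        Map<String, S> inner = serviceMap.get(namespaceId);
        return inner == null ? null : inner.get(groupedServiceName);
    }
}
```

Looking up a service in an unknown namespace returns null here, which is the case registerInstance guards against with its "service not found" exception.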
Initializing the service: service.init();
/**
 * Init service.
 */
public void init() {
    // Create the client heartbeat check task
    HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
    for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
        entry.getValue().setService(this);
        entry.getValue().init();
    }
}
Now step into HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
As shown here, the health check task is also set up as a delayed task on a scheduled thread pool: the initial delay is 5 s, and the task then runs every 5 s
/**
 * Schedule client beat check task with a delay.
 *
 * @param task client beat check task
 */
public static void scheduleCheck(ClientBeatCheckTask task) {
    futureMap.putIfAbsent(task.taskKey(),
            GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
}
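One detail of the excerpt: Java evaluates the arguments of putIfAbsent before the call, so the task is handed to the executor even when the key is already present. The sketch below shows a variant that schedules only when the key is new, using computeIfAbsent. It is an illustration, not the Nacos implementation: CheckScheduler is a hypothetical class, and fixed-delay semantics for scheduleNamingHealth are an assumption.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical scheduler that keeps at most one periodic check per task key.
final class CheckScheduler {
    private final Map<String, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>();
    private final ScheduledExecutorService executor;

    CheckScheduler(ScheduledExecutorService executor) {
        this.executor = executor;
    }

    // Returns true only if this call actually scheduled the task.
    boolean scheduleCheck(String taskKey, Runnable task) {
        boolean[] created = {false};
        futureMap.computeIfAbsent(taskKey, key -> {
            created[0] = true;
            // 5 s initial delay, then 5 s between the end of one run and the next.
            return executor.scheduleWithFixedDelay(task, 5000, 5000, TimeUnit.MILLISECONDS);
        });
        return created[0];
    }

    // Cancels and forgets the check, for example when the service is removed.
    void cancelCheck(String taskKey) {
        ScheduledFuture<?> future = futureMap.remove(taskKey);
        if (future != null) {
            future.cancel(true);
        }
    }
}
```

Keeping the future in the map is what makes cancellation possible later; a future that is created and then discarded cannot be cancelled.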
Source of the ClientBeatCheckTask task. Two pieces of code matter here
-----Determine whether an instance is healthy
// Get all instances of the service
List<Instance> instances = service.allIPs(true);
// first set health status of instances:
for (Instance instance : instances) {
    // If (current time - time of the last reported beat) > heartbeat timeout of the instance,
    // the instance is considered unhealthy
    if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
        // Check whether the instance is marked (false by default)
        if (!instance.isMarked()) {
            // Check whether the instance is currently healthy (true by default)
            if (instance.isHealthy()) {
                // Set the instance to unhealthy
                instance.setHealthy(false);
                Loggers.EVT_LOG
                        .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                instance.getIp(), instance.getPort(), instance.getClusterName(),
                                service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                                instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                // Get the push service and publish a service-changed message; the push uses UDP underneath
                getPushService().serviceChanged(service);
                ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
            }
        }
    }
}
-----Remove instances whose heartbeat has timed out
// Remove instances that have gone offline
for (Instance instance : instances) {
    if (instance.isMarked()) {
        continue;
    }
    // (current time - time of the last beat) > removal timeout
    if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
        // delete instance
        Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                JacksonUtils.toJson(instance));
        deleteIp(instance);
    }
}
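The two loops apply two different thresholds to the same quantity, the time since the last beat. A compact way to see the resulting states is the sketch below. BeatCheckSketch and its Verdict enum are hypothetical, the marked flag is left out, and the thresholds are passed in as parameters because the source does not state their default values.

```java
// Hypothetical condensation of the two checks in ClientBeatCheckTask.
final class BeatCheckSketch {
    enum Verdict { HEALTHY, UNHEALTHY, REMOVE }

    static Verdict check(long nowMillis, long lastBeatMillis,
                         long heartBeatTimeoutMillis, long ipDeleteTimeoutMillis) {
        long silence = nowMillis - lastBeatMillis;
        if (silence > ipDeleteTimeoutMillis) {
            return Verdict.REMOVE;        // second loop: the instance is deleted
        }
        if (silence > heartBeatTimeoutMillis) {
            return Verdict.UNHEALTHY;     // first loop: healthy flag set to false
        }
        return Verdict.HEALTHY;
    }
}
```

With example thresholds of 15 000 ms and 30 000 ms (assumed values), an instance silent for 20 s would be marked unhealthy but kept, and one silent for 31 s would be removed.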
5. Back to the main line, putServiceAndInit
// This is the data consistency guarantee; it is not expanded here
// Distro: AP model, eventual consistency;
// Raft: CP model, strong consistency;
consistencyService
        .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
consistencyService
        .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
6. Back to the main line, adding the instance: registerInstance() --> addInstance()
/**
 * Add instance to service.
 *
 * @param namespaceId namespace
 * @param serviceName service name
 * @param ephemeral whether instance is ephemeral
 * @param ips instances
 * @throws NacosException nacos exception
 */
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
        throws NacosException {
    String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
    Service service = getService(namespaceId, serviceName);
    synchronized (service) {
        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
        Instances instances = new Instances();
        instances.setInstanceList(instanceList);
        // Hand the list to the consistency service. The key decides whether the ephemeral or the
        // persistent path is taken; for ephemeral instances the Distro protocol keeps the data consistent
        consistencyService.put(key, instances);
    }
}
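7. Now look at consistencyService.put(key, instances). For ephemeral instances consistency is provided by the Distro protocol (AP model)
// Ephemeral instances: data consistency through Distro
@Override
public void put(String key, Record value) throws NacosException {
    onPut(key, value);
    // Data synchronization of the ephemeral consistency protocol. It runs as an asynchronous task
    // that is added to a blocking queue, so put returns success to the client first and the data
    // is synchronized afterwards: weak consistency, AP model
    taskDispatcher.addTask(key);
}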
First, the onPut() method
public void onPut(String key, Record value) {
    // Check whether the key belongs to ephemeral instances
    if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
        Datum<Instances> datum = new Datum<>();
        datum.value = (Instances) value;
        datum.key = key;
        datum.timestamp.incrementAndGet();
        dataStore.put(key, datum);
    }
    // Check whether the key has listeners. They were registered in putServiceAndInit():
    // consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
    // consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
    if (!listeners.containsKey(key)) {
        return;
    }
    // There are listeners: signal the change right away. The task is put on a blocking queue,
    // and a while loop publishes the listener events
    notifier.addTask(key, ApplyAction.CHANGE);
}
The events in the blocking queue are processed in a loop
DistroConsistencyServiceImpl
@PostConstruct
public void init() {
    GlobalExecutor.submit(new Runnable() {
        @Override
        public void run() {
            try {
                load();
            } catch (Exception e) {
                Loggers.DISTRO.error("load data failed.", e);
            }
        }
    });
    executor.submit(notifier);
    GlobalExecutor.submit(loadDataTask);
}

// run() of the notifier task submitted in init()
@Override
public void run() {
    Loggers.DISTRO.info("distro notifier started");
    while (true) {
        try {
            Pair pair = tasks.take();
            if (pair == null) {
                continue;
            }
            String datumKey = (String) pair.getValue0();
            ApplyAction action = (ApplyAction) pair.getValue1();
            services.remove(datumKey);
            int count = 0;
            if (!listeners.containsKey(datumKey)) {
                continue;
            }
            for (RecordListener listener : listeners.get(datumKey)) {
                count++;
                try {
                    if (action == ApplyAction.CHANGE) {
                        listener.onChange(datumKey, dataStore.get(datumKey).value);
                        continue;
                    }
                    if (action == ApplyAction.DELETE) {
                        listener.onDelete(datumKey);
                        continue;
                    }
                } catch (Throwable e) {
                    Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
                }
            }
            if (Loggers.DISTRO.isDebugEnabled()) {
                Loggers.DISTRO.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                        datumKey, count, action.name());
            }
        } catch (Throwable e) {
            Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
        }
    }
}
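The write path and the notification path are decoupled by a blocking queue: put stores the value and enqueues the key, and a single consumer thread takes keys and calls the listeners. The sketch below reduces that pattern to the JDK. NotifierSketch is hypothetical, values are plain strings instead of Datum objects, and only the CHANGE action is modeled.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;

// Hypothetical reduction of the store + notifier pair to a producer/consumer queue.
final class NotifierSketch implements Runnable {
    private final Map<String, String> dataStore = new ConcurrentHashMap<>();
    private final Map<String, List<BiConsumer<String, String>>> listeners = new ConcurrentHashMap<>();
    private final BlockingQueue<String> tasks = new ArrayBlockingQueue<>(1024);

    void listen(String key, BiConsumer<String, String> listener) {
        listeners.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(listener);
    }

    // Producer side: store first, then enqueue the key if anyone listens; returns at once.
    void put(String key, String value) {
        dataStore.put(key, value);
        if (listeners.containsKey(key)) {
            tasks.offer(key);
        }
    }

    // Consumer side: blocks on the queue and notifies listeners one key at a time.
    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                String key = tasks.take();
                String value = dataStore.get(key);
                for (BiConsumer<String, String> listener
                        : listeners.getOrDefault(key, Collections.emptyList())) {
                    try {
                        listener.accept(key, value);
                    } catch (RuntimeException e) {
                        // One failing listener must not stop the loop.
                    }
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // exit quietly when interrupted
        }
    }
}
```

Because the listener reads the store when the key is taken from the queue, it sees the latest value for that key, even if several writes happened while the key was waiting.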
Finally, the controller returns "ok"
return "ok";