zoukankan      html  css  js  c++  java
  • Nacos源码分析(二):服务端和客户端实例注册

    一.启动nacos源码中的 nacos-excample项目

    1.添加JVM启动参数或者 直接在main方法中硬编码指定nacos-server的ip和端口

    Properties properties = new Properties();
    properties.setProperty("serverAddr", "127.0.0.1:8848");
    properties.setProperty("namespace", "public");
        @Override
        public void registerInstance(String serviceName, String groupName, String ip, int port, String clusterName)
                throws NacosException {
    
            Instance instance = new Instance();
            instance.setIp(ip);
            instance.setPort(port);
            instance.setWeight(1.0);
            instance.setClusterName(clusterName);
            //服务名 
            //组名 默认为 DEFAULT_GROUP
            //实例对象 
            registerInstance(serviceName, groupName, instance);
        }

    2.启动nacos源码中的 nacos-excample项目

    二.客户端实例注册源码分析

    0.以这行代码为入口 ,跟踪源码

    naming.registerInstance("nacos.test.3", "11.11.11.11", 8888);

    1.首先将客户端信息,如ip,端口,权重,集群名称等等,封装为实例(Instance)对象,调用registerInstance方法

    2.此时判断改实例是否是临时节点,如果是临时节点,则需要初始化心跳相关的信息,例如心跳间隔等等

    备注:

    Nacos 在 1.0.0版本 instance级别增加了一个ephemeral字段,该字段表示注册的实例是否是临时实例还是持久化实例。

    临时实例:则不会在 Nacos 服务端持久化存储,需要通过上报心跳的方式进行包活,如果一段时间内没有上报心跳,则会被 Nacos 服务端摘除。在被摘除后如果又开始上报心跳,则会重新将这个实例注册。

    持久化实例:则会持久化被 Nacos 服务端,此时即使注册实例的客户端进程不在,这个实例也不会从服务端删除,只会将健康状态设为不健康。

    同一个服务下可以同时有临时实例和持久化实例,这意味着当这服务的所有实例进程不在时,会有部分实例从服务上摘除,剩下的实例则会保留在服务下。

    使用实例的ephemeral来判断,ephemeral为true对应的是服务健康检查模式中的 client 模式,为false对应的是 server 模式。

        @Override
        public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
            String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
            //判断实例是否是临时节点,默认是true
            if (instance.isEphemeral()) {
                //初始化心跳相关的信息
                BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
                beatReactor.addBeatInfo(groupedServiceName, beatInfo);
            }
            serverProxy.registerService(groupedServiceName, groupName, instance);
        }

    心跳信息:

        /**
         * Build new beat information.
         *
         * @param groupedServiceName service name with group name, format: ${groupName}@@${serviceName}
         * @param instance instance
         * @return new beat information
         */
        public BeatInfo buildBeatInfo(String groupedServiceName, Instance instance) {
            BeatInfo beatInfo = new BeatInfo();
            //服务名
            beatInfo.setServiceName(groupedServiceName);
            //ip
            beatInfo.setIp(instance.getIp());
            //端口
            beatInfo.setPort(instance.getPort());
            //集群名称
            beatInfo.setCluster(instance.getClusterName());
            //权重
            beatInfo.setWeight(instance.getWeight());
            //元数据信息
            beatInfo.setMetadata(instance.getMetadata());
            //是否可调度
            beatInfo.setScheduled(false);
            //心跳间隔 默认5s
            beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
            return beatInfo;
        }

    添加心跳信息:将该实例的心跳任务添加到线程池中,这里看下心跳任务的源码 BeatTask

        class BeatTask implements Runnable {
    
            BeatInfo beatInfo;
    
            public BeatTask(BeatInfo beatInfo) {
                this.beatInfo = beatInfo;
            }
    
            @Override
            public void run() {
                if (beatInfo.isStopped()) {
                    return;
                }
                long nextTime = beatInfo.getPeriod();
                try {
                    //http 请求nacos 服务端 /instance/beat接口
                    //请求  参数1:心跳信息;参数2:是否是轻量级心跳(首次注册这里为false);
                    //备注: 这里false(nacos客户端心跳分为两种,一种是注册时心跳,目的是首次注册是上报,第2种是轻量级心跳,目的是保持链接)
                    JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
                    long interval = result.get("clientBeatInterval").asInt();
                    boolean lightBeatEnabled = false;
                    if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
                        lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
                    }
                    //首次注册时心跳,服务端会返回参数lightBeatEnabled=true,标注下次心跳为轻量级心跳
                    BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
                    if (interval > 0) {
                        nextTime = interval;
                    }
                    int code = NamingResponseCode.OK;
                    if (result.has(CommonParams.CODE)) {
                        code = result.get(CommonParams.CODE).asInt();
                    }
                    //实例未找到,则重新构造实例,发送心跳
                    if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
                        Instance instance = new Instance();
                        instance.setPort(beatInfo.getPort());
                        instance.setIp(beatInfo.getIp());
                        instance.setWeight(beatInfo.getWeight());
                        instance.setMetadata(beatInfo.getMetadata());
                        instance.setClusterName(beatInfo.getCluster());
                        instance.setServiceName(beatInfo.getServiceName());
                        instance.setInstanceId(instance.getInstanceId());
                        instance.setEphemeral(true);
                        try {
                            serverProxy.registerService(beatInfo.getServiceName(),
                                    NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
                        } catch (Exception ignore) {
                        }
                    }
                } catch (NacosException ex) {
                    NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
                            JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());
    
                }
                //构造下一次心跳任务
                executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
            }
        }

    好了们这里回到主线,也是客户端注册实例最后一步  serverProxy.registerService(groupedServiceName, groupName, instance);

        /**
         * register a instance to service with specified instance properties.
         *
         * @param serviceName name of service
         * @param groupName   group of service
         * @param instance    instance to register
         * @throws NacosException nacos exception
         */
        public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    
            NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
                    instance);
    
            final Map<String, String> params = new HashMap<String, String>(16);
            params.put(CommonParams.NAMESPACE_ID, namespaceId);
            params.put(CommonParams.SERVICE_NAME, serviceName);
            params.put(CommonParams.GROUP_NAME, groupName);
            params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
            params.put("ip", instance.getIp());
            params.put("port", String.valueOf(instance.getPort()));
            params.put("weight", String.valueOf(instance.getWeight()));
            params.put("enable", String.valueOf(instance.isEnabled()));
            params.put("healthy", String.valueOf(instance.isHealthy()));
            params.put("ephemeral", String.valueOf(instance.isEphemeral()));
            params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
            //请求服务端 v1/ns/instance 接口注册实例
            reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
    
        }

    三.服务端实例注册源码分析:

    1.首先看服务端实例注册的入口也就是 v1/ns/instance 接口 

    源码位于nacos-naming项目中

    /**
     * Register new instance.
     *
     * @param request http request
     * @return 'ok' if success
     * @throws Exception any error during register
     */
    @CanDistro
    @PostMapping
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    public String register(HttpServletRequest request) throws Exception {
        
        final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        final String namespaceId = WebUtils
                .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        
        final Instance instance = parseInstance(request);
        
        serviceManager.registerInstance(namespaceId, serviceName, instance);
        return "ok";
    }

    2.服务端注册实例  serviceManager.registerInstance(namespaceId, serviceName, instance) 

        /**
         * Register an instance to a service in AP mode.
         *
         * <p>This method creates service or cluster silently if they don't exist.
         *
         * @param namespaceId id of namespace
         * @param serviceName service name
         * @param instance    instance to register
         * @throws Exception any error occurred in the process
         */
        public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
            //创建一个空服务,目的是如果实例第一次注册,则需要初始化心跳和其他相关的服务参数
            //备注:服务端是通过服务管理多个实例,服务与实例之间的关系为:服务:实例=1:N
            createEmptyService(namespaceId, serviceName, instance.isEphemeral());
            //从缓存Map中获取服务名
            Service service = getService(namespaceId, serviceName);
            //服务不存在,则抛出异常
            if (service == null) {
                throw new NacosException(NacosException.INVALID_PARAM,
                        "service not found, namespace: " + namespaceId + ", service: " + serviceName);
            }
            //添加实例
            addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
        }

    3.接下来看下创建空服务的方法  createEmptyService()

    跟踪源码到  createServiceIfAbsent方法

        /**
         * Create service if not exist.
         *
         * @param namespaceId namespace
         * @param serviceName service name
         * @param local       whether create service by local
         * @param cluster     cluster
         * @throws NacosException nacos exception
         */
        public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
                throws NacosException {
            //从缓存Map中获取服务
            Service service = getService(namespaceId, serviceName);
            //服务不存在,通常是在首次注册时,则需要初始化服务
            if (service == null) {
    
                Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
                service = new Service();
                //服务名
                service.setName(serviceName);
                //名称空间:namespaceId
                service.setNamespaceId(namespaceId);
                //组名: 例如DEFAULT_GROUP
                service.setGroupName(NamingUtils.getGroupName(serviceName));
                // now validate the service. if failed, exception will be thrown
                //最近一次修改服务的时间
                service.setLastModifiedMillis(System.currentTimeMillis());
                //计算服务的MD5值
                service.recalculateChecksum();
                if (cluster != null) {
                    cluster.setService(service);
                    service.getClusterMap().put(cluster.getName(), cluster);
                }
                service.validate();
                //添加服务,并且初始化:这里分为两个方法,分别是 添加服务 和 初始化
                putServiceAndInit(service);
                if (!local) {
                    addOrReplaceService(service);
                }
            }
        }

    4.接下来进入 putServiceAndInit 方法

        private void putServiceAndInit(Service service) throws NacosException {
            //添加服务
            putService(service);
            //服务的初始化
            service.init();
            consistencyService
                    .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
            consistencyService
                    .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
            Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
        }

    添加服务:putService(service);

        /**
         * Put service into manager.
         *
         * @param service service
         */
        public void putService(Service service) {
            //这里使用了serviceMap,这是个双层Map 去缓存服务信息
            //private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
            //Map(namespace, Map(group::serviceName, Service)).
            //外层Map key:名称空间,value:里Map
            //里层Map key:组名+服务名,value:对应的服务
            
            //首先会在缓存Map中招是否有 名称空间Id,如果没有,则初始化
            if (!serviceMap.containsKey(service.getNamespaceId())) {
                synchronized (putServiceLock) {
                    if (!serviceMap.containsKey(service.getNamespaceId())) {
                        serviceMap.put(service.getNamespaceId(), new ConcurrentHashMap<>(16));
                    }
                }
            }
            //如果有则直接获取名称空间 对应的Map,将服务添加进去
            serviceMap.get(service.getNamespaceId()).put(service.getName(), service);
        }

    初始化服务  service.init();

        /**
         * Init service.
         */
        public void init() {
            //创建客户端健康检查任务
            HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
            for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
                entry.getValue().setService(this);
                entry.getValue().init();
            }
        }

    这里进入到 HealthCheckReactor.scheduleCheck(clientBeatCheckTask);

    这里可以看到健康检查任务也是使用了schedule线程池去初始化一个延迟任务任务,初始化延迟5s,延迟5s执行一次

        /**
         * Schedule client beat check task with a delay.
         *
         * @param task client beat check task
         */
        public static void scheduleCheck(ClientBeatCheckTask task) {
            futureMap.putIfAbsent(task.taskKey(), GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
        }

    线程任务 ClientBeatCheckTask 源码,这里主要看两段代码

    -----判断实例是否健康

               //获取服务的所有实例
                List<Instance> instances = service.allIPs(true);
    
                // first set health status of instances:
                for (Instance instance : instances) {
                    //判断当前时间-实例最近一次心跳上报的时间 > 实例的心跳间隔,如果满足则表示,实例不健康
                    if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                        //判断实例是否被标记 默认是false
                        if (!instance.isMarked()) {
                            //判断实例是否健康 默认true
                            if (instance.isHealthy()) {
                                //设置实例为不健康
                                instance.setHealthy(false);
                                Loggers.EVT_LOG
                                        .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                                instance.getIp(), instance.getPort(), instance.getClusterName(),
                                                service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                                                instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                                //获取推送服务,发布服务变更的消息,底层使用了updpush
                                getPushService().serviceChanged(service);
    ApplicationUtils.publishEvent(
    new InstanceHeartbeatTimeoutEvent(this, instance)); } } } }

    -----删除超时下线的实例

               // 删除已经下线的服务
                for (Instance instance : instances) {
    
                    if (instance.isMarked()) {
                        continue;
                    }
                    //当前时间- 实例最近一次心跳的时间 > 超时剔除的时间
                    if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                        // delete instance
                        Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                                JacksonUtils.toJson(instance));
                        deleteIp(instance);
                    }
                }

    5.接下来重回主线  putServiceAndInit

           //这里是数据一致性保证 这里暂时不做展开
    //Distro :CP 模型,最终一致性;
    //Raft: AP 模型, 强一致性;
    consistencyService .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service); consistencyService .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);

    6.回到主线 添加实例 registerInstance()--> addInstance()

    /**
         * Add instance to service.
         *
         * @param namespaceId namespace
         * @param serviceName service name
         * @param ephemeral   whether instance is ephemeral
         * @param ips         instances
         * @throws NacosException nacos exception
         */
        public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
                throws NacosException {
    
            String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
    
            Service service = getService(namespaceId, serviceName);
    
            synchronized (service) {
                List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
    
                Instances instances = new Instances();
                instances.setInstanceList(instanceList);
                //放进一致性服务里。这里根据key来选择是临时性的还是永久性,如果是临时节点(实例),则使用distro协议进行数据一致性保障
                consistencyService.put(key, instances);
            }
        }

    7.这里看下 consistencyService.put(key, instances) ,基于临时节点的distro协议  ap模型保证

        //临时节点,使用distro数据一致性保障
        @Override
        public void put(String key, Record value) throws NacosException {
            onPut(key, value);
            // 临时一致性协议的同步数据。这里同步数据是异步任务执行的,添加到阻塞队列
            // 也就是说先返回客户端put成功再同步,弱一致性。 AP模型
            taskDispatcher.addTask(key);
        }

    首先是onPut()方法

    public void onPut(String key, Record value) {
            //判断是否是临时节点
            if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
                Datum<Instances> datum = new Datum<>();
                datum.value = (Instances) value;
                datum.key = key;
                datum.timestamp.incrementAndGet();
                dataStore.put(key, datum);
            }
            //判断当前key是否被监听
            //putServiceAndInit():
            //consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
            //consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
            if (!listeners.containsKey(key)) {
                return;
            }
    
            //有监听立即通知服务有改变,将任务放入阻塞队列中,然后通过while循环发布监听事件
          notifier.addTask(key, ApplyAction.CHANGE); }

    循环处理阻塞队列中的事件

    DistroConsistencyServiceImpl
      @PostConstruct
        public void init() {
            GlobalExecutor.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        load();
                    } catch (Exception e) {
                        Loggers.DISTRO.error("load data failed.", e);
                    }
                }
            });
    
            executor.submit(notifier);
            GlobalExecutor.submit(loadDataTask);
        }
    
           @Override
            public void run() {
                Loggers.DISTRO.info("distro notifier started");
    
                while (true) {
                    try {
    
                        Pair pair = tasks.take();
    
                        if (pair == null) {
                            continue;
                        }
    
                        String datumKey = (String) pair.getValue0();
                        ApplyAction action = (ApplyAction) pair.getValue1();
    
                        services.remove(datumKey);
    
                        int count = 0;
    
                        if (!listeners.containsKey(datumKey)) {
                            continue;
                        }
    
                        for (RecordListener listener : listeners.get(datumKey)) {
    
                            count++;
    
                            try {
                                if (action == ApplyAction.CHANGE) {
                                    listener.onChange(datumKey, dataStore.get(datumKey).value);
                                    continue;
                                }
    
                                if (action == ApplyAction.DELETE) {
                                    listener.onDelete(datumKey);
                                    continue;
                                }
                            } catch (Throwable e) {
                                Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
                            }
                        }
    
                        if (Loggers.DISTRO.isDebugEnabled()) {
                            Loggers.DISTRO.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                                datumKey, count, action.name());
                        }
                    } catch (Throwable e) {
                        Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
                    }
                }
            }

    最后返回OK

    return "ok";
  • 相关阅读:
    Laravel 通知
    LARAVEL 6 + VUE + SEMANTIC UI
    Laravel 从入门到精通教程【预备篇、基础篇】
    Laravel Vue.js 聊天室
    GIT代码管理: git remote add 【转载】
    Laravel Vuejs 实战:开发知乎 (45-47)用户设置
    Laravel Vuejs 实战:开发知乎 (42-44)用户头像
    如何在运行时更改JMeter的负载
    Jmeter Grafana Influxdb 环境搭建
    实时结果
  • 原文地址:https://www.cnblogs.com/july-sunny/p/15059585.html
Copyright © 2011-2022 走看看