zoukankan      html  css  js  c++  java
  • 分布式中间件nacos入门解析

    一、Nacos简介

    1、Nacos是什么?

    Nacos是阿里巴巴开源的一个为微服务提供服务发现、服务配置和服务管理的微服务基础设施,简单说就是Nacos为微服务架构提供了分布式配置和服务注册中心的工作。

    2、Nacos有什么功能?

    Nacos主要有两大功能:注册中心和配置中心

    2.1、注册中心

    a.服务发布:服务提供者发布服务到nacos,nacos存储服务和提供者关系;

    b.服务订阅:服务消费者从nacos订阅服务,拉去服务提供者信息列表;

    c.变更推送:当服务提供者信息变更时,实时通知服务消费者;

    d.路由策略:根据不同路由规则,推送不同服务提供者信息给消费者;

    e.健康检测:和服务提供者和服务消费者保持心跳,检测服务的健康状态;

    2.2、配置中心

    a.管理配置:配置的增删改查管理;

    b.监听配置:客户端实时监听配置的更新情况;

    c.灰度更新:允许针对部分客户端进行配置更新;

    d.配置快照:客户端需要缓存配置快照,当nacos服务器不可用时可以使用本地配置,提高整体容灾能力。

    3、Nacos有哪些概念?

    3.1、命名空间(namespace)

    命名空间是用于配置和服务的空间隔离,不同命名空间下的数据相互独立,不同命名空间下可以存在相同配置和相同服务,通常命名空间可用于不同环境。如开发环境、测试环境和生产环境可以通过命名空间来进行区分隔离。

    nacos默认有一个保留的命名空间为public,每一个命名空间都有一个唯一的ID,如果没有手动配置则会自动生产一个。服务管理和配置管理都是在命名空间区域内进行管理,每一个服务和配置都会绑定一个命名空间。

    3.2、配置分组(Group)

    同一个命名空间下可以有多个应用的配置,每个应用都可能有相同的配置,所以需要有一个分组来将属于同一个应用的配置进行区分。配置分组不需要单独管理,在管理配置集时添加配置分组即可。

    3.3、配置集(Data)

    配置集是一组配置的集合,通常一个配置文件就是一个配置集,每一个配置集都有一个配置集ID叫做Data ID,如和缓存相关配置都可以放在配置集cache.properties中,数据库配置放在db.properties中。

    配置集ID可以重复,但是同一个命名空间下同一个配置分组下的配置集ID不可重复,也就是说命名空间+配置分组+配置集ID可以唯一定位一个配置文件。

    3.4、服务

    通过预定义接口网络访问的提供给客户端的软件功能。每个服务都有一个服务名是服务提供的标识,通过该标识可以唯一确定其指代的服务。

    3.5、服务注册

    服务提供者将自己提供的服务注册到nacos,nacos存储服务和服务提供者关系。

    3.6、服务订阅

    服务消费者从nacos上获取对应服务的服务提供者信息列表

    3.7、元数据

    Nacos数据(如配置和服务)描述信息,如服务版本、权重、容灾策略、负载均衡策略、鉴权配置、各种自定义标签 (label),从作用范围来看,分为服务级别的元信息、集群的元信息及实例的元信息。

    3.8、权重

    实例级别的配置。权重为浮点数。权重越大,分配给该实例的流量越大。

    3.9、健康检查

    以指定方式检查服务下挂载的实例 (Instance) 的健康度,从而确认该实例 (Instance) 是否能提供服务。根据检查结果,实例 (Instance) 会被判断为健康或不健康。对服务发起解析请求时,不健康的实例 (Instance) 不会返回给客户端。

    3.10、健康保护阈值

    为了防止因过多实例 (Instance) 不健康导致流量全部流向健康实例 (Instance) ,继而造成流量压力把健康实例 (Instance) 压垮并形成雪崩效应,应将健康保护阈值定义为一个 0 到 1 之间的浮点数。当域名健康实例数 (Instance) 占总服务实例数 (Instance) 的比例小于

    该值时,无论实例 (Instance) 是否健康,都会将这个实例 (Instance) 返回给客户端。这样做虽然损失了一部分流量,但是保证了集群中剩余健康实例 (Instance) 能正常工作。

    二、Nacos使用

    2.1、Nacos的Open API

    Nacos提供了大量的HTTP API,其中包括配置管理、服务管理和命名空间管理等,核心API如下

    配置管理 获取配置 GET /nacos/v1/cs/configs
      监听配置 POST /nacos/v1/cs/configs/listener
      发布配置 POST /nacos/v1/cs/configs
      删除配置 DELETE /nacos/v1/cs/configs
      查询历史版本配置 GET /nacos/v1/cs/history?search=accurate
      查询上一个版本配置 GET /nacos/v1/cs/history/previous
    服务发现 注册实例 POST /nacos/v1/ns/instance
      注销实例 DELETE /nacos/v1/ns/instance
      修改实例 PUT /nacos/v1/ns/instance
      查询实例列表   GET /nacos/v1/ns/instance/list
      查询实例详情 GET /nacos/v1/ns/instance
      发送实例心跳 PUT /nacos/v1/ns/instance/beat
      创建服务 POST /nacos/v1/ns/service
      删除服务 DELETE /nacos/v1/ns/service
      修改服务 PUT /nacos/v1/ns/service
      查询服务详情 GET /nacos/v1/ns/service
      查询服务列表 GET /nacos/v1/ns/service/list
      查询系统数据指标 GET /nacos/v1/ns/operator/metrics
      查询集群服务器列表 GET /nacos/v1/ns/operator/servers
      查询集群当前Leader GET /nacos/v1/ns/raft/leader
      更新实例健康状态 PUT /nacos/v1/ns/health/instance
      批量更新实例元数据 PUT /nacos/v1/ns/instance/metadata/batch
    命名空间 查询命名空间列表 GET /nacos/v1/console/namespaces
      创建命名空间 POST /nacos/v1/console/namespaces
      修改命名空间 PUT /nacos/v1/console/namespaces
      删除命名空间 DELETE /nacos/v1/console/namespaces


    2.2、JAVA集成Nacos的SDK

    Maven依赖

    <dependency>
        <groupId>com.alibaba.nacos</groupId>
        <artifactId>nacos-client</artifactId>
        <version>${version}</version>
    </dependency>

    2.2.1、配置管理

    和配置相关功能都定义在ConfigService接口中,根据NacosFactory可以创建ConfigService对象,调用ConfigService相关方法就可对配置文件进行增删改查或监听配置更新,ConfigService相关方法定义如下:

    public interface ConfigService {
    
            /**
             * 获取配置
             */
            String getConfig(String dataId, String group, long timeoutMs) throws NacosException;
    
            /**
             * 获取配置并添加监听器监听配置变更
             */
            String getConfigAndSignListener(String dataId, String group, long timeoutMs, Listener listener)
                    throws NacosException;
    
            /**
             * 添加监听器监听配置变更
             */
            void addListener(String dataId, String group, Listener listener) throws NacosException;
    
            /**
             * 发布配置
             */
            boolean publishConfig(String dataId, String group, String content) throws NacosException;
    
            /**
             * 发布指定类型的配置,如yml、xml、properties、json等
             */
            boolean publishConfig(String dataId, String group, String content, String type) throws NacosException;
    
            /**
             * 删除配置
             */
            boolean removeConfig(String dataId, String group) throws NacosException;
    
            /**
             * 删除监听器
             */
            void removeListener(String dataId, String group, Listener listener);
    
            /**
             * 获取服务器状态
             */
            String getServerStatus();
    
            /**
             * 关闭服务
             */
            void shutDown() throws NacosException;
        }

    ConfigService测试案例代码如下:

    public static void main(String[] args) throws NacosException {
            /** 配置管理服务*/
            String nacosServer = "localhost:8848";
            ConfigService configService = NacosFactory.createConfigService(nacosServer);
    
            String dataId = "db.config";
            String group = "lucky";
            /** 1.发布配置*/
            String configContent = "";
            configService.publishConfig(dataId, group, configContent);
            /** 2.获取配置*/
            String config = configService.getConfig(dataId, group, 5000);
            /** 3.添加配置更新监听器*/
            configService.addListener(dataId, group, new Listener() {
                @Override
                public Executor getExecutor() {
                    return null;
                }
    
                @Override
                public void receiveConfigInfo(String configInfo) {
                    System.out.println("监听配置更新:" + configInfo);
                    //TODO 处理配置更新
                }
            });
            while (true){
    
            }
        }

    2.2.2、服务管理

    服务管理相关功能都由NamingService接口定义,根据NacosFactory可以获取NamingService实例,NamingService包含服务注册、订阅等相关方法,定义如下:

    public interface NamingService {
    
            /**
             *  注册服务实例
             */
            void registerInstance(String serviceName, String ip, int port) throws NacosException;
    
            void registerInstance(String serviceName, String groupName, String ip, int port) throws NacosException;
    
            void registerInstance(String serviceName, String ip, int port, String clusterName) throws NacosException;
    
            void registerInstance(String serviceName, String groupName, String ip, int port, String clusterName) throws NacosException;
    
            void registerInstance(String serviceName, Instance instance) throws NacosException;
    
            void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException;
    
            /**
             * 注销服务实例
             */
            void deregisterInstance(String serviceName, String ip, int port) throws NacosException;
    
            void deregisterInstance(String serviceName, String groupName, String ip, int port) throws NacosException;
    
            void deregisterInstance(String serviceName, String ip, int port, String clusterName) throws NacosException;
    
            void deregisterInstance(String serviceName, String groupName, String ip, int port, String clusterName) throws NacosException;
    
            void deregisterInstance(String serviceName, Instance instance) throws NacosException;
    
            void deregisterInstance(String serviceName, String groupName, Instance instance) throws NacosException;
    
            /**
             * 根据条件获取服务实例列表
             */
            List<Instance> getAllInstances(String serviceName) throws NacosException;
    
            List<Instance> getAllInstances(String serviceName, String groupName) throws NacosException;
    
            List<Instance> getAllInstances(String serviceName, boolean subscribe) throws NacosException;
    
            List<Instance> getAllInstances(String serviceName, String groupName, boolean subscribe) throws NacosException;
    
            List<Instance> getAllInstances(String serviceName, List<String> clusters) throws NacosException;
    
            List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters) throws NacosException;
    
            List<Instance> getAllInstances(String serviceName, List<String> clusters, boolean subscribe) throws NacosException;
    
            List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters, boolean subscribe) throws NacosException;
    
            /**
             * 根据条件选择服务实例列表
             */
            List<Instance> selectInstances(String serviceName, boolean healthy) throws NacosException;
    
            List<Instance> selectInstances(String serviceName, String groupName, boolean healthy) throws NacosException;
    
            List<Instance> selectInstances(String serviceName, boolean healthy, boolean subscribe) throws NacosException;
    
            List<Instance> selectInstances(String serviceName, String groupName, boolean healthy, boolean subscribe) throws NacosException;
    
            List<Instance> selectInstances(String serviceName, List<String> clusters, boolean healthy) throws NacosException;
    
            List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy) throws NacosException;
    
            List<Instance> selectInstances(String serviceName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException;
    
            List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException;
    
            /**
             * 根据条件以及负载均衡策略选择一个健康的服务实例
             */
            Instance selectOneHealthyInstance(String serviceName) throws NacosException;
    
            Instance selectOneHealthyInstance(String serviceName, String groupName) throws NacosException;
    
            Instance selectOneHealthyInstance(String serviceName, boolean subscribe) throws NacosException;
    
            Instance selectOneHealthyInstance(String serviceName, String groupName, boolean subscribe) throws NacosException;
    
            Instance selectOneHealthyInstance(String serviceName, List<String> clusters) throws NacosException;
    
            Instance selectOneHealthyInstance(String serviceName, String groupName, List<String> clusters) throws NacosException;
    
            Instance selectOneHealthyInstance(String serviceName, List<String> clusters, boolean subscribe) throws NacosException;
    
            Instance selectOneHealthyInstance(String serviceName, String groupName, List<String> clusters, boolean subscribe) throws NacosException;
    
            /**
             * 订阅服务,并开启Listener监听服务变更事件
             */
            void subscribe(String serviceName, EventListener listener) throws NacosException;
    
            void subscribe(String serviceName, String groupName, EventListener listener) throws NacosException;
    
            void subscribe(String serviceName, List<String> clusters, EventListener listener) throws NacosException;
    
            void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
                    throws NacosException;
    
            /**
             * 取消订阅服务,并关闭Listener监听服务变更事件
             */
            void unsubscribe(String serviceName, EventListener listener) throws NacosException;
    
            void unsubscribe(String serviceName, String groupName, EventListener listener) throws NacosException;
    
            void unsubscribe(String serviceName, List<String> clusters, EventListener listener) throws NacosException;
    
            void unsubscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
                    throws NacosException;
    
            /**
             * 根据条件获取所有服务名称列表
             */
            ListView<String> getServicesOfServer(int pageNo, int pageSize) throws NacosException;
    
            ListView<String> getServicesOfServer(int pageNo, int pageSize, String groupName) throws NacosException;
    
            ListView<String> getServicesOfServer(int pageNo, int pageSize, AbstractSelector selector) throws NacosException;
    
            ListView<String> getServicesOfServer(int pageNo, int pageSize, String groupName, AbstractSelector selector) throws NacosException;
    
            /**
             * 获取当前客户端订阅的服务列表
             */
            List<ServiceInfo> getSubscribeServices() throws NacosException;
    
            /**
             * 获取服务器状态
             */
            String getServerStatus();
    
            /**
             * 关闭服务器
             */
            void shutDown() throws NacosException;
        }

     NamingService测试案例代码如下:

    public static void main(String[] args) throws NacosException {
            String serverAddr = "42.192.94.208:8858";
            /** 1.创建NamingService实例 */
            NamingService namingService = NacosFactory.createNamingService(serverAddr);
            /** 2.注册实例*/
            namingService.registerInstance("testService", "localhost", 8080);
            /** 3.注销实例*/
            namingService.deregisterInstance("testService", "localhost", 8080);
            /** 4.获取所有健康实例*/
            List<Instance> instances = namingService.selectInstances("testService", true);
            /** 5.监听服务变化*/
            namingService.subscribe("testService", new EventListener() {
                @Override
                public void onEvent(Event event) {
                    System.out.println("处理服务变更事件");
                    if(event instanceof NamingEvent){
                        //TODO
                    }
                }
            });
            while (true){
    
            }
        }

    2.3、dubbo集成Nacos注册中心

    dubbo采用Nacos作为注册中心,只需要在配置注册中心时将地址改成nacos地址即可,如下:

    XML配置

    <!-- nacos地址 -->
    <dubbo:registry address="nacos://127.0.0.1:8848" />

     外部配置

    ## dubbo注册中心地址
    dubbo.registry.address = zookeeper://10.20.153.10:2181

     2.4、SpringBoot集成Nacos配置中心

    添加nacos依赖

    <dependency>
             <groupId>com.alibaba.boot</groupId>
             <artifactId>nacos-config-spring-boot-starter</artifactId>
             <version>0.2.1</version>
    </dependency>

    版本号0.2.x.RELEASE对应的是 Spring Boot 2.x 版本,版本0.1.x.RELEASE对应的是 Spring Boot 1.x 版本

    在application.properties配置文件中添加nacos地址配置

    nacos.config.server-addr=127.0.0.1:8848

     在SpringBoot启动类添加@NacosProperySource注解添加Nacos配置来源,autoRefreshed表示是否自动更新

    @NacosPropertySource(dataId = "db.config", autoRefreshed = true)

     通过nacos的@NacosValue注解给变量赋值配置的值,autoRefreshed表示是否自动更新,如:

        @NacosValue(value = "${db.username:tempUser}", autoRefreshed = true)
        private String dbUser;
    
        @NacosValue(value = "${db.password:tempPassword}")
        private String dbPassword;

    三、Nacos实现原理

    3.1、配置中心实现原理

    Nacos提供了大量的配置管理相关API供客户端调用,客户端可以很方便的调用API来进行配置管理。所以Nacos Client启动的时候只需要调用Nacos server的接口就可以获取到所有的配置。

    所以客户端获取配置的重点是如何进行热更新,也就是当服务端配置更新后,客户端是如何根据监听器进行实时更新的,监听器又是如何实现的呢?首先就需要从ConfigService的addListener方法入手。

    ConfigService接口的实现类是NacosConfigService,addListener方法源码如下:

     1 private final ClientWorker worker;
     2 
     3     /** NacosConfigService类 添加配置更新监听器方法
     4      * @param dataId : 配置集
     5      * @param group : 配置分组
     6      * @param listener : 配置更新监听器
     7      *  */
     8     public void addListener(String dataId, String group, Listener listener) throws NacosException {
     9         //调用ClientWorker对象方法
    10         worker.addTenantListeners(dataId, group, Arrays.asList(listener));
    11     }
    12 
    13     //Http客户端
    14     private final HttpAgent agent;
    15 
    16     /** ClientWorker类 添加监听器方法 */
    17     public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners) throws NacosException {
    18         group = null2defaultGroup(group);
    19         String tenant = agent.getTenant();
    20         CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);
    21         for (Listener listener : listeners) {
    22             /** 调用CacheData对象的addListener方法*/
    23             cache.addListener(listener);
    24         }
    25     }
    /** CacheData类 监听器列表*/
        private final CopyOnWriteArrayList<ManagerListenerWrap> listeners;
    
        /**
         * CacheData类 添加监听器
         * */
        public void addListener(Listener listener) {
            if (null == listener) {
                throw new IllegalArgumentException("listener is null");
            }
            /** 包装Listener*/
            ManagerListenerWrap wrap = (listener instanceof AbstractConfigChangeListener) ? new ManagerListenerWrap(listener, md5, content)
                            : new ManagerListenerWrap(listener, md5);
    
            /** 将监听器添加到列表中*/
            if (listeners.addIfAbsent(wrap)) {
                LOGGER.info("[{}] [add-listener] ok, tenant={}, dataId={}, group={}, cnt={}", name, tenant, dataId, group,
                        listeners.size());
            }
        }

     逻辑并不复杂,最终是将Listener对象进行封装并添加到了CacheData对象的listeners列表中存储起来。既然有地方存了,那么就需要有地方去读,而开启监听是通过ClientWorker实例来实现。

    NacosConfigService初始化时,会初始化ClientWorker对象,ClientWorker构造函数如下:

    /** ClientWorker构造函数 */
        public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,
                            final Properties properties) {
            this.agent = agent;
            this.configFilterChainManager = configFilterChainManager;
    
            /** 1.初始化配置*/
            init(properties);
    
            /** 2.创建定时任务线程池*/
            this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r);
                    t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
                    t.setDaemon(true);
                    return t;
                }
            });
    
            /** 3.创建定时任务线程池*/
            this.executorService = Executors
                    .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
                        @Override
                        public Thread newThread(Runnable r) {
                            Thread t = new Thread(r);
                            t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
                            t.setDaemon(true);
                            return t;
                        }
                    });
    
            /** 4.开启定时任务,10毫秒执行一次*/
            this.executor.scheduleWithFixedDelay(new Runnable() {
                @Override
                public void run() {
                    try {
                        /** 5.检测配置信息*/
                        checkConfigInfo();
                    } catch (Throwable e) {
                        LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
                    }
                }
            }, 1L, 10L, TimeUnit.MILLISECONDS);
        }

    ClientWorker初始化时会创建两个定时任务线程池,一个只有一个线程每10毫秒执行一次checkConfigInfo方法,而另一个线程池就是专门用来处理checkConfigInfo方法内部的检查配置的逻辑,源码如下:

    /** ClientWorker检查配置信息方法*/
        public void checkConfigInfo() {
            /** 1.获取CacheData对象,key是dataId*/
            int listenerSize = cacheMap.size();
            int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
            if (longingTaskCount > currentLongingTaskCount) {
                for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
                    /** 2.线程池执行LongPollingRunnable任务*/
                    executorService.execute(new LongPollingRunnable(i));
                }
                currentLongingTaskCount = longingTaskCount;
            }
        }

    checkConfigInfo方法实际就是向定时任务线程池中提交一个长轮训任务LongPollingRunnable,该任务执行逻辑如下:

    /** LongPollingRunnable线程执行逻辑 */
        public void run() {
            List<CacheData> cacheDatas = new ArrayList<CacheData>();
            List<String> inInitializingCacheList = new ArrayList<String>();
            try {
                //遍历所有CacheData
                for (CacheData cacheData : cacheMap.values()) {
                    if (cacheData.getTaskId() == taskId) {
                        cacheDatas.add(cacheData);
                        try {
                            /** 检查CacheData的本地配置*/
                            checkLocalConfig(cacheData);
                            if (cacheData.isUseLocalConfigInfo()) {
                                cacheData.checkListenerMd5();
                            }
                        } catch (Exception e) {
                            LOGGER.error("get local config info error", e);
                        }
                    }
                }
    
                // 校验服务器配置,检查需要更新的DataId
                List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
                if (!CollectionUtils.isEmpty(changedGroupKeys)) {
                    LOGGER.info("get changedGroupKeys:" + changedGroupKeys);
                }
    
                /** 遍历所有更新的配置分组key*/
                for (String groupKey : changedGroupKeys) {
                    String[] key = GroupKey.parseKey(groupKey);
                    String dataId = key[0];
                    String group = key[1];
                    String tenant = null;
                    if (key.length == 3) {
                        tenant = key[2];
                    }
                    try {
                        /** 获取服务器配置 */
                        String[] ct = getServerConfig(dataId, group, tenant, 3000L);
                        CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant));
                        /** 更新服务器配置*/
                        cache.setContent(ct[0]);
                        if (null != ct[1]) {
                            cache.setType(ct[1]);
                        }
                        LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}", agent.getName(), dataId, group, tenant, cache.getMd5(), ContentUtils.truncateContent(ct[0]), ct[1]);
                    } catch (NacosException ioe) {
                        String message = String.format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
                                        agent.getName(), dataId, group, tenant);
                        LOGGER.error(message, ioe);
                    }
                }
                for (CacheData cacheData : cacheDatas) {
                    if (!cacheData.isInitializing() || inInitializingCacheList
                            .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
                        /** 校验配置的MD5*/
                        cacheData.checkListenerMd5();
                        cacheData.setInitializing(false);
                    }
                }
                inInitializingCacheList.clear();
                executorService.execute(this);
    
            } catch (Throwable e) {
                LOGGER.error("longPolling error : ", e);
                executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
            }
        }
    }

    首先是检查本地配置,所以及时服务器崩溃了,nacos客户端也可以保证可以使用本地配置,本地配置存储在~nacos/config/目录下,检查完本地配置之后,再查询服务器配置,然后和本地配置进行比较的到需要更新的配置,将最新的配置写入本地。

    最后执行CacheData的checkListenerMd5()方法,该方法作用是比较配置文件的MD5加密数据是否一致,如果不一致则表示更新过,那么就需要触发监听器的回调,源码如下:

     1 /** CacheData类*/
     2     void checkListenerMd5() {
     3         for (ManagerListenerWrap wrap : listeners) {
     4             //比较MD5加密数据是否一致
     5             if (!md5.equals(wrap.lastCallMd5)) {
     6                 /** 回调Listener*/
     7                 safeNotifyListener(dataId, group, content, type, md5, wrap);
     8             }
     9         }
    10     }
    11 
    12     private void safeNotifyListener(final String dataId, final String group, final String content, final String type,
    13                                     final String md5, final ManagerListenerWrap listenerWrap) {
    14         final Listener listener = listenerWrap.listener;
    15 
    16         Runnable job = new Runnable() {
    17             @Override
    18             public void run() {
    19                 ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();
    20                 ClassLoader appClassLoader = listener.getClass().getClassLoader();
    21                 try {
    22                     if (listener instanceof AbstractSharedListener) {
    23                         AbstractSharedListener adapter = (AbstractSharedListener) listener;
    24                         adapter.fillContext(dataId, group);
    25                         LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5);
    26                     }
    27                     // 执行回调之前先将线程classloader设置为具体webapp的classloader,以免回调方法中调用spi接口是出现异常或错用(多应用部署才会有该问题)。
    28                     Thread.currentThread().setContextClassLoader(appClassLoader);
    29 
    30                     ConfigResponse cr = new ConfigResponse();
    31                     cr.setDataId(dataId);
    32                     cr.setGroup(group);
    33                     cr.setContent(content);
    34                     configFilterChainManager.doFilter(null, cr);
    35                     String contentTmp = cr.getContent();
    36                     /** 回调执行Listener的receiveConfigInfo方法 */
    37                     listener.receiveConfigInfo(contentTmp);
    38 
    39                     // compare lastContent and content
    40                     if (listener instanceof AbstractConfigChangeListener) {
    41                         Map data = ConfigChangeHandler.getInstance()
    42                                 .parseChangeData(listenerWrap.lastContent, content, type);
    43                         ConfigChangeEvent event = new ConfigChangeEvent(data);
    44                         ((AbstractConfigChangeListener) listener).receiveConfigChange(event);
    45                         listenerWrap.lastContent = content;
    46                     }
    47 
    48                     listenerWrap.lastCallMd5 = md5;
    49                     LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ", name, dataId, group, md5,
    50                             listener);
    51                 } catch (NacosException ex) {
    52                     LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}",
    53                             name, dataId, group, md5, listener, ex.getErrCode(), ex.getErrMsg());
    54                 } catch (Throwable t) {
    55                     LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId,
    56                             group, md5, listener, t.getCause());
    57                 } finally {
    58                     Thread.currentThread().setContextClassLoader(myClassLoader);
    59                 }
    60             }
    61         };
    62 
    63         final long startNotify = System.currentTimeMillis();
    64         try {
    65             if (null != listener.getExecutor()) {
    66                 listener.getExecutor().execute(job);
    67             } else {
    68                 job.run();
    69             }
    70         } catch (Throwable t) {
    71             LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}", name, dataId,
    72                     group, md5, listener, t.getCause());
    73         }
    74         final long finishNotify = System.currentTimeMillis();
    75         LOGGER.info("[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} ",
    76                 name, (finishNotify - startNotify), dataId, group, md5, listener);
    77     }

    当比较更新完的配置和之前的配置不一样时,就会触发监听器Listener的回调,执行Listener的receiveConfigInfo方法

    总结:

    Nacos配置中心采用的是客户端pull的方式从nacos服务器获取配置数据,并且没有和nacos服务器保持长连接,而是以定时任务执行HTTP请求的方式从Nacos服务器获取最新配置,然后再刷新到本地存储,最后再触发监听器Listener的回调方法。

    所以Nacos客户端的监听器的通知并不是nacos服务器主动推送过来的,而是nacos客户端本地轮训查询发现了配置变更之后才触发的回调。另外nacos客户端本地采用了线程池方式拉取配置,所以不会影响核心业务线程。

    3.2、服务管理实现原理

    nacos提供了大量关于服务发布和订阅的API,作为Nacos客户端,无论是服务提供者还是服务消费者,只需要在启动时调用nacos的API即可完成服务发布和服务订阅功能。但是作为注册中心,还需要有服务实例健康检查功能,服务消费者实时监听服务提供者变化的

    通知功能。而服务订阅的监听逻辑和nacos配置的变更监听流程基本上相同,订阅功能主要由subscribe方法实现,NamingService实现类是NacosNamingService,初始化时会执行init方法,初始化服务器代理serverProxy,心跳处理器beatReactor,host处理器

    hostReactor等对象,服务订阅方法subscribe方法逻辑如下:

        private HostReactor hostReactor;
    
        private BeatReactor beatReactor;
    
        private NamingProxy serverProxy;
    
        /** NacosNamingService初始化方法 */
        private void init(Properties properties) throws NacosException {
            ValidatorUtils.checkInitParam(properties);
            this.namespace = InitUtils.initNamespaceForNaming(properties);
            InitUtils.initSerialization();
            initServerAddr(properties);
            InitUtils.initWebRootContext(properties);
            initCacheDir();
            initLogName(properties);
    
            this.serverProxy = new NamingProxy(this.namespace, this.endpoint, this.serverList, properties);
            this.beatReactor = new BeatReactor(this.serverProxy, initClientBeatThreadCount(properties));
            this.hostReactor = new HostReactor(this.serverProxy, beatReactor, this.cacheDir, isLoadCacheAtStart(properties),
                    isPushEmptyProtect(properties), initPollingThreadCount(properties));
        }
    
        /** NacosNamingService服务订阅方法 */
        public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
                throws NacosException {
            hostReactor.subscribe(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","),
                    listener);
        }
    
        /** HostReactor的服务订阅方法,并开启监听器*/
        public void subscribe(String serviceName, String clusters, EventListener eventListener) {
            /** 1.注册监听器,存入InstanceChangeNotifier对象的Map中,key是服务名称和集群,value是监听器集合 */
            notifier.registerListener(serviceName, clusters, eventListener);
            /** 2.根据服务名称获取服务器信息 */
            getServiceInfo(serviceName, clusters);
        }

     方法执行到HostReactor对象的subscribe方法,首先是将监听器存入InstanceChangeNotifier对象的Map中,根据服务名称和集群名称作为key存储,value是监听器的集合,存储起来之后调用getServiceInfo方法从nacos服务器获取服务实例信息,逻辑如下:

     1 /** HostReactor类 获取服务实例信息方法 */
     2     public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
     3         String key = ServiceInfo.getKey(serviceName, clusters);
     4         if (failoverReactor.isFailoverSwitch()) {
     5             return failoverReactor.getService(key);
     6         }
     7         /** 从本地缓存中获取ServiceInfo对象 */
     8         ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
     9 
    10         if (null == serviceObj) {// 如果本地缓存中没有服务实例
    11             serviceObj = new ServiceInfo(serviceName, clusters);
    12             serviceInfoMap.put(serviceObj.getKey(), serviceObj);
    13             updatingMap.put(serviceName, new Object());
    14             /** 立即更新服务实例*/
    15             updateServiceNow(serviceName, clusters);
    16             updatingMap.remove(serviceName);
    17 
    18         } else if (updatingMap.containsKey(serviceName)) {//判断当前服务实例是否正在更新
    19             if (UPDATE_HOLD_INTERVAL > 0) {
    20                 synchronized (serviceObj) {
    21                     try {
    22                         serviceObj.wait(UPDATE_HOLD_INTERVAL);
    23                     } catch (InterruptedException e) {
    24                         NAMING_LOGGER
    25                                 .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
    26                     }
    27                 }
    28             }
    29         }
    30 
    31         /** 定时更新服务实例信息 */
    32         scheduleUpdateIfAbsent(serviceName, clusters);
    33         return serviceInfoMap.get(serviceObj.getKey());
    34     }

     核心逻辑是先从本地获取服务实例信息,如果不存在那么立即执行updateServiceNow方法进行更新;如果已经存在那么先执行scheuleUpdateIfAbsent方法定时更新。updateServiceNow方法也就是当前线程立即更新服务实例,执行了updateService方法,

    而定时更新逻辑是先构建一个UpdateTask,然后提交给线程池来执行,定时每1秒执行一次,逻辑如下:

    /** HostReactor类 */
        public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
            if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
                return;
            }
    
            synchronized (futureMap) {
                if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
                    return;
                }
    
                /** 创建UpdateTask,并添加定时任务 */
                ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
                futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
            }
        }
    
        /** HostReactor类添加任务*/
        public synchronized ScheduledFuture<?> addTask(UpdateTask task) {
            /** 线程池执行,每1秒执行一次*/
            return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
        }

    所以更新的逻辑主要在UpdateTask执行体类,且逻辑肯定包含了updateService方法的逻辑,源码核心逻辑如下:

    /** HostReactor 更新服务实例方法 */
        public void updateService(String serviceName, String clusters) throws NacosException {
            /** 1.从本地获取旧的服务实例 */
            ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
            try {
                /** 2.从服务器查询最新服务实例列表 */
                String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
    
                if (StringUtils.isNotEmpty(result)) {
                    /** 3.刷新本地缓存 */
                    processServiceJson(result);
                }
            } finally {
                if (oldService != null) {
                    synchronized (oldService) {
                        oldService.notifyAll();
                    }
                }
            }
        }
    
        /** UpdateTask 执行体*/
        public void run() {
            long delayTime = DEFAULT_DELAY;
            try {
                /** 1.从缓存中获取服务实例*/
                ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
                if (serviceObj == null) {
                    /** 2.如果缓存中没有,则执行updateService方法查询*/
                    updateService(serviceName, clusters);
                    return;
                }
                /** 2.如果本地服务实例更新时间延迟,那么就执行updateService方法刷新*/
                if (serviceObj.getLastRefTime() <= lastRefTime) {
                    updateService(serviceName, clusters);
                    serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
                } else {
                    refreshOnly(serviceName, clusters);
                }
    
                lastRefTime = serviceObj.getLastRefTime();
    
                if (!notifier.isSubscribed(serviceName, clusters) && !futureMap
                        .containsKey(ServiceInfo.getKey(serviceName, clusters))) {
                    // abort the update task
                    NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
                    return;
                }
                if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
                    /** 3.如果查询失败,那么失败次数自增*/
                    incFailCount();
                    return;
                }
                delayTime = serviceObj.getCacheMillis();
                /** 4.如果查询成功,那么重置失败次数*/
                resetFailCount();
            } catch (Throwable e) {
                incFailCount();
                NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
            } finally {
                /** 5.提交下一次延迟任务*/
                executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
            }
        }

     可以发现更新逻辑就是执行updateService方法,首先从服务器查询最新的服务实例列表,然后将查询结果刷新到本地缓存中,然后开启下一次定时任务继续执行。默认是1秒钟执行一次,如果查询不到任何记录(服务器异常或无可用实例),那么就增加失败次数,每

    增加一次失败次数延迟执行时间就翻倍,最长会1分钟执行一次。

    另外当执行updateService方法刷新服务实例时,如果触发了服务更新,就需要更新本地缓存并且写入磁盘的持久化文件中保持,并且还会调用NotifyCenter的publishEvent方法发布服务实例变更事件,逻辑如下:

    /** HostReactor 处理查询服务实例结果方法*/
        public ServiceInfo processServiceJson(String json) {
            //......
            boolean changed = false;
            if (oldService != null) {
                //......
            } else {
                changed = true;
                /** 刷新内存中缓存*/
                serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
                /** 发布服务实例变更事件*/
                NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts()));
                serviceInfo.setJsonFromServer(json);
                /** 写入磁盘本地数据*/
                DiskCache.write(serviceInfo, cacheDir);
            }
            //......
            return serviceInfo;
        }
    
        /** NotifyCenter 发布事件方法*/
        public static boolean publishEvent(Event event) {
            try {
                return publishEvent(event.getClass(), event);
            } catch (Throwable var2) {
                LOGGER.error("There was an exception to the message publishing : {}", var2);
                return false;
            }
        }
        private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {
            if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
                return INSTANCE.sharePublisher.publish(event);
            }
    
            final String topic = ClassUtils.getCanonicalName(eventType);
            EventPublisher publisher = INSTANCE.publisherMap.get(topic);
            if (publisher != null) {
                /** 执行EventPublisher对象publish方法*/
                return publisher.publish(event);
            }
            LOGGER.warn("There are no [{}] publishers for this event, please register", topic);
            return false;
        }

    实际是调用了EventPublisher对象的publish方法,默认实现是DefaultPublisher类,DefaultPublisher会先将通知事件存入本地队列,然后采用线程异步通知,逻辑如下:

     1  /** DefaultPublisher类 发布事件方法*/
     2     public boolean publish(Event event) {
     3         /** 1.检查并开启线程 */
     4         checkIsStart();
     5         /** 2.将事件存入队列*/
     6         boolean success = this.queue.offer(event);
     7         if (!success) {
     8             LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event);
     9             /** 3.如果存入队列失败,那么立即通知*/
    10             receiveEvent(event);
    11             return true;
    12         }
    13         return true;
    14     }
    15 
    16     public void run() {
    17         openEventHandler();
    18     }
    19 
    20     void openEventHandler() {
    21         try {
    22 
    23             // This variable is defined to resolve the problem which message overstock in the queue.
    24             int waitTimes = 60;
    25             // To ensure that messages are not lost, enable EventHandler when
    26             // waiting for the first Subscriber to register
    27             for (; ; ) {
    28                 if (shutdown || hasSubscriber() || waitTimes <= 0) {
    29                     break;
    30                 }
    31                 ThreadUtils.sleep(1000L);
    32                 waitTimes--;
    33             }
    34 
    35             for (; ; ) {
    36                 if (shutdown) {
    37                     break;
    38                 }
    39                 final Event event = queue.take();
    40                 receiveEvent(event);
    41                 UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence()));
    42             }
    43         } catch (Throwable ex) {
    44             LOGGER.error("Event listener exception : {}", ex);
    45         }
    46     }
    47 
    48     void receiveEvent(Event event) {
    49         final long currentEventSequence = event.sequence();
    50         /** 遍历所有订阅者,*/
    51         for (Subscriber subscriber : subscribers) {
    52             // Whether to ignore expiration events
    53             if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
    54                 LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire",
    55                         event.getClass());
    56                 continue;
    57             }
    58             /** 通知订阅者,执行订阅者的onEvent方法 */
    59             notifySubscriber(subscriber, event);
    60         }
    61     }

    DefaultPublisher先将事件存入队列,然后通过异步线程从队列中取任务,遍历事件所有订阅者,依次遍历执行订阅者的onEvent方法实现事件回调通知。

    总结:

    服务管理的实现和配置管理实现原理基本一致,启动时首先会调用Nacos服务器的HTTP接口初始化一次,并且在本地内存中缓存一份,磁盘中持久化一份。然后开启定时任务轮训查询服务器最新数据,如果数据发生变化,那么就更新内存中缓存,重新写入磁盘,

    然后再由线程池异步遍历所有订阅者,回调执行订阅者的回调函数实现变更通知的逻辑。

    3.3、心跳检测

    作为服务提供者,需要和nacos服务器保持心跳,服务提供者在注册实例时会创建心跳任务,逻辑如下:

     1 /** 服务提供者 注册实例*/
     2     public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
     3         NamingUtils.checkInstanceIsLegal(instance);
     4         String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
     5         /** 如果实例是临时节点*/
     6         if (instance.isEphemeral()) {
     7             /** 构建心跳任务交给BeatReactor处理 */
     8             BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
     9             beatReactor.addBeatInfo(groupedServiceName, beatInfo);
    10         }
    11         serverProxy.registerService(groupedServiceName, groupName, instance);
    12     }

     调用BeatReactor的addBeatInfo方法提交心跳任务

    public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
            NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
            String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
            BeatInfo existBeat = null;
            //fix #1733
            if ((existBeat = dom2Beat.remove(key)) != null) {
                existBeat.setStopped(true);
            }
            dom2Beat.put(key, beatInfo);
            /** 创建并提交心跳定时任务,默认是5秒执行一次*/
            executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
            MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
        }
    
        /** 心跳定时任务执行体 */
        class BeatTask implements Runnable {
    
            BeatInfo beatInfo;
    
            public BeatTask(BeatInfo beatInfo) {
                this.beatInfo = beatInfo;
            }
    
            @Override
            public void run() {
                if (beatInfo.isStopped()) {
                    return;
                }
                long nextTime = beatInfo.getPeriod();
                try {
                    /** 发送心跳给Nacos服务器
                     *  调用Nacos服务器的 /instance/beat 接口 */
                    JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
                    long interval = result.get("clientBeatInterval").asLong();
                    boolean lightBeatEnabled = false;
                    if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
                        lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
                    }
                    BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
                    if (interval > 0) {
                        nextTime = interval;
                    }
                    int code = NamingResponseCode.OK;
                    if (result.has(CommonParams.CODE)) {
                        code = result.get(CommonParams.CODE).asInt();
                    }
                    if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
                        Instance instance = new Instance();
                        instance.setPort(beatInfo.getPort());
                        instance.setIp(beatInfo.getIp());
                        instance.setWeight(beatInfo.getWeight());
                        instance.setMetadata(beatInfo.getMetadata());
                        instance.setClusterName(beatInfo.getCluster());
                        instance.setServiceName(beatInfo.getServiceName());
                        instance.setInstanceId(instance.getInstanceId());
                        instance.setEphemeral(true);
                        try {
                            /** 如果返回404,那么就重新注册实例*/
                            serverProxy.registerService(beatInfo.getServiceName(),
                                    NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
                        } catch (Exception ignore) {
                        }
                    }
                } catch (NacosException ex) {
                    NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
                            JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());
                }
                /** 开启下一次心跳定时任务*/
                executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
            }
        }

     核心逻辑就是构建心跳定时任务交给NacosNamingService的线程池,默认每5秒发送一次心跳,实际就是调用nacos服务器的 /instance/beat接口发送心跳,心跳发送完成再开启下一次的定时任务,整体逻辑比较简单。

    总结:

    虽然nacos实现了配置中心和服务发现、服务订阅、健康检测等功能,但是nacos客户端实际上并没有和nacos服务器保持长连接,而是采用HTTP请求的方式来实现。

    配置中心就是调用查询配置HTTP接口查询并缓存在本地,然后开启定时任务轮训查询,如果发送变更就刷新本地缓存,并触发回调通知监听器;

    服务发布就是调用注册服务HTTP接口实现注册,然后开启定时任务每5秒向nacos调用一次HTTP接口发送心跳数据,nacos根据心跳来管理服务提供者的健康状态;

    服务订阅就是调用查询服务HTTP接口实现服务订阅并将服务实例信息缓存在本地,然后开启定时任务轮训查询并和本地数据进行比较,如果有更新那么就异步触发回调通知所有服务订阅者;

  • 相关阅读:
    ZOJ 3818 Pretty Poem
    HDU 4597 Play Game
    HDU 4497 GCD and LCM
    CSU 1335 高桥和低桥
    UVA 10791 Minimum Sum LCM
    CSU 1119 Collecting Coins
    CSU 1120 病毒
    UVA 12169 Disgruntled Judge
    HDU 1301 Jungle Roads
    POJ 1258 Agri-Net
  • 原文地址:https://www.cnblogs.com/jackion5/p/15715996.html
Copyright © 2011-2022 走看看