zoukankan      html  css  js  c++  java
  • Nacos配置中心-源码解析

    一:关于Nacos的思考

         首先思考一个问题,Nacos作为配置中心,Nacos 客户端是怎么实时获取到 Nacos 服务端的最新数据?

         其实客户端和服务端之间的数据交互,无外乎两种情况:

         1.服务端推数据给客户端

         2.客户端从服务端拉数据

         zk作为配置中心,基于zk的watcher机制,配置发生变化通知客户端,Nacos也是同样的原理吗?

    二:Nacos的源码解析

          看看Nacos是如何获取服务端的最新数据

           

          createConfigService的作用:通过反射创建NacosConfigService

        

       public static ConfigService createConfigService(Properties properties) throws NacosException {
            try {
                Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");
                Constructor constructor = driverImplClass.getConstructor(Properties.class);
                ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties);
                return vendorImpl;
            } catch (Throwable e) {
                throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
            }
        }
    View Code

         NacosConfigService的构造函数

         

        new MetricsHttpAgent(new ServerHttpAgent(properties))作用:MetricsHttpAgent包装ServerHttpAgent(装饰器模式)为了做一些统计等功能。
    agent.start()作用 : 监听服务器列表是否变化

          

    public class MetricsHttpAgent implements HttpAgent {
        private HttpAgent httpAgent;
    
        public MetricsHttpAgent(HttpAgent httpAgent) {
            this.httpAgent = httpAgent;
        }
    
        @Override
        public void start() throws NacosException {
            httpAgent.start();
        }
    
        @Override
        public HttpResult httpGet(String path, List<String> headers, List<String> paramValues, String encoding, long readTimeoutMs) throws IOException {
            Histogram.Timer timer = MetricsMonitor.getConfigRequestMonitor("GET", path, "NA");
            HttpResult result = null;
            try {
                result = httpAgent.httpGet(path, headers, paramValues, encoding, readTimeoutMs);
            } catch (IOException e) {
                throw e;
            } finally {
                timer.observeDuration();
                timer.close();
            }
    
            return result;
        }
    
        @Override
        public HttpResult httpPost(String path, List<String> headers, List<String> paramValues, String encoding, long readTimeoutMs) throws IOException {
            Histogram.Timer timer = MetricsMonitor.getConfigRequestMonitor("POST", path, "NA");
            HttpResult result = null;
            try {
                result = httpAgent.httpPost(path, headers, paramValues, encoding, readTimeoutMs);
            } catch (IOException e) {
                throw e;
            } finally {
                timer.observeDuration();
                timer.close();
            }
    
            return result;
        }
    
        @Override
        public HttpResult httpDelete(String path, List<String> headers, List<String> paramValues, String encoding, long readTimeoutMs) throws IOException {
            Histogram.Timer timer = MetricsMonitor.getConfigRequestMonitor("DELETE", path, "NA");
            HttpResult result = null;
            try {
                result = httpAgent.httpDelete(path, headers, paramValues, encoding, readTimeoutMs);
            } catch (IOException e) {
    
                throw e;
            } finally {
                timer.observeDuration();
                timer.close();
            }
    
            return result;
        }
    
        @Override
        public String getName() {
            return httpAgent.getName();
        }
    
        @Override
        public String getNamespace() {
            return httpAgent.getNamespace();
        }
    
        @Override
        public String getTenant() {
            return httpAgent.getTenant();
        }
    
        @Override
        public String getEncode() {
            return httpAgent.getEncode();
        }
    }
    View Code
          new ClientWorker(agent, configFilterChainManager, properties)的作用:线程池轮询服务器

           

        public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) {
            this.agent = agent;
            this.configFilterChainManager = configFilterChainManager;
    
            // Initialize the timeout parameter
    
            init(properties);
    
            executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r);
                    t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
                    t.setDaemon(true);
                    return t;
                }
            });
    
            executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r);
                    t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
                    t.setDaemon(true);
                    return t;
                }
            });
    
            executor.scheduleWithFixedDelay(new Runnable() {
                @Override
                public void run() {
                    try {
                        checkConfigInfo();
                    } catch (Throwable e) {
                        LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
                    }
                }
            }, 1L, 10L, TimeUnit.MILLISECONDS);
        }
    View Code
        checkConfigInfo作用:检查配置信息

         

        public void checkConfigInfo() {
            // 分任务
            int listenerSize = cacheMap.get().size();
            // 向上取整为批数
            int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
            if (longingTaskCount > currentLongingTaskCount) {
                for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
                    // 要判断任务是否在执行 这块需要好好想想。 任务列表现在是无序的。变化过程可能有问题
                    executorService.execute(new LongPollingRunnable(i));
                }
                currentLongingTaskCount = longingTaskCount;
            }
        }
    View Code
         LongPollingRunnable:run方法是一个长轮询服务端过程,大致分为四块部分
    第一部分:检查本地配置相关

           

            checkLocalConfig作用:判断本地配置是否存在,是否有变更,dataId=“example”和group=“DEFAULT_GROUP”在Windows环境下默认配置路径(C:UsersAdministrator acosconfigfixed-localhost_8848_nacosdataconfig-dataDEFAULT_GROUPexample)。

           

        private void checkLocalConfig(CacheData cacheData) {
            final String dataId = cacheData.dataId;
            final String group = cacheData.group;
            final String tenant = cacheData.tenant;
            File path = LocalConfigInfoProcessor.getFailoverFile(agent.getName(), dataId, group, tenant);
    
            // 没有 -> 有
            if (!cacheData.isUseLocalConfigInfo() && path.exists()) {
                String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
                String md5 = MD5.getInstance().getMD5String(content);
                cacheData.setUseLocalConfigInfo(true);
                cacheData.setLocalConfigInfoVersion(path.lastModified());
                cacheData.setContent(content);
    
                LOGGER.warn("[{}] [failover-change] failover file created. dataId={}, group={}, tenant={}, md5={}, content={}",
                    agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content));
                return;
            }
    
            // 有 -> 没有。不通知业务监听器,从server拿到配置后通知。
            if (cacheData.isUseLocalConfigInfo() && !path.exists()) {
                cacheData.setUseLocalConfigInfo(false);
                LOGGER.warn("[{}] [failover-change] failover file deleted. dataId={}, group={}, tenant={}", agent.getName(),
                    dataId, group, tenant);
                return;
            }
    
            // 有变更
            if (cacheData.isUseLocalConfigInfo() && path.exists()
                && cacheData.getLocalConfigInfoVersion() != path.lastModified()) {
                String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
                String md5 = MD5.getInstance().getMD5String(content);
                cacheData.setUseLocalConfigInfo(true);
                cacheData.setLocalConfigInfoVersion(path.lastModified());
                cacheData.setContent(content);
                LOGGER.warn("[{}] [failover-change] failover file changed. dataId={}, group={}, tenant={}, md5={}, content={}",
                    agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content));
            }
        }
    View Code

          第二部分:检查server端的变更

              

         第三部分:如果server端有变更,更新CacheData的content

             

         第四部分:检查md5的值是否改变,即配置是否改变,选择唤醒listener回调

              

        checkListenerMd5作用:检查CacheData的md5和wrap(listener的包装器)的md5是否一致,如果不一致唤醒回调

              

            sageNotifyListener作用 : 触发listener的回调函数

           

              配置中心的完整流程已经分析完毕了,可以发现,Nacos 并不是通过推的方式将服务端最新的配置信息发送给客户端的,而是客户端维护了一个长轮询的任务,定时去拉取发生变更的配置信息,然后将最新的数据推送给 Listener 的持有者。

    三:Nacos配置中心总结归纳

          1. 关闭Nacos服务端,删除本地配置,启动测试类,依然能获取配置信息,为什么呢?

              跟踪代码 String config = configService.getConfig("example", "DEFAULT_GROUP", 10)

              

                当前环境下快照地址为:C:UsersAdministrator acosconfigfixed-localhost_8848_nacossnapshotDEFAULT_GROUPexample

            2.客户端拉取服务端的数据与服务端推送数据给客户端相比,优势在哪呢,为什么 Nacos 不设计成主动推送数据,而是要客户端去拉取呢?

              如果用推的方式,服务端需要维持与客户端的长连接,这样的话需要耗费大量的资源,并且还需要考虑连接的有效性,例如需要通过心跳来维持两者之间的连接。而用拉的方式,客户端只需要通过一个无状态的 http 请求即可获取到服务端的数据。       

            3. Nacos实现配置中心的原理?      

               客户端是通过一个定时任务来检查自己监听的配置项的数据,一旦本地配置或者服务端的数据发生变化时,客户端将会获取到最新的数据(跟踪checkUpdateDataIds方法可以明白),优先本地配置,并将最新的数据保存在一个 CacheData 对象中,然后会重新计算 CacheData 的 md5 属性的值,此时就会对该 CacheData 所绑定的 Listener 触发 receiveConfigInfo 回调。考虑到服务端故障的问题,客户端将最新数据获取后会保存在本地的 snapshot 文件中。

            上述为本人阅读源码的理解,可能存在误差。

     


         



    
    
  • 相关阅读:
    令我印象最深刻的三个老师
    硬盘大于2T安装CentOS7.X时要注意分区
    Linux网卡配置
    Python13:文件操作
    Python12:集合
    Python11:字典
    Python10:String字符串
    Python09:元组
    Python08:列表
    Python07:模块初识
  • 原文地址:https://www.cnblogs.com/dyg0826/p/11404678.html
Copyright © 2011-2022 走看看