zoukankan      html  css  js  c++  java
  • Nacos配置中心源码分析

    1. 什么是Nacos ?

    Nacos主要用做注册中心和配置中心。Nacos介绍,Nacos用法, Nacos源码下载 etc.. 请查看Nacos官方文档, 本文基于nacos版本1.2.0进行分析。

    2. Nacos代码入口

    从官方文档给的JAVA SDK 入手, 这样可以知道使用流程,也可以通过入口,分析代码。官方给的代码如下:

    try {
    	String serverAddr = "{serverAddr}";
    	String dataId = "{dataId}";
    	String group = "{group}";
    	Properties properties = new Properties();
    	properties.put("serverAddr", serverAddr);
    	ConfigService configService = NacosFactory.createConfigService(properties);
    	String content = configService.getConfig(dataId, group, 5000);
    	System.out.println(content);
    } catch (NacosException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    

    代码先ConfigService,然后通过configService进行数据库操作,我们也从configservice作为入口,进行代码分析。
    tips: 找入口最好从使用的demo开始,这样可以快速找到入口,分析代码。

    3. 代码分析

    ConfigService的实现类只有下面一个,我们就从该类的构造方法开始.

       public NacosConfigService(Properties properties) throws NacosException {
            String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
            if (StringUtils.isBlank(encodeTmp)) {
                encode = Constants.ENCODE;
            } else {
                encode = encodeTmp.trim();
            }
            initNamespace(properties);
            // 用户登录信息
            agent = new MetricsHttpAgent(new ServerHttpAgent(properties));  
            // 维护nacos服务列表
            agent.start();
            //  更新维护配置
            worker = new ClientWorker(agent, configFilterChainManager, properties);
        }
    

    代码中主要代码有三行,HttpAgent实现类有MetricsHttpAgent和ServerHttpAgent。我们一步步点进去看。

    点进MetricsHttpAgent,核心调用逻辑是通过构造参数传入的HttpAgent, 该类是对传入的HttpAgent调用的方法加了增加了时间检测。(装饰器模式, 没有通过名称写出来)
    在进入ServerHttpAgent,代码中实现了定时任务调度,登录Nacos(客户端获取配置、服务注册列表需要建立连接),时间是5秒一次。

    public ServerHttpAgent(Properties properties) throws NacosException {
            ...
    
            ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    ...
                }
            });
    
            executorService.scheduleWithFixedDelay(new Runnable() {
                @Override
                public void run() {
                    securityProxy.login(serverListMgr.getServerUrls());
                }
            }, 0, securityInfoRefreshIntervalMills, TimeUnit.MILLISECONDS);
        }
    

    我们进入start方法, 通过代码,我们知道实际调用的是ServerHttpAgent中的start方法, 该类通过定时任务,维护nacos的列表。进入此方法,看看具体实现逻辑。

       public synchronized void start() throws NacosException {
            if (isStarted || isFixed) {
                return;
            }
            
            //  runnable接口,维护serverUrlList
            GetServerListTask getServersTask = new GetServerListTask(addressServerUrl);
            for (int i = 0; i < initServerlistRetryTimes && serverUrls.isEmpty(); ++i) {
                // 判断服务列表是否发生改变,如果发生改变,则更新服务列表
                getServersTask.run();
                try {
                    this.wait((i + 1) * 100L);
                } catch (Exception e) {
                    LOGGER.warn("get serverlist fail,url: {}", addressServerUrl);
                }
            }
    
            if (serverUrls.isEmpty()) {
                LOGGER.error("[init-serverlist] fail to get NACOS-server serverlist! env: {}, url: {}", name,
                    addressServerUrl);
                throw new NacosException(NacosException.SERVER_ERROR,
                    "fail to get NACOS-server serverlist! env:" + name + ", not connnect url:" + addressServerUrl);
            }
            // 将自己在丢到定时任务里面执行,执行时间为30秒一次
            TimerService.scheduleWithFixedDelay(getServersTask, 0L, 30L, TimeUnit.SECONDS);
            isStarted = true;
        }
    

    接下来进入最核心的代码,如何对文件配置进行同步,缓存, 本地和远程差异化比较的。

       public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) {
            this.agent = agent;
            this.configFilterChainManager = configFilterChainManager;
            // Initialize the timeout parameter
            init(properties);
    
            executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r);
                    t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
                    t.setDaemon(true);
                    return t;
                }
            });
            // 通过thread名称我们知道,是进行长轮询的
            executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r);
                    t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
                    t.setDaemon(true);
                    return t;
                }
            });
    
            executor.scheduleWithFixedDelay(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 检查配置信息
                        checkConfigInfo();
                    } catch (Throwable e) {
                        LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
                    }
                }
            }, 1L, 10L, TimeUnit.MILLISECONDS);
        }
    ``
    初始化了两个线程池,通过名称看出,executorService是用来进行长轮询的线程池,具体干嘛,我们还不清楚,等等再看。  另外一个线程池,executor是用来检查配置信息的。我们进入此方法。
    ```java
       public void checkConfigInfo() {
            // 分任务
            int listenerSize = cacheMap.get().size();
            // 向上取整为批数
            int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
            if (longingTaskCount > currentLongingTaskCount) {
                for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
                    // 要判断任务是否在执行 这块需要好好想想。 任务列表现在是无序的。变化过程可能有问题
                    // executorService在这里使用到, 用来执行配置检测的长轮询使用
                    executorService.execute(new LongPollingRunnable(i));
                }
                currentLongingTaskCount = longingTaskCount;
            }
        }
    

    防止一次需要更新的太多,时间慢,所以将配置检测的任务分批次执行,分成多个LongPollingRunnable执行,核心代码在LongPollingRunnable中,我们进入代码中查看。

    class LongPollingRunnable implements Runnable {
            private int taskId;
    
            public LongPollingRunnable(int taskId) {
                this.taskId = taskId;
            }
    
            @Override
            public void run() {
    
                List<CacheData> cacheDatas = new ArrayList<CacheData>();
                List<String> inInitializingCacheList = new ArrayList<String>();
                try {
                    // check failover config
                    for (CacheData cacheData : cacheMap.get().values()) {
                        //根据taskId对cacheMap中的数据进行操作。(查看setTaskId方法, 源码中关联相关方法没有被调用,即这个判断永远不成立)
                        if (cacheData.getTaskId() == taskId) {
                            cacheDatas.add(cacheData);
                            try {
                                // 检测本地配置是否发生变化,标记该配置是否可以用本地缓存的
                                checkLocalConfig(cacheData);
                                if (cacheData.isUseLocalConfigInfo()) {
                                    cacheData.checkListenerMd5();
                                }
                            } catch (Exception e) {
                                LOGGER.error("get local config info error", e);
                            }
                        }
                    }
    
                    // check server config
                    // 把可能改变的配置,发送到服务器端进行比较,返回修改配置的dataid, groupId
                    List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
                    LOGGER.info("get changedGroupKeys:" + changedGroupKeys);
    
                    for (String groupKey : changedGroupKeys) {
                        String[] key = GroupKey.parseKey(groupKey);
                        String dataId = key[0];
                        String group = key[1];
                        String tenant = null;
                        if (key.length == 3) {
                            tenant = key[2];
                        }
                        try {
                            // 从服务端获取更新的key的值, 并进行缓存,生成文件快照
                            String[] ct = getServerConfig(dataId, group, tenant, 3000L);
                            CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
                            cache.setContent(ct[0]);
                            if (null != ct[1]) {
                                cache.setType(ct[1]);
                            }
                            LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
                                agent.getName(), dataId, group, tenant, cache.getMd5(),
                                ContentUtils.truncateContent(ct[0]), ct[1]);
                        } catch (NacosException ioe) {
                            String message = String.format(
                                "[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
                                agent.getName(), dataId, group, tenant);
                            LOGGER.error(message, ioe);
                        }
                    }
                    //  检查新的值,并设置是否首次更新和初始化状态 ( cacheData 首次出现在cacheMap中&首次check更新)
                    for (CacheData cacheData : cacheDatas) {
                        if (!cacheData.isInitializing() || inInitializingCacheList
                            .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
                            cacheData.checkListenerMd5();
                            cacheData.setInitializing(false);
                        }
                    }
                    inInitializingCacheList.clear();
                    // 再次执行此任务
                    executorService.execute(this);
                } catch (Throwable e) {
                    // If the rotation training task is abnormal, the next execution time of the task will be punished
                    LOGGER.error("longPolling error : ", e);
                    executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
                }
            }
        }
    

    此方法执行了缓存判断,更新哪些缓存,是否需要更新, 生成缓存文件,更新缓存等,我们在进去一些关键方法中查看一下。

    方法一 checkUpdateDataIds下的checkUpdateConfigStr

    List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws IOException {
            List<String> params = new ArrayList<String>(2);
            params.add(Constants.PROBE_MODIFY_REQUEST);
            params.add(probeUpdateString);
    
            List<String> headers = new ArrayList<String>(2);
            headers.add("Long-Pulling-Timeout");
            headers.add("" + timeout); // 默认时间是30000ms = 30s
    
            // told server do not hang me up if new initializing cacheData added in
            // 如果是初始化过的配置,没有发生变化的,则会进行等待。 长轮询,监听配置的改变。
            if (isInitializingCacheList) {
                headers.add("Long-Pulling-Timeout-No-Hangup");
                headers.add("true");
            }
    
            if (StringUtils.isBlank(probeUpdateString)) {
                return Collections.emptyList();
            }
    
            try {
                // In order to prevent the server from handling the delay of the client's long task,
                // increase the client's read timeout to avoid this problem.
    
                long readTimeoutMs = timeout + (long) Math.round(timeout >> 1);
                // 发送请求 , 这段逻辑需要去服务端看。 服务端大体逻辑就是 解析传入的groupdataId 字符串,通过MD5比较, 找出被修改的配置。 如果没有,则会悬挂起来。
                // 服务器在响应的时候会提前500ms返回,防止请求超时。 
                HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params,
                    agent.getEncode(), readTimeoutMs);
    
                if (HttpURLConnection.HTTP_OK == result.code) {
                    setHealthServer(true);
                    // 解析服务段返回的修改的配置的数据信息
                    return parseUpdateDataIdResponse(result.content);
                } else {
                    setHealthServer(false);
                    LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(), result.code);
                }
            } catch (IOException e) {
                setHealthServer(false);
                LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e);
                throw e;
            }
            return Collections.emptyList();
        }
    

    方法二 getServerConfig方法, 拿到更改的goupid, dataid ,从服务器端获取数据,更新快照文件

    public String[] getServerConfig(String dataId, String group, String tenant, long readTimeout)
            throws NacosException {
            String[] ct = new String[2];
            if (StringUtils.isBlank(group)) {
                group = Constants.DEFAULT_GROUP;
            }
    
            HttpResult result = null;
            try {
                List<String> params = null;
                if (StringUtils.isBlank(tenant)) {
                    params = new ArrayList<String>(Arrays.asList("dataId", dataId, "group", group));
                } else {
                    params = new ArrayList<String>(Arrays.asList("dataId", dataId, "group", group, "tenant", tenant));
                }
                // 发送请求,获取数据
                result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);
            } catch (IOException e) {
                String message = String.format(
                    "[%s] [sub-server] get server config exception, dataId=%s, group=%s, tenant=%s", agent.getName(),
                    dataId, group, tenant);
                LOGGER.error(message, e);
                throw new NacosException(NacosException.SERVER_ERROR, e);
            }
    
            switch (result.code) {
                case HttpURLConnection.HTTP_OK:
                    // 更新快照文件
                    LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.content);
                    ct[0] = result.content;
                    if (result.headers.containsKey(CONFIG_TYPE)) {
                        ct[1] = result.headers.get(CONFIG_TYPE).get(0);
                    } else {
                        ct[1] = ConfigType.TEXT.getType();
                    }
                    return ct;
                case HttpURLConnection.HTTP_NOT_FOUND:
                    LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, null);
                    return ct;
                case HttpURLConnection.HTTP_CONFLICT: {
                    LOGGER.error(
                        "[{}] [sub-server-error] get server config being modified concurrently, dataId={}, group={}, "
                            + "tenant={}", agent.getName(), dataId, group, tenant);
                    throw new NacosException(NacosException.CONFLICT,
                        "data being modified, dataId=" + dataId + ",group=" + group + ",tenant=" + tenant);
                }
                case HttpURLConnection.HTTP_FORBIDDEN: {
                    LOGGER.error("[{}] [sub-server-error] no right, dataId={}, group={}, tenant={}", agent.getName(), dataId,
                        group, tenant);
                    throw new NacosException(result.code, result.content);
                }
                default: {
                    LOGGER.error("[{}] [sub-server-error]  dataId={}, group={}, tenant={}, code={}", agent.getName(), dataId,
                        group, tenant, result.code);
                    throw new NacosException(result.code,
                        "http error, code=" + result.code + ",dataId=" + dataId + ",group=" + group + ",tenant=" + tenant);
                }
            }
        }
    

    至此,我们分析了代码的大部分逻辑,通过线程任务,记录数据状态,来比较服务器端和本地的配置差一点,从而实现数据修改变化的判断,我们画一下大概的架构图。

    4. 流程图

    简易架构图

    5. 配置中心客户端大致思想

    • a. 每次网络请求较慢,本地缓存
    • b. 异步比较差异,采用主动轮询 + 被动通知(长轮询中)的方式
    • c. 防止数据量过大,分批处理
    • d. 数据变化监听
  • 相关阅读:
    Sql Server 存储过程删除一个表里(除ID外)完全重复的数据记录
    把一个库中的表复制到另外一个库的表中(Sql server 2005)
    ajax执行后台返回的提交表单及JS
    WinCE中使用本地数据库SQLite以及得到当前应用程序所在路径
    如何评测一个软件工程师的计算机网络知识水平与网络编程技能水平
    如何评测软件工程知识技能水平?
    深入理解TCP协议及其源代码
    Socket与系统调用深度分析
    创新产品的需求分析:未来的图书会是什么样子?
    ubuntu小问题集合
  • 原文地址:https://www.cnblogs.com/lifacheng/p/12691843.html
Copyright © 2011-2022 走看看