  • Nacos in Depth (Part 4)

    private void executeAsyncInvoke() {
    
                while (!queue.isEmpty()) {
    
                    NotifySingleTask task = queue.poll();
                    String targetIp = task.getTargetIP();
                    if (serverListService.getServerList().contains(
                        targetIp)) {
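                        // If health checking is enabled and the target IP is on the unhealthy list, requeue the task for a delayed retry; otherwise notify it right away
                        if (serverListService.isHealthCheck()
                            && ServerListService.getServerListUnhealth().contains(targetIp)) {
                            // The target IP is unhealthy, so put the task back on the notify queue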
                            ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,
                                task.getLastModified(),
                                LOCAL_IP, ConfigTraceService.NOTIFY_EVENT_UNHEALTH, 0, task.target);
                            // get delay time and set fail count to the task
                            int delay = getDelayTime(task);
                            Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
                            queue.add(task);
                            AsyncTask asyncTask = new AsyncTask(httpclient, queue);
                            ((ScheduledThreadPoolExecutor)EXCUTOR).schedule(asyncTask, delay, TimeUnit.MILLISECONDS);
                        } else {
                            HttpGet request = new HttpGet(task.url);
                            request.setHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED,
                                String.valueOf(task.getLastModified()));
                            request.setHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, LOCAL_IP);
                            if (task.isBeta) {
                                request.setHeader("isBeta", "true");
                            }
                            httpclient.execute(request, new AyscNotifyCallBack(httpclient, task));
                        }
                    }
                }
            }
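
The unhealthy branch above does not drop the task: it wraps it in a fresh queue and schedules it to run again after a delay. Here is a minimal sketch of that reschedule pattern using the JDK's ScheduledExecutorService. The RetryTask class and the quadratic back-off rule are assumptions made for illustration; the real getDelayTime is not shown in this post.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for NotifySingleTask: it only tracks a fail count.
class RetryTask {
    final String targetIp;
    int failCount;

    RetryTask(String targetIp) {
        this.targetIp = targetIp;
    }
}

class DelayedRetrySketch {
    // Assumed back-off rule (not taken from Nacos): the delay grows with the
    // square of the fail count and is capped at maxDelayMs.
    static long nextDelayMs(RetryTask task, long baseMs, long maxDelayMs) {
        task.failCount++;
        long delay = baseMs * task.failCount * task.failCount;
        return Math.min(delay, maxDelayMs);
    }

    // Schedule the send action after the computed delay instead of running it now.
    static ScheduledFuture<?> reschedule(ScheduledExecutorService executor,
                                         RetryTask task, Runnable send) {
        long delay = nextDelayMs(task, 10, 1000);
        return executor.schedule(send, delay, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        RetryTask task = new RetryTask("10.0.0.1");
        ScheduledFuture<?> future = reschedule(executor, task,
            () -> System.out.println("retrying " + task.targetIp));
        future.get(); // block until the delayed run has finished
        executor.shutdown();
    }
}
```

Scheduling the retry on the executor keeps the polling loop free to move on to the next task instead of sleeping on an unhealthy peer.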

    Debugging shows what the request contains:

    http://10.129.13.96:8848/nacos/v1/cs/communication/dataChange?dataId=springboot2-nacos-config&group=DEFAULT_GROUP
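
The URL is simply the peer server's address plus dataId and group as query parameters. A small sketch of how such a URL could be assembled follows; buildUrl and encode are hypothetical helper names, not Nacos APIs, and the path is copied from the debug output above.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

class NotifyUrlSketch {
    // URL-encode a single query parameter value.
    static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is a charset every JVM must support, so this should not happen.
            throw new IllegalStateException(e);
        }
    }

    // Hypothetical helper: builds the dataChange URL for one peer server.
    static String buildUrl(String hostPort, String dataId, String group) {
        return "http://" + hostPort + "/nacos/v1/cs/communication/dataChange"
            + "?dataId=" + encode(dataId)
            + "&group=" + encode(group);
    }

    public static void main(String[] args) {
        System.out.println(
            buildUrl("10.129.13.96:8848", "springboot2-nacos-config", "DEFAULT_GROUP"));
    }
}
```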

    So another HTTP request is sent here, this time to /communication/dataChange. Following it further, the request is handled in CommunicationController.java:

    /**
         * Notify that configuration data has changed.
         */
        @RequestMapping(value = "/dataChange", method = RequestMethod.GET)
        @ResponseBody
        public Boolean notifyConfigInfo(HttpServletRequest request, HttpServletResponse response,
                                        @RequestParam("dataId") String dataId, @RequestParam("group") String group,
                                        @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY)
                                            String tenant,
                                        @RequestParam(value = "tag", required = false) String tag) {
            dataId = dataId.trim();
            group = group.trim();
            String lastModified = request.getHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED);
            long lastModifiedTs = StringUtils.isEmpty(lastModified) ? -1 : Long.parseLong(lastModified);
            String handleIp = request.getHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP);
            String isBetaStr = request.getHeader("isBeta");
            if (StringUtils.isNotBlank(isBetaStr) && trueStr.equals(isBetaStr)) {
                dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true);
            } else {
                dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp);
            }
            return true;
        }
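
The controller reads two things from the request headers: the last-modified timestamp, which falls back to -1 when the header is absent, and the beta flag. A minimal sketch of that parsing as plain functions, assuming trueStr is the literal "true" (its definition is not shown in this post); the class and method names are hypothetical.

```java
class NotifyHeaderSketch {
    // A missing or empty header means "unknown", represented as -1.
    static long parseLastModified(String header) {
        return (header == null || header.isEmpty()) ? -1L : Long.parseLong(header);
    }

    // Only the exact string "true" switches on the beta path (assumption: trueStr == "true").
    static boolean isBeta(String header) {
        return "true".equals(header);
    }

    public static void main(String[] args) {
        System.out.println(parseLastModified(null));
        System.out.println(parseLastModified("1560000000000"));
        System.out.println(isBeta("true"));
    }
}
```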

    dumpService.dump ends up handing the task to addTask in TaskManager.java:

    /**
         * Add the task to the task map.
         *
         * @param type
         * @param task
         */
        public void addTask(String type, AbstractTask task) {
            this.lock.lock();
            try {
                AbstractTask oldTask = tasks.put(type, task);
                MetricsMonitor.getDumpTaskMonitor().set(tasks.size());
                if (null != oldTask) {
                    task.merge(oldTask);
                }
            } finally {
                this.lock.unlock();
            }
        }
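
Because the tasks live in a map keyed by type, adding a second task under the same key replaces the first one, and the new task gets a chance to merge the old one's state. The following sketch shows that put-then-merge behaviour. MergeableTask and its "keep the newest timestamp" merge rule are assumptions for illustration, not the real DumpTask.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical task: merging keeps the newest modification time.
class MergeableTask {
    long lastModified;

    MergeableTask(long lastModified) {
        this.lastModified = lastModified;
    }

    void merge(MergeableTask old) {
        lastModified = Math.max(lastModified, old.lastModified);
    }
}

class TaskMapSketch {
    private final Map<String, MergeableTask> tasks = new HashMap<>();
    private final ReentrantLock lock = new ReentrantLock();

    // Same shape as addTask above: put the new task, then merge the one it replaced.
    void addTask(String type, MergeableTask task) {
        lock.lock();
        try {
            MergeableTask old = tasks.put(type, task);
            if (old != null) {
                task.merge(old);
            }
        } finally {
            lock.unlock();
        }
    }

    MergeableTask get(String type) {
        lock.lock();
        try {
            return tasks.get(type);
        } finally {
            lock.unlock();
        }
    }

    int size() {
        lock.lock();
        try {
            return tasks.size();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        TaskMapSketch manager = new TaskMapSketch();
        manager.addTask("cfg", new MergeableTask(5));
        manager.addTask("cfg", new MergeableTask(3));
        System.out.println(manager.size() + " pending, lastModified="
            + manager.get("cfg").lastModified);
    }
}
```

The practical effect is that a burst of changes to the same key collapses into one pending task rather than piling up.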

    The concrete contents of the task are as follows:

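    Up to this point, the overall idea in Nacos is this: every operation that needs a notification is wrapped in a task, and the task is dropped into a container of pending work (a map keyed by task type, as addTask shows, which is used like a queue). A background thread keeps polling that container in a loop, and whenever it finds something there it runs the processor registered for that task.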
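
The pattern described above can be sketched in a few lines: a map of pending tasks, a map of processors, and a thread that wakes up periodically until a closed flag is set. All names here (PollingManagerSketch, SketchProcessor, the String payload) are hypothetical simplifications, and unlike the real class this sketch has no default processor.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical processor interface: returns true when the task was handled.
interface SketchProcessor {
    boolean process(String type, String payload);
}

class PollingManagerSketch {
    final Map<String, String> tasks = new ConcurrentHashMap<>();
    final Map<String, SketchProcessor> processors = new ConcurrentHashMap<>();
    final AtomicBoolean closed = new AtomicBoolean(false);

    // One pass: take every pending task out of the map and hand it to its processor.
    // In this sketch a task with no registered processor is simply dropped.
    void processOnce() {
        for (String type : tasks.keySet()) {
            String payload = tasks.remove(type);
            SketchProcessor processor = processors.get(type);
            if (payload != null && processor != null) {
                processor.process(type, payload);
            }
        }
    }

    // Start the polling thread; it runs until closed is set to true.
    Thread start(long intervalMs) {
        Thread thread = new Thread(() -> {
            while (!closed.get()) {
                try {
                    Thread.sleep(intervalMs);
                    processOnce();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        thread.setDaemon(true);
        thread.start();
        return thread;
    }

    public static void main(String[] args) throws InterruptedException {
        PollingManagerSketch manager = new PollingManagerSketch();
        manager.processors.put("dump", (type, payload) -> {
            System.out.println("processed " + payload);
            return true;
        });
        Thread worker = manager.start(100);
        manager.tasks.put("dump", "dataId=demo");
        Thread.sleep(300);
        manager.closed.set(true);
        worker.join();
    }
}
```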

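    TaskManager.java
    This class is the final handler: the manager that actually carries out the notification update. Here is the code; only the relevant parts are shown, not the whole class.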

    public final class TaskManager implements TaskManagerMBean {
    
        class ProcessRunnable implements Runnable {
    
            public void run() {
                while (!TaskManager.this.closed.get()) {
                    try {
                        Thread.sleep(100);
                        TaskManager.this.process();
                    } catch (Throwable e) {
                        // Errors are swallowed here, so one failure does not stop the polling thread
                    }
                }
    
            }
    
        }

    A thread is started here. As long as the manager has not been closed, it loops forever, sleeping 100 ms and then calling process():

     protected void process() {
            for (Map.Entry<String, AbstractTask> entry : this.tasks.entrySet()) {
                AbstractTask task = null;
                this.lock.lock();
                try {
                    // Fetch the task
                    task = entry.getValue();
                    if (null != task) {
                        if (!task.shouldProcess()) {
                            // The task does not need to run yet, so skip it
                            continue;
                        }
                        // Remove the task from the task map before processing it
                        this.tasks.remove(entry.getKey());
                        MetricsMonitor.getDumpTaskMonitor().set(tasks.size());
                    }
                } finally {
                    this.lock.unlock();
                }
    
                if (null != task) {
                    // Look up the processor for this task
                    TaskProcessor processor = this.taskProcessors.get(entry.getKey());
                    if (null == processor) {
                        // No processor is registered for this task type, so fall back to the default processor
                        processor = this.getDefaultTaskProcessor();
                    }
                    if (null != processor) {
                        boolean result = false;
                        try {
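                            // Process the task
                            result = processor.process(entry.getKey(), task);
                        } catch (Throwable t) {
                            log.error("task_fail", "处理task失败", t); // message: "failed to process task"
                        }
                        if (!result) {
                            // Processing failed: record the time of this attempt
                            task.setLastProcessTime(System.currentTimeMillis());
    
                            // Put the task back into the task map so it is retried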
                            this.addTask(entry.getKey(), task);
                        }
                    }
                }
            }
        }
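
The interesting part of process() is its retry behaviour: a task is removed before it runs, and if the processor reports failure it is stamped with the attempt time and put back, so shouldProcess() can hold it off until its interval has passed. Here is a minimal sketch of that cycle with the clock passed in as a parameter. The "now - lastProcessTime >= interval" rule and all names are assumptions for illustration, since AbstractTask itself is not shown in this post.

```java
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

// Hypothetical task that is only eligible once its interval has elapsed.
class RetryableTask {
    final long intervalMs;
    long lastProcessTime; // 0 means "never attempted"

    RetryableTask(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    boolean shouldProcess(long now) {
        return now - lastProcessTime >= intervalMs;
    }
}

class RetryPassSketch {
    final Map<String, RetryableTask> tasks = new ConcurrentHashMap<>();

    // One pass over a snapshot of the keys; returns how many tasks were attempted.
    int processOnce(long now, Predicate<String> processor) {
        int attempted = 0;
        for (String key : new ArrayList<>(tasks.keySet())) {
            RetryableTask task = tasks.get(key);
            if (task == null || !task.shouldProcess(now)) {
                continue; // not due yet, leave it in the map
            }
            tasks.remove(key); // remove first, then process
            attempted++;
            boolean ok;
            try {
                ok = processor.test(key);
            } catch (RuntimeException e) {
                ok = false; // an exception counts as a failed attempt
            }
            if (!ok) {
                task.lastProcessTime = now; // remember when we last tried
                tasks.put(key, task);       // re-queue for a later pass
            }
        }
        return attempted;
    }

    public static void main(String[] args) {
        RetryPassSketch manager = new RetryPassSketch();
        manager.tasks.put("cfg", new RetryableTask(100));
        System.out.println(manager.processOnce(1000, key -> false)); // attempt fails
        System.out.println(manager.processOnce(1050, key -> true));  // too early
        System.out.println(manager.processOnce(1100, key -> true));  // retried
    }
}
```

Iterating over a snapshot of the keys is a choice made for this sketch, so that a re-queued task cannot be visited twice in the same pass.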

    This is the key part!

    class DumpProcessor implements TaskProcessor {
    
        DumpProcessor(DumpService dumpService) {
            this.dumpService = dumpService;
        }
    
        @Override
        public boolean process(String taskType, AbstractTask task) {
            DumpTask dumpTask = (DumpTask)task;
            String[] pair = GroupKey2.parseKey(dumpTask.groupKey);
            String dataId = pair[0];
            String group = pair[1];
            String tenant = pair[2];
            long lastModified = dumpTask.lastModified;
            String handleIp = dumpTask.handleIp;
            boolean isBeta = dumpTask.isBeta;
            String tag = dumpTask.tag;
            if (isBeta) {
                // Beta release: dump the data and update the beta cache
                ConfigInfo4Beta cf = dumpService.persistService.findConfigInfo4Beta(dataId, group, tenant);
                boolean result;
                if (null != cf) {
                    result = ConfigService.dumpBeta(dataId, group, tenant, cf.getContent(), lastModified, cf.getBetaIps());
                    if (result) {
                        ConfigTraceService.logDumpEvent(dataId, group, tenant, null, lastModified, handleIp,
                            ConfigTraceService.DUMP_EVENT_OK, System.currentTimeMillis() - lastModified,
                            cf.getContent().length());
                    }
                } else {
                    result = ConfigService.removeBeta(dataId, group, tenant);
                    if (result) {
                        ConfigTraceService.logDumpEvent(dataId, group, tenant, null, lastModified, handleIp,
                            ConfigTraceService.DUMP_EVENT_REMOVE_OK, System.currentTimeMillis() - lastModified, 0);
                    }
                }
                return result;
            } else {
                if (StringUtils.isBlank(tag)) {
                    ConfigInfo cf = dumpService.persistService.findConfigInfo(dataId, group, tenant);
                    if (dataId.equals(AggrWhitelist.AGGRIDS_METADATA)) {
                        if (null != cf) {
                            AggrWhitelist.load(cf.getContent());
                        } else {
                            AggrWhitelist.load(null);
                        }
                    }
    
                    if (dataId.equals(ClientIpWhiteList.CLIENT_IP_WHITELIST_METADATA)) {
                        if (null != cf) {
                            ClientIpWhiteList.load(cf.getContent());
                        } else {
                            ClientIpWhiteList.load(null);
                        }
                    }
    
                    if (dataId.equals(SwitchService.SWITCH_META_DATAID)) {
                        if (null != cf) {
                            SwitchService.load(cf.getContent());
                        } else {
                            SwitchService.load(null);
                        }
                    }
    
                    boolean result;
                    if (null != cf) {
                        result = ConfigService.dump(dataId, group, tenant, cf.getContent(), lastModified);
    
                        if (result) {
                            ConfigTraceService.logDumpEvent(dataId, group, tenant, null, lastModified, handleIp,
                                ConfigTraceService.DUMP_EVENT_OK, System.currentTimeMillis() - lastModified,
                                cf.getContent().length());
                        }
                    } else {
                        result = ConfigService.remove(dataId, group, tenant);
    
                        if (result) {
                            ConfigTraceService.logDumpEvent(dataId, group, tenant, null, lastModified, handleIp,
                                ConfigTraceService.DUMP_EVENT_REMOVE_OK, System.currentTimeMillis() - lastModified, 0);
                        }
                    }
                    return result;
                } else {
                    ConfigInfo4Tag cf = dumpService.persistService.findConfigInfo4Tag(dataId, group, tenant, tag);
                    //
                    boolean result;
                    if (null != cf) {
                        result = ConfigService.dumpTag(dataId, group, tenant, tag, cf.getContent(), lastModified);
                        if (result) {
                            ConfigTraceService.logDumpEvent(dataId, group, tenant, null, lastModified, handleIp,
                                ConfigTraceService.DUMP_EVENT_OK, System.currentTimeMillis() - lastModified,
                                cf.getContent().length());
                        }
                    } else {
                        result = ConfigService.removeTag(dataId, group, tenant, tag);
                        if (result) {
                            ConfigTraceService.logDumpEvent(dataId, group, tenant, null, lastModified, handleIp,
                                ConfigTraceService.DUMP_EVENT_REMOVE_OK, System.currentTimeMillis() - lastModified, 0);
                        }
                    }
                    return result;
                }
            }
    
        }
    
        final DumpService dumpService;
    }
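
Stripped of logging, DumpProcessor.process makes a three-way choice (beta, tagged, or plain config), and within each branch it either dumps the content it found or removes the cached copy when the lookup returned nothing. That decision table can be sketched as a pure function. The Action enum and choose method are hypothetical names used only to make the branching visible.

```java
class DumpDispatchSketch {
    enum Action { DUMP_BETA, REMOVE_BETA, DUMP, REMOVE, DUMP_TAG, REMOVE_TAG }

    // configExists stands for "the persistence lookup returned a non-null config".
    static Action choose(boolean isBeta, String tag, boolean configExists) {
        if (isBeta) {
            return configExists ? Action.DUMP_BETA : Action.REMOVE_BETA;
        }
        boolean blankTag = (tag == null || tag.trim().isEmpty());
        if (blankTag) {
            return configExists ? Action.DUMP : Action.REMOVE;
        }
        return configExists ? Action.DUMP_TAG : Action.REMOVE_TAG;
    }

    public static void main(String[] args) {
        System.out.println(choose(true, null, true));
        System.out.println(choose(false, null, false));
        System.out.println(choose(false, "gray", true));
    }
}
```

Note that the beta flag wins over the tag: once isBeta is true, the tag is never consulted.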

    This post is getting long, so the rest goes into the next part. Go!

  • Original article: https://www.cnblogs.com/longxok/p/11015309.html