  • Nacos Explained in Depth (Part 3)

     EventDispatcher.fireEvent(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));

    Let's follow the code step by step:

    public class EventDispatcher {
    
        /**
         * add event listener
         */
        static public void addEventListener(AbstractEventListener listener) {
            for (Class<? extends Event> type : listener.interest()) {
                getEntry(type).listeners.addIfAbsent(listener);
            }
        }
    
        /**
         * fire event, notify listeners.
         */
        static public void fireEvent(Event event) {
            if (null == event) {
                throw new IllegalArgumentException();
            }
    
            for (AbstractEventListener listener : getEntry(event.getClass()).listeners) {
                try {
                    listener.onEvent(event);
                } catch (Exception e) {
                    log.error(e.toString(), e);
                }
            }
        }

    The expression getEntry(event.getClass()).listeners deserves a closer look:

        /**
         * get event listener for eventType. Add Entry if not exist.
         */
        static Entry getEntry(Class<? extends Event> eventType) {
            for (; ; ) {
                for (Entry entry : LISTENER_HUB) {
                    if (entry.eventType == eventType) {
                        return entry;
                    }
                }
    
                Entry tmp = new Entry(eventType);
                /**
                 *  false means already exists
                 */
                if (LISTENER_HUB.addIfAbsent(tmp)) {
                    return tmp;
                }
            }
        }
        static final CopyOnWriteArrayList<Entry> LISTENER_HUB = new CopyOnWriteArrayList<Entry>();

    LISTENER_HUB is preloaded with the relevant Entry objects (we will analyze that later). After that, onEvent is called to notify the listeners:
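
    The lookup-or-create loop in getEntry can be tried in isolation. The following is only a minimal sketch: the Entry class is not shown in the excerpt above, so the version here (including its equals/hashCode) is a hypothetical stand-in, written under the assumption that two entries for the same event type must compare equal for addIfAbsent to reject the duplicate.

```java
import java.util.concurrent.CopyOnWriteArrayList;

class ListenerHubSketch {

    /** Hypothetical stand-in for Entry; the real class is not shown in the excerpt. */
    static final class Entry {
        final Class<?> eventType;
        final CopyOnWriteArrayList<Object> listeners = new CopyOnWriteArrayList<>();

        Entry(Class<?> eventType) {
            this.eventType = eventType;
        }

        // addIfAbsent compares with equals(), so entries for the same type must be equal.
        @Override
        public boolean equals(Object obj) {
            return obj instanceof Entry && ((Entry) obj).eventType == eventType;
        }

        @Override
        public int hashCode() {
            return eventType.hashCode();
        }
    }

    static final CopyOnWriteArrayList<Entry> HUB = new CopyOnWriteArrayList<>();

    static Entry getEntry(Class<?> eventType) {
        for (;;) {
            // Fast path: the entry is already registered.
            for (Entry entry : HUB) {
                if (entry.eventType == eventType) {
                    return entry;
                }
            }
            Entry tmp = new Entry(eventType);
            if (HUB.addIfAbsent(tmp)) {
                return tmp;
            }
            // Another thread registered the same type first; loop and pick up its entry.
        }
    }

    public static void main(String[] args) {
        Entry first = getEntry(String.class);
        Entry second = getEntry(String.class);
        System.out.println(first == second); // true: the same entry is reused
        System.out.println(HUB.size());      // 1
    }
}
```

    The outer loop matters only under contention: when addIfAbsent returns false, some other thread has just added an equal entry, and the next pass through the inner loop returns that entry instead of the discarded tmp.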

    @Service
    public class AsyncNotifyService extends AbstractEventListener {
    
        @Override
        public List<Class<? extends Event>> interest() {
            List<Class<? extends Event>> types = new ArrayList<Class<? extends Event>>();
            // Subscribe to config-change events, which trigger the sync notification
            types.add(ConfigDataChangeEvent.class);
            return types;
        }
    
        @Override
        public void onEvent(Event event) {
    
            // ConfigDataChangeEvent may be fired concurrently
            if (event instanceof ConfigDataChangeEvent) {
                ConfigDataChangeEvent evt = (ConfigDataChangeEvent)event;
                long dumpTs = evt.lastModifiedTs;
                String dataId = evt.dataId;
                String group = evt.group;
                String tenant = evt.tenant;
                String tag = evt.tag;
                // Addresses of the servers to notify
                List<?> ipList = serverListService.getServerList();
    
                // Any queue type would work here
                Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
                for (int i = 0; i < ipList.size(); i++) {
                    queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, (String)ipList.get(i), evt.isBeta));
                }
                EXCUTOR.execute(new AsyncTask(httpclient, queue));
            }
        }

    This notifies the servers associated with the configuration one by one. It is easier to follow with the observer pattern in mind.
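
    For readers who want the observer pattern spelled out, here is a stripped-down sketch of the same shape: listeners declare the event types they are interested in, and the dispatcher calls onEvent only for matching events. All class names below (ObserverSketch, ConfigChanged, OtherEvent, RecordingListener) are made up for illustration, and a map is used in place of the LISTENER_HUB list to keep it short; this is not the Nacos implementation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

class ObserverSketch {

    interface Event { }

    /** Hypothetical event carrying only a dataId. */
    static final class ConfigChanged implements Event {
        final String dataId;

        ConfigChanged(String dataId) {
            this.dataId = dataId;
        }
    }

    /** A second event type that the listener below does not subscribe to. */
    static final class OtherEvent implements Event { }

    abstract static class Listener {
        abstract List<Class<? extends Event>> interest();

        abstract void onEvent(Event event);
    }

    static final Map<Class<? extends Event>, CopyOnWriteArrayList<Listener>> LISTENERS =
        new ConcurrentHashMap<>();

    static void addEventListener(Listener listener) {
        for (Class<? extends Event> type : listener.interest()) {
            LISTENERS.computeIfAbsent(type, k -> new CopyOnWriteArrayList<>()).addIfAbsent(listener);
        }
    }

    static void fireEvent(Event event) {
        CopyOnWriteArrayList<Listener> targets = LISTENERS.get(event.getClass());
        if (targets == null) {
            return; // nobody subscribed to this event type
        }
        for (Listener listener : targets) {
            try {
                listener.onEvent(event);
            } catch (Exception e) {
                // One failing listener must not stop the others.
                System.err.println("listener failed: " + e);
            }
        }
    }

    /** Listener that records the dataId of every ConfigChanged event it sees. */
    static final class RecordingListener extends Listener {
        final List<String> seen = new ArrayList<>();

        @Override
        List<Class<? extends Event>> interest() {
            return Collections.<Class<? extends Event>>singletonList(ConfigChanged.class);
        }

        @Override
        void onEvent(Event event) {
            seen.add(((ConfigChanged) event).dataId);
        }
    }

    public static void main(String[] args) {
        RecordingListener listener = new RecordingListener();
        addEventListener(listener);
        fireEvent(new ConfigChanged("app.properties"));
        fireEvent(new OtherEvent()); // ignored: not in interest()
        System.out.println(listener.seen); // [app.properties]
    }
}
```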

        private static final Executor EXCUTOR = Executors.newScheduledThreadPool(100, new NotifyThreadFactory());

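    The notification work is then handed to this thread pool, so the notify task runs on a different thread from the one that fired the event: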
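
    The effect of that hand-off can be seen with a small stand-alone sketch. It uses the same JDK factory method, Executors.newScheduledThreadPool, but with a pool size of 2 and without the NotifyThreadFactory, which is not shown in the excerpt; the class and method names are invented for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class HandOffSketch {

    /** Submits one task and returns the name of the thread that actually ran it. */
    static String runOnPool() {
        ExecutorService pool = Executors.newScheduledThreadPool(2);
        AtomicReference<String> workerName = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        try {
            // execute() only enqueues the task; the caller does not run it.
            pool.execute(() -> {
                workerName.set(Thread.currentThread().getName());
                done.countDown();
            });
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("task did not finish in time");
            }
            return workerName.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("caller: " + Thread.currentThread().getName());
        System.out.println("worker: " + runOnPool());
    }
}
```

    Because the listener's onEvent only enqueues the work, fireEvent returns quickly even when there are many servers to notify.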

     class AsyncTask implements Runnable {
    
            public AsyncTask(CloseableHttpAsyncClient httpclient, Queue<NotifySingleTask> queue) {
                this.httpclient = httpclient;
                this.queue = queue;
            }
    
            @Override
            public void run() {
    
                executeAsyncInvoke();
    
            }
    
            private void executeAsyncInvoke() {
                while (!queue.isEmpty()) {
                    NotifySingleTask task = queue.poll();
                    String targetIp = task.getTargetIP();
                    if (serverListService.getServerList().contains(targetIp)) {
                        // If health checking is enabled and the target IP is unhealthy,
                        // requeue the task with a delay; otherwise notify the target now
                        if (serverListService.isHealthCheck()
                            && ServerListService.getServerListUnhealth().contains(targetIp)) {
                            // Target IP is unhealthy: log it and put the task back on the notify queue
                            ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,
                                task.getLastModified(), LOCAL_IP, ConfigTraceService.NOTIFY_EVENT_UNHEALTH, 0, task.target);
                            // get delay time and set fail count to the task
                            int delay = getDelayTime(task);
                            Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
                            queue.add(task);
                            AsyncTask asyncTask = new AsyncTask(httpclient, queue);
                            ((ScheduledThreadPoolExecutor)EXCUTOR).schedule(asyncTask, delay, TimeUnit.MILLISECONDS);
                        } else {
                            HttpGet request = new HttpGet(task.url);
                            request.setHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED,
                                String.valueOf(task.getLastModified()));
                            request.setHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, LOCAL_IP);
                            if (task.isBeta) {
                                request.setHeader("isBeta", "true");
                            }
                            httpclient.execute(request, new AyscNotifyCallBack(httpclient, task));
                        }
                    }
                }
            }
    
            private Queue<NotifySingleTask> queue;
            private CloseableHttpAsyncClient httpclient;
     }

    I'll go through the code above in detail tomorrow. Keep going!
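
    As a preview, the "unhealthy target" branch boils down to rescheduling the task on the same scheduled pool after a delay. The sketch below shows only that idea. The backoff formula is an assumption, since getDelayTime is not shown in the excerpt, and the sketch reschedules the same Runnable rather than wrapping the task in a new queue and a new AsyncTask as the original does.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class DelayedRetrySketch {

    /** Hypothetical backoff: grows with the fail count and is capped at 100 ms. */
    static long delayMillis(int failCount) {
        return Math.min(10L * failCount, 100L);
    }

    /**
     * Simulates a target that looks unhealthy for the first unhealthyChecks attempts.
     * Returns the total number of attempts needed until the notification is "sent".
     */
    static int attemptsUntilDelivered(final int unhealthyChecks) {
        final ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        final AtomicInteger attempts = new AtomicInteger();
        final CountDownLatch delivered = new CountDownLatch(1);

        Runnable task = new Runnable() {
            @Override
            public void run() {
                int attempt = attempts.incrementAndGet();
                if (attempt <= unhealthyChecks) {
                    // Target still unhealthy: try again later instead of notifying now.
                    pool.schedule(this, delayMillis(attempt), TimeUnit.MILLISECONDS);
                } else {
                    // Stands in for the asynchronous HTTP notification.
                    delivered.countDown();
                }
            }
        };

        try {
            pool.execute(task);
            if (!delivered.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("notification was never delivered");
            }
            return attempts.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Two unhealthy checks, then delivery on the third attempt.
        System.out.println(attemptsUntilDelivered(2)); // 3
    }
}
```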

  • Original article: https://www.cnblogs.com/longxok/p/11011722.html