zoukankan      html  css  js  c++  java
  • Nacos源码深度解析1-服务注册初始化(客户端)

    一.初始化
    NamingService naming = NamingFactory.createNamingService(properties);
    二.通过反射传入properties生成NacosNamingService的实例
    /**
         * Create a new naming service.
         *
         * @param properties naming service properties
         * @return new naming service
         * @throws NacosException nacos exception
         */
        public static NamingService createNamingService(Properties properties) throws NacosException {
            try {
                Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");
                Constructor constructor = driverImplClass.getConstructor(Properties.class);
                NamingService vendorImpl = (NamingService) constructor.newInstance(properties);
                return vendorImpl;
            } catch (Throwable e) {
                throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
            }
        }
    三.init初始化
     
     public NacosNamingService(Properties properties) throws NacosException {
            init(properties);
        }
    
        private void init(Properties properties) throws NacosException {
            ValidatorUtils.checkInitParam(properties);
            this.namespace = InitUtils.initNamespaceForNaming(properties);
            InitUtils.initSerialization();
            //解析serverlist,endpoint
            initServerAddr(properties);
            InitUtils.initWebRootContext();
            //缓存地址C:Usersadmin
    acos
    amingfe3078a2-019d-4fdd-b6cf-22c3117f847d
            initCacheDir();
            //初始化日志信息
            initLogName(properties);
            //监听事件变更
            this.eventDispatcher = new EventDispatcher();
            //1.根据endpoint查找serverlist,nacos服务器地址;2.安全登陆验证
            this.serverProxy = new NamingProxy(this.namespace, this.endpoint, this.serverList, properties);
            //与服务端建立上报机制
            this.beatReactor = new BeatReactor(this.serverProxy, initClientBeatThreadCount(properties));
            this.hostReactor = new HostReactor(this.eventDispatcher, this.serverProxy, beatReactor, this.cacheDir,
                    isLoadCacheAtStart(properties), initPollingThreadCount(properties));
        }      
     
    四.EventDispatcher 监听事件分发
    public EventDispatcher() {
            
            this.executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r, "com.alibaba.nacos.naming.client.listener");
                    thread.setDaemon(true);
                    
                    return thread;
                }
            });
            
            this.executor.execute(new Notifier());
        }
     
    //changedServices 改变的服务
    private final BlockingQueue<ServiceInfo> changedServices = new LinkedBlockingQueue<ServiceInfo>();
    //服务 - 监听器回调
    private final ConcurrentMap<String, List<EventListener>> observerMap = new ConcurrentHashMap<String, List<EventListener>>();
        private class Notifier implements Runnable {
            
            @Override
            public void run() {
                while (!closed) {
                    
                    ServiceInfo serviceInfo = null;
                    try {
                        serviceInfo = changedServices.poll(5, TimeUnit.MINUTES);
                    } catch (Exception ignore) {
                    }
                    
                    if (serviceInfo == null) {
                        continue;
                    }
                    
                    try {
                        List<EventListener> listeners = observerMap.get(serviceInfo.getKey());
                        
                        if (!CollectionUtils.isEmpty(listeners)) {
                            for (EventListener listener : listeners) {
                                List<Instance> hosts = Collections.unmodifiableList(serviceInfo.getHosts());
                                listener.onEvent(new NamingEvent(serviceInfo.getName(), serviceInfo.getGroupName(),
                                        serviceInfo.getClusters(), hosts));
                            }
                        }
                        
                    } catch (Exception e) {
                        NAMING_LOGGER.error("[NA] notify error for service: " + serviceInfo.getName() + ", clusters: "
                                + serviceInfo.getClusters(), e);
                    }
                }
            }
        }
    五.NamingProxy
    1.根据endpoint查找serverlist,nacos服务器地址;2.安全登陆验证
    public NamingProxy(String namespaceId, String endpoint, String serverList, Properties properties) {
    
            this.securityProxy = new SecurityProxy(properties, nacosRestTemplate);
            this.properties = properties;
            this.setServerPort(DEFAULT_SERVER_PORT);
            this.namespaceId = namespaceId;
            this.endpoint = endpoint;
            if (StringUtils.isNotEmpty(serverList)) {
                this.serverList = Arrays.asList(serverList.split(","));
                if (this.serverList.size() == 1) {
                    this.nacosDomain = serverList;
                }
            }
            this.initRefreshTask();
        }
    private void initRefreshTask() {
    
            this.executorService = new ScheduledThreadPoolExecutor(2, new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r);
                    t.setName("com.alibaba.nacos.client.naming.updater");
                    t.setDaemon(true);
                    return t;
                }
            });
    
            this.executorService.scheduleWithFixedDelay(new Runnable() {
                @Override
                public void run() {
                    //根据endpoint定时刷新serverlist地址
                    refreshSrvIfNeed();
                }
            }, 0, vipSrvRefInterMillis, TimeUnit.MILLISECONDS);
    
            this.executorService.scheduleWithFixedDelay(new Runnable() {
                @Override
                public void run() {
                    //定时安全登陆验证
                    securityProxy.login(getServerList());
                }
            }, 0, securityInfoRefreshIntervalMills, TimeUnit.MILLISECONDS);
    
            refreshSrvIfNeed();
            this.securityProxy.login(getServerList());
        }
    六.初始化心跳线程池
     public BeatReactor(NamingProxy serverProxy, int threadCount) {
            this.serverProxy = serverProxy;
            this.executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setDaemon(true);
                    thread.setName("com.alibaba.nacos.naming.beat.sender");
                    return thread;
                }
            });
        }
     
    七.HostReactor
    ublic HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, BeatReactor beatReactor,
                String cacheDir, boolean loadCacheAtStart, int pollingThreadCount) {
            // init executorService
            this.executor = new ScheduledThreadPoolExecutor(pollingThreadCount, new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setDaemon(true);
                    thread.setName("com.alibaba.nacos.client.naming.updater");
                    return thread;
                }
            });
            this.eventDispatcher = eventDispatcher;
            this.beatReactor = beatReactor;
            this.serverProxy = serverProxy;
            this.cacheDir = cacheDir;
            if (loadCacheAtStart) {
                this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(DiskCache.read(this.cacheDir));
            } else {
                this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(16);
            }
            
            this.updatingMap = new ConcurrentHashMap<String, Object>();
            //容灾备份策略
            this.failoverReactor = new FailoverReactor(this, cacheDir);
            //接收服务器端的数据
            this.pushReceiver = new PushReceiver(this);
        }
     
     
    八.PushReceiver
    public PushReceiver(HostReactor hostReactor) {
            try {
                this.hostReactor = hostReactor;
                this.udpSocket = new DatagramSocket();
                this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r);
                        thread.setDaemon(true);
                        thread.setName("com.alibaba.nacos.naming.push.receiver");
                        return thread;
                    }
                });
                
                this.executorService.execute(this);
            } catch (Exception e) {
                NAMING_LOGGER.error("[NA] init udp socket failed", e);
            }
        }
    九.run 方法
    @Override
    public void run() {
        while (!closed) {
            try {
                
                // byte[] is initialized with 0 full filled by default
                byte[] buffer = new byte[UDP_MSS];
                DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
                
                udpSocket.receive(packet);
                
                String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();
                NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
                
                PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);
                String ack;
                if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
                    //处理从服务端接收的数据
                    hostReactor.processServiceJson(pushPacket.data);
                    
                    // send ack to server
                    ack = "{"type": "push-ack"" + ", "lastRefTime":"" + pushPacket.lastRefTime + "", "data":"
                            + """}";
                } else if ("dump".equals(pushPacket.type)) {
                    // dump data to server
                    ack = "{"type": "dump-ack"" + ", "lastRefTime": "" + pushPacket.lastRefTime + "", "data":"
                            + """ + StringUtils.escapeJavaScript(JacksonUtils.toJson(hostReactor.getServiceInfoMap()))
                            + ""}";
                } else {
                    // do nothing send ack only
                    ack = "{"type": "unknown-ack"" + ", "lastRefTime":"" + pushPacket.lastRefTime
                            + "", "data":" + """}";
                }
                //发送ack给服务器端
                udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length,
                        packet.getSocketAddress()));
            } catch (Exception e) {
                if (closed) {
                    return;
                }
                NAMING_LOGGER.error("[NA] error while receiving push data", e);
            }
        }
    }
     
    十.处理服务器端数据
    public ServiceInfo processServiceJson(String json) {
            ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class);
            ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
            if (serviceInfo.getHosts() == null || !serviceInfo.validate()) {
                //empty or error push, just ignore
                return oldService;
            }
    
            boolean changed = false;
    
            if (oldService != null) {
                //处理服务器端数据,判断是新增,删除,还是修改
                if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {
                    NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime() + ", new-t: "
                            + serviceInfo.getLastRefTime());
                }
                //向服务缓存map中,放置新的服务
                serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
    
                Map<String, Instance> oldHostMap = new HashMap<String, Instance>(oldService.getHosts().size());
                for (Instance host : oldService.getHosts()) {
                    oldHostMap.put(host.toInetAddr(), host);
                }
    
                Map<String, Instance> newHostMap = new HashMap<String, Instance>(serviceInfo.getHosts().size());
                for (Instance host : serviceInfo.getHosts()) {
                    newHostMap.put(host.toInetAddr(), host);
                }
    
                Set<Instance> modHosts = new HashSet<Instance>();
                Set<Instance> newHosts = new HashSet<Instance>();
                Set<Instance> remvHosts = new HashSet<Instance>();
    
                List<Map.Entry<String, Instance>> newServiceHosts = new ArrayList<Map.Entry<String, Instance>>(
                        newHostMap.entrySet());
                for (Map.Entry<String, Instance> entry : newServiceHosts) {
                    Instance host = entry.getValue();
                    String key = entry.getKey();
                    if (oldHostMap.containsKey(key) && !StringUtils
                            .equals(host.toString(), oldHostMap.get(key).toString())) {
                        modHosts.add(host);
                        continue;
                    }
    
                    if (!oldHostMap.containsKey(key)) {
                        newHosts.add(host);
                    }
                }
    
                for (Map.Entry<String, Instance> entry : oldHostMap.entrySet()) {
                    Instance host = entry.getValue();
                    String key = entry.getKey();
                    if (newHostMap.containsKey(key)) {
                        continue;
                    }
    
                    if (!newHostMap.containsKey(key)) {
                        remvHosts.add(host);
                    }
    
                }
    
                if (newHosts.size() > 0) {
                    changed = true;
                    NAMING_LOGGER.info("new ips(" + newHosts.size() + ") service: " + serviceInfo.getKey() + " -> "
                            + JacksonUtils.toJson(newHosts));
                }
    
                if (remvHosts.size() > 0) {
                    changed = true;
                    NAMING_LOGGER.info("removed ips(" + remvHosts.size() + ") service: " + serviceInfo.getKey() + " -> "
                            + JacksonUtils.toJson(remvHosts));
                }
    
                if (modHosts.size() > 0) {
                    changed = true;
                    updateBeatInfo(modHosts);
                    NAMING_LOGGER.info("modified ips(" + modHosts.size() + ") service: " + serviceInfo.getKey() + " -> "
                            + JacksonUtils.toJson(modHosts));
                }
    
                serviceInfo.setJsonFromServer(json);
    
                if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) {
                    //向事件缓存list changedServices中写入变更事件
                    eventDispatcher.serviceChanged(serviceInfo);
                    //写入缓存
                    DiskCache.write(serviceInfo, cacheDir);
                }
    
            } else {
                changed = true;
                NAMING_LOGGER.info("init new ips(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> "
                        + JacksonUtils.toJson(serviceInfo.getHosts()));
                serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
                eventDispatcher.serviceChanged(serviceInfo);
                serviceInfo.setJsonFromServer(json);
                DiskCache.write(serviceInfo, cacheDir);
            }
    
            MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
    
            if (changed) {
                NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> "
                        + JacksonUtils.toJson(serviceInfo.getHosts()));
            }
    
            return serviceInfo;
        }
    nacos配置加刷新流程图
     

  • 相关阅读:
    [LeetCode] 329. Longest Increasing Path in a Matrix
    [LeetCode] 1180. Count Substrings with Only One Distinct Letter
    [LeetCode] 1100. Find K-Length Substrings With No Repeated Characters
    [LeetCode] 312. Burst Balloons
    [LeetCode] 674. Longest Continuous Increasing Subsequence
    [LeetCode] 325. Maximum Size Subarray Sum Equals k
    [LeetCode] 904. Fruit Into Baskets
    [LeetCode] 68. Text Justification
    [LeetCode] 65. Valid Number
    [LeetCode] 785. Is Graph Bipartite?
  • 原文地址:https://www.cnblogs.com/xwzp/p/14140550.html
Copyright © 2011-2022 走看看