zoukankan      html  css  js  c++  java
  • Nacos服务发现

    基础配置初始化

    NacosDiscoveryClientConfiguration
    NacosDiscoveryProperties

    初始化Nacos基础配置信息的bean,主要指yaml中配置Nacos服务相关的信息。

    NacosServiceDiscovery

    初始化获取Nacos服务和实例的bean,通过该bean可以获取服务的信息和实例的信息。

    NacosDiscoveryClientConfiguration

    NacosDiscoveryClient

    初始化NacosDiscoveryClient的bean,本质上就是实现NacosServiceDiscovery的实现。该类的作用就是获取到实例信息和服务信息。

    NacosWatch

    从继承结构上看,NacosWatch主要实现了SmartLifecycle和ApplicationEventPublisherAware,ApplicationEventPublisherAware就是发布事件,这里主要指的就是发布HeartbeatEvent
    事件上报心跳。SmartLifecycle该接口主要是作用是所有的bean都创建完成之后,可以执行自己的初始化工作,或者在退出时执行资源销毁工作。NacosWatch的start方法,主要是完成以下四件事情:
    1. 加入NamingEvent监听;
    2. 获取NamingService;
    3. 订阅NamingService监听事件;
    4. 发布HeartbeatEvent事件;
     public void start() {
            //加入NamingEvent监听
            if (this.running.compareAndSet(false, true)) {
                //更新本地的Instance
                EventListener eventListener = listenerMap.computeIfAbsent(buildKey(),
                        event -> new EventListener() {
                            @Override
                            public void onEvent(Event event) {
                                if (event instanceof NamingEvent) {
                                    List<Instance> instances = ((NamingEvent) event)
                                            .getInstances();
                                    Optional<Instance> instanceOptional = selectCurrentInstance(
                                            instances);
                                    instanceOptional.ifPresent(currentInstance -> {
                                        resetIfNeeded(currentInstance);
                                    });
                                }
                            }
                        });
                //获取NamingService
                NamingService namingService = nacosServiceManager
                        .getNamingService(properties.getNacosProperties());
                try {
                    //订阅相关NamingService的事件
                    namingService.subscribe(properties.getService(), properties.getGroup(),
                            Arrays.asList(properties.getClusterName()), eventListener);
                }
                catch (Exception e) {
                    log.error("namingService subscribe failed, properties:{}", properties, e);
                }
                //发布HeartbeatEvent事件
                this.watchFuture = this.taskScheduler.scheduleWithFixedDelay(
                        this::nacosServicesWatch, this.properties.getWatchDelay());
            }
        }
    View Code

    NacosWatch的stop方法主要是完成两件事情:

    1. 释放掉监听的线程池资源;
    2. 取消NamingService相关的监听事件;
        @Override
        public void stop() {
            if (this.running.compareAndSet(true, false)) {
                //关闭和释放watch的线程池
                if (this.watchFuture != null) {
                    // shutdown current user-thread,
                    // then the other daemon-threads will terminate automatic.
                    ((ThreadPoolTaskScheduler) this.taskScheduler).shutdown();
                    this.watchFuture.cancel(true);
                }
    
                EventListener eventListener = listenerMap.get(buildKey());
                try {
                    //取消NamingService相关的订阅信息
                    NamingService namingService = nacosServiceManager
                            .getNamingService(properties.getNacosProperties());
                    namingService.unsubscribe(properties.getService(), properties.getGroup(),
                            Arrays.asList(properties.getClusterName()), eventListener);
                }
                catch (NacosException e) {
                    log.error("namingService unsubscribe failed, properties:{}", properties,
                            e);
                }
            }
        }
    View Code

    NacosNamingService

    客户端信息的初始化发生在发起调用的时候,是一种懒加载的方式,并没有在初始化完成的时候就进行,这部分我们分析Ribbon源码的时候我们具体在讲解一下。我们的重点看的是NacosNamingService,从基础配置类中的NacosServiceDiscovery的getInstances的方法调用追踪到更下层我们会发现,与服务端交互的重点的类就是NacosNamingService,NacosNamingService在初始化的时候,主要做了以下10事件

    1. initNamespaceForNaming:用于初始命名空间,在Nacos中命名空间用于租户粗粒度隔离,同时还可以进行环境的区别,如开发环境和测试环境等等;
    2. initSerialization:序列化初始化;
    3. initServerAddr:初始化服务器地址,其中涉及到的endpoint 等;
    4. initWebRootContext:初始化web上下文,其支持通过阿里云EDAS进行部署;
    5. initCacheDir:初始化缓存目录;
    6. initLogName:从配置中获取日志文件;
    7. EventDispatcher:监听事件分发,当客户端订阅了某个服务信息后,会以Listener的方式注册到EventDispatcher的队列中,当有服务变化的时候,会通知订阅者;
    8. NamingProxy:服务端的代理,用于客户端与服务端的通信;
    9. BeatReactor:用于维持与服务器之间的心跳通信,上报客户端注册到服务端的服务信息;
    10. HostReactor:用于客户端服务的订阅,以及从服务端更新服务信息;
    initNamespaceForNaming
    //初始化获取Namespace
    public static String initNamespaceForNaming(Properties properties) {
            String tmpNamespace = null;
            
            //是否使用阿里云上环境进行解析,默认为true,如果没有进行配置,
            //默认使用DEFAULT_USE_CLOUD_NAMESPACE_PARSING
            String isUseCloudNamespaceParsing = properties.getProperty(PropertyKeyConst.IS_USE_CLOUD_NAMESPACE_PARSING,
                    System.getProperty(SystemPropertyKeyConst.IS_USE_CLOUD_NAMESPACE_PARSING,
                            String.valueOf(Constants.DEFAULT_USE_CLOUD_NAMESPACE_PARSING)));
            
            if (Boolean.parseBoolean(isUseCloudNamespaceParsing)) {
                
                tmpNamespace = TenantUtil.getUserTenantForAns();
                //从系统变量获取namespace
                tmpNamespace = TemplateUtils.stringEmptyAndThenExecute(tmpNamespace, new Callable<String>() {
                    @Override
                    public String call() {
                        String namespace = System.getProperty(SystemPropertyKeyConst.ANS_NAMESPACE);
                        LogUtils.NAMING_LOGGER.info("initializer namespace from System Property :" + namespace);
                        return namespace;
                    }
                });
                
                tmpNamespace = TemplateUtils.stringEmptyAndThenExecute(tmpNamespace, new Callable<String>() {
                    @Override
                    public String call() {
                        String namespace = System.getenv(PropertyKeyConst.SystemEnv.ALIBABA_ALIWARE_NAMESPACE);
                        LogUtils.NAMING_LOGGER.info("initializer namespace from System Environment :" + namespace);
                        return namespace;
                    }
                });
            }
            //如果不是上云环境,那么从系统变量获取namespace
            tmpNamespace = TemplateUtils.stringEmptyAndThenExecute(tmpNamespace, new Callable<String>() {
                @Override
                public String call() {
                    String namespace = System.getProperty(PropertyKeyConst.NAMESPACE);
                    LogUtils.NAMING_LOGGER.info("initializer namespace from System Property :" + namespace);
                    return namespace;
                }
            });
            //从properties中获取namespace
            if (StringUtils.isEmpty(tmpNamespace) && properties != null) {
                tmpNamespace = properties.getProperty(PropertyKeyConst.NAMESPACE);
            }
            //获取系统默认的namespace
            tmpNamespace = TemplateUtils.stringEmptyAndThenExecute(tmpNamespace, new Callable<String>() {
                @Override
                public String call() {
                    return UtilAndComs.DEFAULT_NAMESPACE_ID;
                }
            });
            return tmpNamespace;
        }
    View Code
    initServerAddr
    //初始化服务器地址
    private void initServerAddr(Properties properties) {
        //从properties中获取服务器地址
        serverList = properties.getProperty(PropertyKeyConst.SERVER_ADDR);
        //初始化endpoint,如果有endpoint,则废弃serverList
        endpoint = InitUtils.initEndpoint(properties);
        if (StringUtils.isNotEmpty(endpoint)) {
            serverList = "";
        }
    }
    public static String initEndpoint(final Properties properties) {
        if (properties == null) {
    
            return "";
        }
        // Whether to enable domain name resolution rules
        //是否使用endpoint解析,默认为true,也就是:USE_ENDPOINT_PARSING_RULE_DEFAULT_VALUE
        String isUseEndpointRuleParsing =
            properties.getProperty(PropertyKeyConst.IS_USE_ENDPOINT_PARSING_RULE,
                System.getProperty(SystemPropertyKeyConst.IS_USE_ENDPOINT_PARSING_RULE,
                    String.valueOf(ParamUtil.USE_ENDPOINT_PARSING_RULE_DEFAULT_VALUE)));
    
        boolean isUseEndpointParsingRule = Boolean.valueOf(isUseEndpointRuleParsing);
        String endpointUrl;
        //使用endpoint解析功能
        if (isUseEndpointParsingRule) {
            // Get the set domain name information
            endpointUrl = ParamUtil.parsingEndpointRule(properties.getProperty(PropertyKeyConst.ENDPOINT));
            if (StringUtils.isBlank(endpointUrl)) {
                return "";
            }
        } else {
            //不使用的化,直接通过properties文件来获取
            endpointUrl = properties.getProperty(PropertyKeyConst.ENDPOINT);
        }
    
        if (StringUtils.isBlank(endpointUrl)) {
            return "";
        }
    
        //获取endpoint的端口
        String endpointPort = TemplateUtils.stringEmptyAndThenExecute(System.getenv(PropertyKeyConst.SystemEnv.ALIBABA_ALIWARE_ENDPOINT_PORT), new Callable<String>() {
            @Override
            public String call() {
    
                return properties.getProperty(PropertyKeyConst.ENDPOINT_PORT);
            }
        });
    
        endpointPort = TemplateUtils.stringEmptyAndThenExecute(endpointPort, new Callable<String>() {
            @Override
            public String call() {
                return "8080";
            }
        });
    
        return endpointUrl + ":" + endpointPort;
    }
    View Code
    initWebRootContext
    //阿里云EDAS相关的
    public static void initWebRootContext() {
            // support the web context with ali-yun if the app deploy by EDAS
            final String webContext = System.getProperty(SystemPropertyKeyConst.NAMING_WEB_CONTEXT);
            TemplateUtils.stringNotEmptyAndThenExecute(webContext, new Runnable() {
                @Override
                public void run() {
                    UtilAndComs.webContext = webContext.indexOf("/") > -1 ? webContext : "/" + webContext;
                    
                    UtilAndComs.nacosUrlBase = UtilAndComs.webContext + "/v1/ns";
                    UtilAndComs.nacosUrlInstance = UtilAndComs.nacosUrlBase + "/instance";
                }
            });
        }
    View Code
    initCacheDir
       //初始化缓存目录,用于存放从服务端获取的服务信息,如果客户端与服务端断开了连接,将会使用缓存的信息 
       private void initCacheDir() {
            cacheDir = System.getProperty("com.alibaba.nacos.naming.cache.dir");
            if (StringUtils.isEmpty(cacheDir)) {
                cacheDir = System.getProperty("user.home") + "/nacos/naming/" + namespace;
            }
        }
    View Code
    initLogName
    //初始化日志存放路径    
    private void initLogName(Properties properties) {
            logName = System.getProperty(UtilAndComs.NACOS_NAMING_LOG_NAME);
            if (StringUtils.isEmpty(logName)) {
    
                if (properties != null && StringUtils
                        .isNotEmpty(properties.getProperty(UtilAndComs.NACOS_NAMING_LOG_NAME))) {
                    logName = properties.getProperty(UtilAndComs.NACOS_NAMING_LOG_NAME);
                } else {
                    logName = "naming.log";
                }
            }
      }
    View Code
    EventDispatcher

    EventDispatcher维护一个阻塞队列,主要存储发生改变的服务的信息,维护了一个对于服务的监听队列的映射的Map,实时的将服务变化信息同步给监听者,这样客户端就可以通过注册监听者实现在服务变化后动态进行操作。

    public class EventDispatcher implements Closeable {
        
        private ExecutorService executor = null;
        
        //发生了变化的服务队列
        private final BlockingQueue<ServiceInfo> changedServices = new LinkedBlockingQueue<ServiceInfo>();
        //监听者维护映射
        private final ConcurrentMap<String, List<EventListener>> observerMap = new ConcurrentHashMap<String, List<EventListener>>();
        
        private volatile boolean closed = false;
        
        public EventDispatcher() {
            
            this.executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r, "com.alibaba.nacos.naming.client.listener");
                    thread.setDaemon(true);
                    
                    return thread;
                }
            });
            
            this.executor.execute(new Notifier());
        }
        
        /**
         * Add listener.
         *
         * @param serviceInfo service info
         * @param clusters    clusters
         * @param listener    listener
         */
        public void addListener(ServiceInfo serviceInfo, String clusters, EventListener listener) {
            
            NAMING_LOGGER.info("[LISTENER] adding " + serviceInfo.getName() + " with " + clusters + " to listener map");
            List<EventListener> observers = Collections.synchronizedList(new ArrayList<EventListener>());
            observers.add(listener);
            
            observers = observerMap.putIfAbsent(ServiceInfo.getKey(serviceInfo.getName(), clusters), observers);
            if (observers != null) {
                observers.add(listener);
            }
            
            serviceChanged(serviceInfo);
        }
        
        /**
         * Remove listener.
         *
         * @param serviceName service name
         * @param clusters    clusters
         * @param listener    listener
         */
        public void removeListener(String serviceName, String clusters, EventListener listener) {
            
            NAMING_LOGGER.info("[LISTENER] removing " + serviceName + " with " + clusters + " from listener map");
            
            List<EventListener> observers = observerMap.get(ServiceInfo.getKey(serviceName, clusters));
            if (observers != null) {
                Iterator<EventListener> iter = observers.iterator();
                while (iter.hasNext()) {
                    EventListener oldListener = iter.next();
                    if (oldListener.equals(listener)) {
                        iter.remove();
                    }
                }
                if (observers.isEmpty()) {
                    observerMap.remove(ServiceInfo.getKey(serviceName, clusters));
                }
            }
        }
        
        public boolean isSubscribed(String serviceName, String clusters) {
            return observerMap.containsKey(ServiceInfo.getKey(serviceName, clusters));
        }
        
        public List<ServiceInfo> getSubscribeServices() {
            List<ServiceInfo> serviceInfos = new ArrayList<ServiceInfo>();
            for (String key : observerMap.keySet()) {
                serviceInfos.add(ServiceInfo.fromKey(key));
            }
            return serviceInfos;
        }
        
        /**
         * Service changed.
         *
         * @param serviceInfo service info
         */
        public void serviceChanged(ServiceInfo serviceInfo) {
            if (serviceInfo == null) {
                return;
            }
            
            changedServices.add(serviceInfo);
        }
        
        @Override
        public void shutdown() throws NacosException {
            String className = this.getClass().getName();
            NAMING_LOGGER.info("{} do shutdown begin", className);
            ThreadUtils.shutdownThreadPool(executor, NAMING_LOGGER);
            closed = true;
            NAMING_LOGGER.info("{} do shutdown stop", className);
        }
        //服务变化通知线程
        private class Notifier implements Runnable {
            
            @Override
            public void run() {
                while (!closed) {
                    
                    ServiceInfo serviceInfo = null;
                    try {
                        //从队列取出变化消息
                        serviceInfo = changedServices.poll(5, TimeUnit.MINUTES);
                    } catch (Exception ignore) {
                    }
                    
                    if (serviceInfo == null) {
                        continue;
                    }
                    
                    try {
                        //获取监听者队列
                        List<EventListener> listeners = observerMap.get(serviceInfo.getKey());
                        //遍历监听者队列,调用其onEvent方法
                        if (!CollectionUtils.isEmpty(listeners)) {
                            for (EventListener listener : listeners) {
                                List<Instance> hosts = Collections.unmodifiableList(serviceInfo.getHosts());
                                listener.onEvent(new NamingEvent(serviceInfo.getName(), serviceInfo.getGroupName(),
                                        serviceInfo.getClusters(), hosts));
                            }
                        }
                        
                    } catch (Exception e) {
                        NAMING_LOGGER.error("[NA] notify error for service: " + serviceInfo.getName() + ", clusters: "
                                + serviceInfo.getClusters(), e);
                    }
                }
            }
        }
    }
    View Code
    NamingProxy

    NamingProxy封装了与服务端的操作,代码相对比较简单,值得注意的是如果配置安全相关的内容,在初始化的时候该处会进行一个定时任务的查询,如果对安全要求比较高,可重SecurityProxy部分内容,当然服务端部分也需要重写,大部分的情况这注册中心一般暴露在内网环境下,基本上不需要重写的。

    BeatReactor

    BeatReactor负责将客户端的信息上报和下线,对于非持久化的内容采用周期上报内容,这部分在服务心跳的时候我们讲解过,这里不进行源码分析,大家重点关注addBeatInfo、removeBeatInfo和BeatTask的内容,相对比较简单。

    HostReactor

    HostReactor主要负责客户端获取服务端注册的信息的部分,主要分为三个部分:

    1. 客户端需要调用NacosNamingService获取服务信息方法的时候,HostReactor负责把服务信息维护本地缓存的serviceInfoMap中,并且通过UpdateTask定时更新已存在的服务;
    2. HostReactor内部维护PushReceiver对象,负责接收服务端通过UDP协议推送过来的服务变更的信息,并更新到本地缓存serviceInfoMap当中;
    3. HostReactor内部维护FailoverReactor对象,负责当服务端不可用的时候,切换到本地文件缓存模式,从本地文件的缓存中获取服务信息; 
    public class HostReactor implements Closeable {
        
        private static final long DEFAULT_DELAY = 1000L;
        
        private static final long UPDATE_HOLD_INTERVAL = 5000L;
        
        private final Map<String, ScheduledFuture<?>> futureMap = new HashMap<String, ScheduledFuture<?>>();
        
        private final Map<String, ServiceInfo> serviceInfoMap;
        
        private final Map<String, Object> updatingMap;
        //接收UDP服务端UDP协议
        private final PushReceiver pushReceiver;
        //阻塞队列的变更消息处理
        private final EventDispatcher eventDispatcher;
        //心跳上报
        private final BeatReactor beatReactor;
        //HTTP请求消息的处理
        private final NamingProxy serverProxy;
        //服务不可用时本地降级文件的处理模式
        private final FailoverReactor failoverReactor;
        
        private final String cacheDir;
        //定时更新服务消息的定时任务
        private final ScheduledExecutorService executor;
        
        public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, BeatReactor beatReactor,
                String cacheDir) {
            this(eventDispatcher, serverProxy, beatReactor, cacheDir, false, UtilAndComs.DEFAULT_POLLING_THREAD_COUNT);
        }
        
        public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, BeatReactor beatReactor,
                String cacheDir, boolean loadCacheAtStart, int pollingThreadCount) {
            // init executorService
            this.executor = new ScheduledThreadPoolExecutor(pollingThreadCount, new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setDaemon(true);
                    thread.setName("com.alibaba.nacos.client.naming.updater");
                    return thread;
                }
            });
            this.eventDispatcher = eventDispatcher;
            this.beatReactor = beatReactor;
            this.serverProxy = serverProxy;
            this.cacheDir = cacheDir;
            if (loadCacheAtStart) {
                this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(DiskCache.read(this.cacheDir));
            } else {
                this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(16);
            }
            
            this.updatingMap = new ConcurrentHashMap<String, Object>();
            this.failoverReactor = new FailoverReactor(this, cacheDir);
            this.pushReceiver = new PushReceiver(this);
        }
        
        public Map<String, ServiceInfo> getServiceInfoMap() {
            return serviceInfoMap;
        }
        
        public synchronized ScheduledFuture<?> addTask(UpdateTask task) {
            return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
        }
        
        /**
         * Process service json.
         *
         * @param json service json
         * @return service info
         */
        //处理从服务端接收到的数据
        public ServiceInfo processServiceJson(String json) {
            ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class);
            ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
            if (serviceInfo.getHosts() == null || !serviceInfo.validate()) {
                //empty or error push, just ignore
                return oldService;
            }
            
            boolean changed = false;
            //新老信息对比处理
            if (oldService != null) {
                //如果本地旧服务的获取时间比服务器端获取的时间新,则保留本地旧服务的时间
                if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {
                    NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime() + ", new-t: "
                            + serviceInfo.getLastRefTime());
                }
                //用新服务信息替换serviceInfoMap
                serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
                
                Map<String, Instance> oldHostMap = new HashMap<String, Instance>(oldService.getHosts().size());
                for (Instance host : oldService.getHosts()) {
                    oldHostMap.put(host.toInetAddr(), host);
                }
                
                Map<String, Instance> newHostMap = new HashMap<String, Instance>(serviceInfo.getHosts().size());
                for (Instance host : serviceInfo.getHosts()) {
                    newHostMap.put(host.toInetAddr(), host);
                }
                
                Set<Instance> modHosts = new HashSet<Instance>();
                Set<Instance> newHosts = new HashSet<Instance>();
                Set<Instance> remvHosts = new HashSet<Instance>();
                
                List<Map.Entry<String, Instance>> newServiceHosts = new ArrayList<Map.Entry<String, Instance>>(
                        newHostMap.entrySet());
                for (Map.Entry<String, Instance> entry : newServiceHosts) {
                    Instance host = entry.getValue();
                    String key = entry.getKey();
                    if (oldHostMap.containsKey(key) && !StringUtils
                            .equals(host.toString(), oldHostMap.get(key).toString())) {
                        modHosts.add(host);
                        continue;
                    }
                    
                    if (!oldHostMap.containsKey(key)) {
                        newHosts.add(host);
                    }
                }
                
                for (Map.Entry<String, Instance> entry : oldHostMap.entrySet()) {
                    Instance host = entry.getValue();
                    String key = entry.getKey();
                    if (newHostMap.containsKey(key)) {
                        continue;
                    }
                    
                    if (!newHostMap.containsKey(key)) {
                        remvHosts.add(host);
                    }
                    
                }
                
                if (newHosts.size() > 0) {
                    changed = true;
                    NAMING_LOGGER.info("new ips(" + newHosts.size() + ") service: " + serviceInfo.getKey() + " -> "
                            + JacksonUtils.toJson(newHosts));
                }
                
                if (remvHosts.size() > 0) {
                    changed = true;
                    NAMING_LOGGER.info("removed ips(" + remvHosts.size() + ") service: " + serviceInfo.getKey() + " -> "
                            + JacksonUtils.toJson(remvHosts));
                }
                
                if (modHosts.size() > 0) {
                    changed = true;
                    updateBeatInfo(modHosts);
                    NAMING_LOGGER.info("modified ips(" + modHosts.size() + ") service: " + serviceInfo.getKey() + " -> "
                            + JacksonUtils.toJson(modHosts));
                }
                
                serviceInfo.setJsonFromServer(json);
                
                if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) {
                    //服务信息变更写入阻塞队列
                    eventDispatcher.serviceChanged(serviceInfo);
                    //磁盘缓存希尔
                    DiskCache.write(serviceInfo, cacheDir);
                }
                
            } else {
                //新服务的信息直接加入本地缓存
                changed = true;
                NAMING_LOGGER.info("init new ips(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> "
                        + JacksonUtils.toJson(serviceInfo.getHosts()));
                serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
                eventDispatcher.serviceChanged(serviceInfo);
                serviceInfo.setJsonFromServer(json);
                DiskCache.write(serviceInfo, cacheDir);
            }
            //上报数量
            MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
            
            if (changed) {
                NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> "
                        + JacksonUtils.toJson(serviceInfo.getHosts()));
            }
            
            return serviceInfo;
        }
        
        private void updateBeatInfo(Set<Instance> modHosts) {
            for (Instance instance : modHosts) {
                String key = beatReactor.buildKey(instance.getServiceName(), instance.getIp(), instance.getPort());
                if (beatReactor.dom2Beat.containsKey(key) && instance.isEphemeral()) {
                    BeatInfo beatInfo = beatReactor.buildBeatInfo(instance);
                    beatReactor.addBeatInfo(instance.getServiceName(), beatInfo);
                }
            }
        }
        
        //通过key获取服务对象
        private ServiceInfo getServiceInfo0(String serviceName, String clusters) {
            //得到ServiceInfo的key
            String key = ServiceInfo.getKey(serviceName, clusters);
            //从本地缓存中获取服务信息
            return serviceInfoMap.get(key);
        }
        //从服务器端获取Service信息,并解析为ServiceInfo对象
        public ServiceInfo getServiceInfoDirectlyFromServer(final String serviceName, final String clusters)
                throws NacosException {
            String result = serverProxy.queryList(serviceName, clusters, 0, false);
            if (StringUtils.isNotEmpty(result)) {
                return JacksonUtils.toObj(result, ServiceInfo.class);
            }
            return null;
        }
        
        public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
            
            NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
           
            String key = ServiceInfo.getKey(serviceName, clusters);
            //是否开启本地文件缓存模式
            if (failoverReactor.isFailoverSwitch()) {
                return failoverReactor.getService(key);
            }
            
            ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
            //从serviceInfoMap获取serviceObj,如果没有serviceObj,则新生成一个
            if (null == serviceObj) {
                serviceObj = new ServiceInfo(serviceName, clusters);
                
                serviceInfoMap.put(serviceObj.getKey(), serviceObj);
                
                updatingMap.put(serviceName, new Object());
                updateServiceNow(serviceName, clusters);
                updatingMap.remove(serviceName);
                
            } else if (updatingMap.containsKey(serviceName)) {
                //如果更新列表中包含服务,则等待更新结束
                if (UPDATE_HOLD_INTERVAL > 0) {
                    // hold a moment waiting for update finish
                    synchronized (serviceObj) {
                        try {
                            serviceObj.wait(UPDATE_HOLD_INTERVAL);
                        } catch (InterruptedException e) {
                            NAMING_LOGGER
                                    .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                        }
                    }
                }
            }
            //添加更新调度任务
            scheduleUpdateIfAbsent(serviceName, clusters);
            
            return serviceInfoMap.get(serviceObj.getKey());
        }
        //从服务端更新服务
        private void updateServiceNow(String serviceName, String clusters) {
            try {
                updateService(serviceName, clusters);
            } catch (NacosException e) {
                NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
            }
        }
        
        /**
         * Schedule update if absent.
         *
         * @param serviceName service name
         * @param clusters    clusters
         */
        //添加更新调度任务
        public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
            if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
                return;
            }
            
            synchronized (futureMap) {
                if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
                    return;
                }
                
                ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
                futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
            }
        }
        
        /**
         * Update service now.
         *
         * @param serviceName service name
         * @param clusters    clusters
         */
        public void updateService(String serviceName, String clusters) throws NacosException {
            ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
            try {
                
                String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
                
                if (StringUtils.isNotEmpty(result)) {
                    processServiceJson(result);
                }
            } finally {
                if (oldService != null) {
                    synchronized (oldService) {
                        oldService.notifyAll();
                    }
                }
            }
        }
        
        /**
         * Refresh only.
         *
         * @param serviceName service name
         * @param clusters    cluster
         */
        public void refreshOnly(String serviceName, String clusters) {
            try {
                serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
            } catch (Exception e) {
                NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
            }
        }
        
        @Override
        public void shutdown() throws NacosException {
            String className = this.getClass().getName();
            NAMING_LOGGER.info("{} do shutdown begin", className);
            ThreadUtils.shutdownThreadPool(executor, NAMING_LOGGER);
            pushReceiver.shutdown();
            failoverReactor.shutdown();
            NAMING_LOGGER.info("{} do shutdown stop", className);
        }
        
        //定时更新已存在的服务
        public class UpdateTask implements Runnable {
            
            long lastRefTime = Long.MAX_VALUE;
            
            private final String clusters;
            
            private final String serviceName;
            
            /**
             * the fail situation. 1:can't connect to server 2:serviceInfo's hosts is empty
             */
            private int failCount = 0;
            
            public UpdateTask(String serviceName, String clusters) {
                this.serviceName = serviceName;
                this.clusters = clusters;
            }
            
            private void incFailCount() {
                int limit = 6;
                if (failCount == limit) {
                    return;
                }
                failCount++;
            }
            
            private void resetFailCount() {
                failCount = 0;
            }
            
            @Override
            public void run() {
                long delayTime = DEFAULT_DELAY;
                
                try {
                    ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
                    
                    if (serviceObj == null) {
                        updateService(serviceName, clusters);
                        return;
                    }
                    
                    if (serviceObj.getLastRefTime() <= lastRefTime) {
                        updateService(serviceName, clusters);
                        serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
                    } else {
                        // if serviceName already updated by push, we should not override it
                        // since the push data may be different from pull through force push
                        refreshOnly(serviceName, clusters);
                    }
                    
                    lastRefTime = serviceObj.getLastRefTime();
                    
                    if (!eventDispatcher.isSubscribed(serviceName, clusters) && !futureMap
                            .containsKey(ServiceInfo.getKey(serviceName, clusters))) {
                        // abort the update task
                        NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
                        return;
                    }
                    if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
                        incFailCount();
                        return;
                    }
                    delayTime = serviceObj.getCacheMillis();
                    resetFailCount();
                } catch (Throwable e) {
                    incFailCount();
                    NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
                } finally {
                    executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
                }
            }
        }
    }
    View Code
    总结

    从NacosNamingService初始化的整个过程中,很重要的一点值得我们学习,就是单一职责这个理念在每个类的设计上表现的淋淋尽致。

    结束

    欢迎大家点点关注,点点赞,上海地区帮助团队招两个Java,其中两点比较重要:靠谱和全日制本科,正常的业务开发,整体技术栈Dubbo+GateWay网关,有意者私聊!
  • 相关阅读:
    解决pgAdmin4启动失败方法
    X86汇编——计算斐波那契数列程序(详细注释和流程图说明)
    unity3d学习笔记(一) 第一人称视角实现和倒计时实现
    项目element-ui checkbox里面获取选中项 实现批量删除 修改
    //统计报表-供水量统计主列表分页查询 Element-ui的分页插件
    导出excel的功能效果实现
    echarts的基本使用以及如何使用官方实例的方法
    vue.js移动端app:初始配置
    使用雪碧图
    iconFont字体图标
  • 原文地址:https://www.cnblogs.com/wtzbk/p/14806330.html
Copyright © 2011-2022 走看看