zoukankan      html  css  js  c++  java
  • Nacos 服务消费原理

      继 Nacos服务注册原理 后,我们来看一下Nacos 是怎么实现服务的消费的。

      服务注册成功之后,消费者就可以从nacos server中获取到服务提供者的地址,然后进行服务的调用。在服务消费中,有一个核心的类 NacosDiscoveryClient 来负责和nacos交互,去获得服务提供者的地址信息。基于org.springframework.cloud.client.discovery.DiscoveryClient 的实现,如下图所示,Consul、Eureka是我们所熟悉的。他们所实现的是同一套规范。

      NacosDiscoveryClient 中提供了一个 getInstances 方法用来根据服务提供者名称获取服务提供者的url地址的方法.

    客户端启动获取服务列表:

      我们可以通过Debug 模式来验证这一猜想,启动服务消费者一定会进入NacosDiscoveryClient 的 getInstances 方法。

    @Override
    public List<ServiceInstance> getInstances(String serviceId) {
            try {
                return serviceDiscovery.getInstances(serviceId);
            }
            catch (Exception e) {
                throw new RuntimeException(
                        "Can not get hosts from nacos server. serviceId: " + serviceId, e);
            }
    }

      然后回调用 NacosServiceDiscovery 的 getInstances 方法,讲我们所配置的  group 、serviceId 传过去,获取基于该serviceId的实例列表。

      调用NamingService,根据serviceId、group获得服务实例列表。然后把instance转化为ServiceInstance对象

    public List<ServiceInstance> getInstances(String serviceId) throws NacosException {
            String group = discoveryProperties.getGroup();
            List<Instance> instances = discoveryProperties.namingServiceInstance()
                    .selectInstances(serviceId, group, true);
            return hostToServiceInstanceList(instances, serviceId);
    }
    
    @ConfigurationProperties("spring.cloud.nacos.discovery")
    public class NacosDiscoveryProperties {
        //...
    }

      NacosNamingService.selectInstances 首先从 hostReactor 获取 serviceInfo,然后再从serviceInfo.getHosts()剔除非 healty、非enabled、weight小于等于0的 instance 再返回;如果subscribe为true,则执行 hostReactor.getServiceInfo获取serviceInfo,否则执行

    hostReactor.getServiceInfoDirectlyFromServer获取serviceInfo

    @Override
    public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException {
            ServiceInfo serviceInfo;
            if (subscribe) {//是否订阅服务地址的变化,默认为true
                serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
            } else {
                serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
            }
            return selectInstances(serviceInfo, healthy);
    }
    
    private List<Instance> selectInstances(ServiceInfo serviceInfo, boolean healthy) {
        List<Instance> list;
        if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
            return new ArrayList<Instance>();
        }
    
        Iterator<Instance> iterator = list.iterator();
        while (iterator.hasNext()) {
            Instance instance = iterator.next();
            if (healthy != instance.isHealthy() || !instance.isEnabled() || instance.getWeight() <= 0) {
                iterator.remove();
            }
        }
        return list;
    }

      从 hostReactor 获取 serviceInfo的具体操作如下:

    public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
    
            NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
            //拼接服务名称+集群名称(默认为空)
            String key = ServiceInfo.getKey(serviceName, clusters);
            if (failoverReactor.isFailoverSwitch()) {
                return failoverReactor.getService(key);
            }
            //从ServiceInfoMap中根据key来查找服务提供者列表,ServiceInfoMap是客户端的服务地址的本地缓存
            ServiceInfo serviceObj = getSerivceInfo0(serviceName, clusters);
            //如果为空,表示本地缓存不存在
            if (null == serviceObj) {
                serviceObj = new ServiceInfo(serviceName, clusters);
                //如果找不到则创建一个新的然后放入serviceInfoMap,同时放入updatingMap,执行updateServiceNow,再从updatingMap移除;
                serviceInfoMap.put(serviceObj.getKey(), serviceObj);
    
                updatingMap.put(serviceName, new Object());
                // 立马从Nacos server中去加载服务地址信息
                updateServiceNow(serviceName, clusters);
                updatingMap.remove(serviceName);
    
            } else if (updatingMap.containsKey(serviceName)) {
                //如果从serviceInfoMap找出来的serviceObj在updatingMap中则等待UPDATE_HOLD_INTERVAL
                if (updateHoldInterval > 0) {
                    // hold a moment waiting for update finish
                    synchronized (serviceObj) {
                        try {
                            serviceObj.wait(updateHoldInterval);
                        } catch (InterruptedException e) {
                            NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                        }
                    }
                }
            }
            // 开启定时调度,每10s去查询一次服务地址
            //如果本地缓存中存在,则通过scheduleUpdateIfAbsent开启定时任务,再从serviceInfoMap取出serviceInfo
            scheduleUpdateIfAbsent(serviceName, clusters);
            return serviceInfoMap.get(serviceObj.getKey());
    }

      其中获取服务实例列表信息的方法为  updateServiceNow

    public void updateServiceNow(String serviceName, String clusters) {
            ServiceInfo oldService = getSerivceInfo0(serviceName, clusters);
            try {
    
                String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false);
                if (StringUtils.isNotEmpty(result)) {
                    processServiceJSON(result);
                }
        // .......
    }

      可以发现这里请求列表的时候发送了一个 pushReceiver.getUDPPort() ,这就是我们在服务注册的时候提到的,Nacos Server在检测到心跳超时的时候回主动发起一下UDP请求向客户端发送服务注册信息。哪个UDP端口就是这里传输给NacosServer的。

      可以看到 queryList

    public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
            throws NacosException {
           // 组装请求参数
            final Map<String, String> params = new HashMap<String, String>(8);
            params.put(CommonParams.NAMESPACE_ID, namespaceId);
            params.put(CommonParams.SERVICE_NAME, serviceName);
            params.put("clusters", clusters);
            params.put("udpPort", String.valueOf(udpPort));
            params.put("clientIP", NetUtils.localIP());
            params.put("healthyOnly", String.valueOf(healthyOnly));
            //通过HttpClient 发送请求
            return reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/list", params, HttpMethod.GET);
        }

    Nacos Server 处理消费端请求:

      通过上面消费端的请求 URL,我们可以定位到服务端源码的 InstanceController 的对应 GET请求的列表获取接口:

    @GetMapping("/list")
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
    public ObjectNode list(HttpServletRequest request) throws Exception {
            
            String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
            //从 request中获取请求参数
            String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
            String agent = WebUtils.getUserAgent(request);
            String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
            String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
            int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
            String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
            boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));
            
            String app = WebUtils.optional(request, "app", StringUtils.EMPTY);
            
            String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);
            
            boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));
            //传入请求参数,通过这些请求参数定位到服务实例列表
            return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,
                    healthyOnly);
    }

      就跟查询数据库一样,现在有参数了,接下去就是重头戏了:

    public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP,
                int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {
            // 创建一个客户端的信息
            ClientInfo clientInfo = new ClientInfo(agent);
    // 准备返回结果类型 ObjectNode result
    = JacksonUtils.createEmptyJsonNode();
    // 从缓存的 serviceMap中获取相应的服务实例 Service service
    = serviceManager.getService(namespaceId, serviceName); long cacheMillis = switchDomain.getDefaultCacheMillis(); // now try to enable the push try {//这里判断udp端口跟是否开启推送机制 if (udpPort > 0 && pushService.canEnablePush(agent)) { //这里就很熟悉了,将构建一个InetSocketAddress,将Nacos Server 作为客户端,请求消费端进行推送 pushService .addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort), pushDataSource, tid, app); cacheMillis = switchDomain.getPushCacheMillis(serviceName); } } catch (Exception e) { Loggers.SRV_LOG .error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP, udpPort, e); cacheMillis = switchDomain.getDefaultCacheMillis(); } if (service == null) {//如果获取到的服务为空,组装结果返回 if (Loggers.SRV_LOG.isDebugEnabled()) { Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName); } result.put("name", serviceName); result.put("clusters", clusters); result.put("cacheMillis", cacheMillis);
    // 返回空的 hosts result.replace(
    "hosts", JacksonUtils.createEmptyArrayNode()); return result; } //检查服务是否可用 checkIfDisabled(service); //准备返回的实例列表 List<Instance> srvedIPs; // 通过传进来的 clusters 获取服务ips srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ","))); // filter ips using selector: if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) { srvedIPs = service.getSelector().select(clientIP, srvedIPs); } //很显然,这里获取的sevedIPs为空,因为我们clusters是空的 if (CollectionUtils.isEmpty(srvedIPs)) { if (Loggers.SRV_LOG.isDebugEnabled()) { Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName); } //判断消费端类型及版本 if (clientInfo.type == ClientInfo.ClientType.JAVA && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) { result.put("dom", serviceName); } else { result.put("dom", NamingUtils.getServiceName(serviceName)); } //还是组装信息返回,这里返回的还是空的,加上服务的元数据 result.put("name", serviceName); result.put("cacheMillis", cacheMillis); result.put("lastRefTime", System.currentTimeMillis()); result.put("checksum", service.getChecksum()); result.put("useSpecifiedURL", false); result.put("clusters", clusters); result.put("env", env); result.set("hosts", JacksonUtils.createEmptyArrayNode()); result.set("metadata", JacksonUtils.transferToJsonNode(service.getMetadata())); return result; } //两个List是分别放置健康/非健康实例 Map<Boolean, List<Instance>> ipMap = new HashMap<>(2); ipMap.put(Boolean.TRUE, new ArrayList<>()); ipMap.put(Boolean.FALSE, new ArrayList<>()); //筛选健康实例 for (Instance ip : srvedIPs) { ipMap.get(ip.isHealthy()).add(ip); } if (isCheck) { result.put("reachProtectThreshold", false); } //这个类似于Eureka的自我保护机制。避免网络延迟带来的心跳超时的实例剔除 double threshold = service.getProtectThreshold(); if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) { Loggers.SRV_LOG.warn("protect threshold reached, return all ips, service: {}", serviceName); if (isCheck) { result.put("reachProtectThreshold", true); } ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE)); ipMap.get(Boolean.FALSE).clear(); } if (isCheck) { result.put("protectThreshold", service.getProtectThreshold()); result.put("reachLocalSiteCallThreshold", false); return JacksonUtils.createEmptyJsonNode(); } ArrayNode hosts = JacksonUtils.createEmptyArrayNode(); // 遍历map,组装数据返回给消费者 for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) { List<Instance> ips = entry.getValue(); if (healthyOnly && !entry.getKey()) { continue; } for (Instance instance : ips) { // remove disabled instance: if (!instance.isEnabled()) { continue; } ObjectNode ipObj = JacksonUtils.createEmptyJsonNode(); ipObj.put("ip", instance.getIp()); ipObj.put("port", instance.getPort()); // deprecated since nacos 1.0.0: ipObj.put("valid", entry.getKey()); ipObj.put("healthy", entry.getKey()); ipObj.put("marked", instance.isMarked()); ipObj.put("instanceId", instance.getInstanceId()); ipObj.set("metadata", JacksonUtils.transferToJsonNode(instance.getMetadata())); ipObj.put("enabled", instance.isEnabled()); ipObj.put("weight", instance.getWeight()); ipObj.put("clusterName", instance.getClusterName()); if (clientInfo.type == ClientInfo.ClientType.JAVA && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) { ipObj.put("serviceName", instance.getServiceName()); } else { ipObj.put("serviceName", NamingUtils.getServiceName(instance.getServiceName())); } ipObj.put("ephemeral", instance.isEphemeral()); hosts.add(ipObj); } } result.replace("hosts", hosts); if (clientInfo.type == ClientInfo.ClientType.JAVA && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) { result.put("dom", serviceName); } else { result.put("dom", NamingUtils.getServiceName(serviceName)); } result.put("name", serviceName); result.put("cacheMillis", cacheMillis); result.put("lastRefTime", System.currentTimeMillis()); result.put("checksum", service.getChecksum()); result.put("useSpecifiedURL", false); result.put("clusters", clusters); result.put("env", env); result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata())); return result; }

      经过这么一系列操作以后,服务消费者就能获取到相应的服务实例集合了。

    服务动态更新:

      基于上面的分析,服务消费者对于服务实例的动态更新主要来源于两个地方,第一个就是本地的定时任务,第二个就是采用服务端的 Push 机制,如下图。

      pull 定时任务请求更新服务信息:

      在查询服务调用 getServiceInfo 方法的代码中,会开启一个定时任务,这个任务会在默认在1s之后开始执行。而任务的具体实现是一个UpdateTask。

    public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
            if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
                return;
            }
    
            synchronized (futureMap) {
                if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
                    return;
                }
    
                ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
                futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
            }
    }

      所以我们定位到 UpdateTask 的 run 方法:

    @Override
    public void run() {
                try {//查询本地缓存
                    ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
                    //如果本地缓存为空,则向服务器发起更新请求
                    if (serviceObj == null) {
                        updateServiceNow(serviceName, clusters);
                        // 开启一个任务,延后一秒执行一次
                        executor.schedule(this, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
                        return;
                    }
                    //判断服务是否已过期
                    if (serviceObj.getLastRefTime() <= lastRefTime) {
                        updateServiceNow(serviceName, clusters);
                        serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
                    } else {
                        // if serviceName already updated by push, we should not override it
                        // since the push data may be different from pull through force push
                 //如果服务已经被基于push机制的情况下做了更新,那么我们不需要覆盖本地服务。
                          //因为push过来的数据和pull数据不同,所以这里只是调用请求去刷新服务
                        refreshOnly(serviceName, clusters);
                    }
                    //延后10s执行
                    executor.schedule(this, serviceObj.getCacheMillis(), TimeUnit.MILLISECONDS);
                    //更新最后一次刷新时间
                    lastRefTime = serviceObj.getLastRefTime();
                } catch (Throwable e) {
                    NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
                }
    }

      push请求推送数据:

      还记得在服务提供者发起服务注册时。在 createEmptyService 方法中,会创建一个空的服务.并且在这个创建过程中,调用了一个 putServiceAndInit ,这个方法中除了创建空的服务并且初始化,还会调用 service.init 方法进行服务的初始化。

    private void putServiceAndInit(Service service) throws NacosException {
            putService(service);
            service.init();
            consistencyService
                    .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
            consistencyService
                    .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
            Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
    }
    
    /**
    * Init service.
    */
    public void init() {
            HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
            for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
                entry.getValue().setService(this);
                entry.getValue().init();
            }
    }

      这个init方法,会和当前服务提供者建立一个心跳检测机制,这个心跳检测会每5s执行一次。然后来看 ClientBeatCheckTask.run

    @Override
    public void run() {
            try {
                if (!getDistroMapper().responsible(service.getName())) {
                    return;
                }
                
                if (!getSwitchDomain().isHealthCheckEnabled()) {
                    return;
                }
                //获取到所有服务实例
                List<Instance> instances = service.allIPs(true);
                
                // first set health status of instances:
                //遍历服务节点进行心跳检测
    
                for (Instance instance : instances) {
                    //如果服务实例的最后一次心跳时间大于设置的超时时间,则认为这个服务已经下线。
                    if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                        if (!instance.isMarked()) {
                            if (instance.isHealthy()) {
                                instance.setHealthy(false);
                                Loggers.EVT_LOG
                                        .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                                instance.getIp(), instance.getPort(), instance.getClusterName(),
                                                service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                                                instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                                getPushService().serviceChanged(service);//推送服务变更事
                                //发布实例心跳超时事件
                                ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                            }
                        }
                    }
                }
                
                if (!getGlobalConfig().isExpireInstance()) {
                    return;
                }
                
                // then remove obsolete instances:
                for (Instance instance : instances) {
                    
                    if (instance.isMarked()) {
                        continue;
                    }
                    
                    if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                        // delete instance
                        Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                                JacksonUtils.toJson(instance));
                        deleteIp(instance);//删除过期的服务实例
                    }
                }
                
            } catch (Exception e) {
                Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
            }
    }

      在这里 getPushService().serviceChanged(service) 会发布一个服务变更事件:

    public void serviceChanged(Service service) {
            // merge some change events to reduce the push frequency:
            if (futureMap
                    .containsKey(UtilsAndCommons.assembleFullServiceName(service.getNamespaceId(), service.getName()))) {
                return;
            }
            
            this.applicationContext.publishEvent(new ServiceChangeEvent(this, service));
    }

      而 PushService 类实现了 ApplicationListener<ServiceChangeEvent> 所以本身又会取监听该事件,监听服务状态变更事件,然后遍历所有的客户端,通过udp协议进行消息的广播通知:

    @Override
    public void onApplicationEvent(ServiceChangeEvent event) {
            Service service = event.getService();//获取到服务
            String serviceName = service.getName();//服务名
            String namespaceId = service.getNamespaceId();//命名空间
            //执行任务
            Future future = GlobalExecutor.scheduleUdpSender(() -> {
                try {
                    Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
                    ConcurrentMap<String, PushClient> clients = clientMap
                            .get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
                    if (MapUtils.isEmpty(clients)) {
                        return;
                    }
                    
                    Map<String, Object> cache = new HashMap<>(16);
                    long lastRefTime = System.nanoTime();
                    for (PushClient client : clients.values()) {
                        if (client.zombie()) {
                            Loggers.PUSH.debug("client is zombie: " + client.toString());
                            clients.remove(client.toString());
                            Loggers.PUSH.debug("client is zombie: " + client.toString());
                            continue;
                        }
                        
                        Receiver.AckEntry ackEntry;
                        Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());
                        String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
                        byte[] compressData = null;
                        Map<String, Object> data = null;
                        if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
                            org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
                            compressData = (byte[]) (pair.getValue0());
                            data = (Map<String, Object>) pair.getValue1();
                            
                            Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());
                        }
                        
                        if (compressData != null) {
                            ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
                        } else {
                            ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
                            if (ackEntry != null) {
                                cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
                            }
                        }
                        
                        Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",
                                client.getServiceName(), client.getAddrStr(), client.getAgent(),
                                (ackEntry == null ? null : ackEntry.key));
                        //执行 UDP  推送
                        udpPush(ackEntry);
                    }
                } catch (Exception e) {
                    Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);
                    
                } finally {
                    futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
                }
                
            }, 1000, TimeUnit.MILLISECONDS);
            
            futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);
            
     }

      那么服务消费者此时应该是建立了一个udp服务的监听,否则服务端无法进行数据的推送。这个监听是在HostReactor的构造方法中初始化的

    public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, String cacheDir,
                           boolean loadCacheAtStart, int pollingThreadCount) {
    
            executor = new ScheduledThreadPoolExecutor(pollingThreadCount, new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setDaemon(true);
                    thread.setName("com.alibaba.nacos.client.naming.updater");
                    return thread;
                }
            });
    
            this.eventDispatcher = eventDispatcher;
            this.serverProxy = serverProxy;
            this.cacheDir = cacheDir;
            if (loadCacheAtStart) {
                this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(DiskCache.read(this.cacheDir));
            } else {
                this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(16);
            }
    
            this.updatingMap = new ConcurrentHashMap<String, Object>();
            this.failoverReactor = new FailoverReactor(this, cacheDir);
            this.pushReceiver = new PushReceiver(this);
        }

       这里主要看 new PushReceiver(this) 把this 传进去,初始化了一个DatagramSocket,这是一个Udp的socket连接,开启一个线程,定时执行当前任务

    public PushReceiver(HostReactor hostReactor) {
            try {
                this.hostReactor = hostReactor;
                udpSocket = new DatagramSocket();
    
                executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r);
                        thread.setDaemon(true);
                        thread.setName("com.alibaba.nacos.naming.push.receiver");
                        return thread;
                    }
                });
    
                executorService.execute(this);
            } catch (Exception e) {
                NAMING_LOGGER.error("[NA] init udp socket failed", e);
            }
        }

      然后需要关注的是  PushReceiver 的 Run 方法:在run方法中,不断循环监听服务端的push请求。然后调用 processServiceJSON 对服务端的数据进行解析。

    @Override
    public void run() {
            while (true) {
                try {
                    // byte[] is initialized with 0 full filled by default
                    byte[] buffer = new byte[UDP_MSS];
                    DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
    
                    udpSocket.receive(packet);
    
                    String json = new String(IoUtils.tryDecompress(packet.getData()), "UTF-8").trim();
                    NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
    
                    PushPacket pushPacket = JSON.parseObject(json, PushPacket.class);
                    String ack;
                    if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
                        hostReactor.processServiceJSON(pushPacket.data);
    
                        // send ack to server
                        ack = "{"type": "push-ack""
                            + ", "lastRefTime":"" + pushPacket.lastRefTime
                            + "", "data":" + """}";
                    } else if ("dump".equals(pushPacket.type)) {
                        // dump data to server
                        ack = "{"type": "dump-ack""
                            + ", "lastRefTime": "" + pushPacket.lastRefTime
                            + "", "data":" + """
                            + StringUtils.escapeJavaScript(JSON.toJSONString(hostReactor.getServiceInfoMap()))
                            + ""}";
                    } else {
                        // do nothing send ack only
                        ack = "{"type": "unknown-ack""
                            + ", "lastRefTime":"" + pushPacket.lastRefTime
                            + "", "data":" + """}";
                    }
    
                    udpSocket.send(new DatagramPacket(ack.getBytes(Charset.forName("UTF-8")),
                        ack.getBytes(Charset.forName("UTF-8")).length, packet.getSocketAddress()));
                } catch (Exception e) {
                    NAMING_LOGGER.error("[NA] error while receiving push data", e);
                }
            }
        }

       就这样完成服务的动态更新。更加细节的部分请阅读源码实现。

  • 相关阅读:
    编程之美-2.18 数组分割
    话题模型
    暂时跳过的Leetcode题目
    LDA主题模型
    二叉树非递归的统一实现
    取余和取模运算
    IDM非补丁破解方法
    两种建立堆的方法HeapInsert & Heapify
    非阻塞connect:Web客户程序
    非阻塞connect
  • 原文地址:https://www.cnblogs.com/wuzhenzhao/p/13625499.html
Copyright © 2011-2022 走看看