zoukankan      html  css  js  c++  java
  • eureka-client拉取注册表

    eureka-client拉取注册表

    全量拉取

    全量抓取注册表,eureka client第一次启动的时候,必须从eureka server端一次性抓取全量的注册表的信息过来,EurekaClient初始化的时候,就会自动全量抓取注册表

            if (clientConfig.shouldFetchRegistry()) {
                try {
                    //全量抓取註冊表
                    boolean primaryFetchRegistryResult = fetchRegistry(false);
                    if (!primaryFetchRegistryResult) {
                        logger.info("Initial registry fetch from primary servers failed");
                    }
                    boolean backupFetchRegistryResult = true;
                    if (!primaryFetchRegistryResult && !fetchRegistryFromBackup()) {
                        backupFetchRegistryResult = false;
                        logger.info("Initial registry fetch from backup servers failed");
                    }
                    if (!primaryFetchRegistryResult && !backupFetchRegistryResult && clientConfig.shouldEnforceFetchRegistryAtInit()) {
                        throw new IllegalStateException("Fetch registry error at startup. Initial fetch failed.");
                    }
                } catch (Throwable th) {
                    logger.error("Fetch registry error at startup: {}", th.getMessage());
                    throw new IllegalStateException(th);
                }
            }
    
        private boolean fetchRegistry(boolean forceFullRegistryFetch) {
            Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
    
            try {
                // Applications是所有的服务,Application中包含了他自己的所有的InstanceInfo,就是一个服务包含了自己的所有的服务实例
                Applications applications = getApplications();
    
                if (clientConfig.shouldDisableDelta()
                        || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                        || forceFullRegistryFetch  //这个是false
                        || (applications == null)  //因为之前抓取过一次了。
                        || (applications.getRegisteredApplications().size() == 0)
                        || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
                {
                    logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
                    logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
                    logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
                    logger.info("Application is null : {}", (applications == null));
                    logger.info("Registered Applications size is zero : {}",
                            (applications.getRegisteredApplications().size() == 0));
                    logger.info("Application version is -1: {}", (applications.getVersion() == -1));
                    getAndStoreFullRegistry();
                } else {
                    //增量抓取
                    getAndUpdateDelta(applications);
                }
                applications.setAppsHashCode(applications.getReconcileHashCode());
                logTotalInstances();
            } catch (Throwable e) {
                logger.info(PREFIX + "{} - was unable to refresh its cache! This periodic background refresh will be retried in {} seconds. status = {} stacktrace = {}",
                        appPathIdentifier, clientConfig.getRegistryFetchIntervalSeconds(), e.getMessage(), ExceptionUtils.getStackTrace(e));
                return false;
            } finally {
                if (tracer != null) {
                    tracer.stop();
                }
            }
    
            // Notify about cache refresh before updating the instance remote status
            onCacheRefreshed();
    
            // Update remote status based on refreshed data held in the cache
            updateInstanceRemoteStatus();
    
            // registry was fetched successfully, so return true
            return true;
        }
    

    下面是获取增量的代码:

        private void getAndStoreFullRegistry() throws Throwable {
            long currentUpdateGeneration = fetchRegistryGeneration.get();
    
            logger.info("Getting all instance registry info from the eureka server");
    
            Applications apps = null;
            EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
                    ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
                    : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
            if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
                apps = httpResponse.getEntity();
            }
            logger.info("The response status is {}", httpResponse.getStatusCode());
    
            if (apps == null) {
                logger.error("The application is null for some reason. Not storing this information");
            } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
                localRegionApps.set(this.filterAndShuffle(apps));
                logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
            } else {
                logger.warn("Not updating applications as another thread is updating it already");
            }
        }
    

    调用jersey client,发送http请求(http://localhost:8080/v2/apps),GET请求,调用eureka server的getApplications restful接口,获取全量注册表,缓存在自己的本地

        private void getAndStoreFullRegistry() throws Throwable {
            long currentUpdateGeneration = fetchRegistryGeneration.get();
    
            logger.info("Getting all instance registry info from the eureka server");
          // 全量获取注册信息
            Applications apps = null;
            EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
                    ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
                    : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
            if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
                apps = httpResponse.getEntity();
            }
            logger.info("The response status is {}", httpResponse.getStatusCode());
            
           // 设置到本地缓存
            if (apps == null) {
                logger.error("The application is null for some reason. Not storing this information");
            } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
                localRegionApps.set(this.filterAndShuffle(apps));
                logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
            } else {
                logger.warn("Not updating applications as another thread is updating it already");
            }
        }
    

    AbstractJerseyEurekaHttpClient#getApplicationsInternal

    // AbstractJerseyEurekaHttpClient.java
    @Override
    public EurekaHttpResponse<Applications> getApplications(String... regions) {
       return getApplicationsInternal("apps/", regions);
    }
    
    private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) {
       ClientResponse response = null;
       String regionsParamValue = null;
       try {
           WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath);
           if (regions != null && regions.length > 0) {
               regionsParamValue = StringUtil.join(regions);
               webResource = webResource.queryParam("regions", regionsParamValue);
           }
           Builder requestBuilder = webResource.getRequestBuilder();
           addExtraHeaders(requestBuilder);
           response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class); // JSON
    
           Applications applications = null;
           if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) {
               applications = response.getEntity(Applications.class);
           }
           return anEurekaHttpResponse(response.getStatus(), Applications.class)
                   .headers(headersOf(response))
                   .entity(applications)
                   .build();
       } finally {
           if (logger.isDebugEnabled()) {
               logger.debug("Jersey HTTP GET {}/{}?{}; statusCode={}",
                       serviceUrl, urlPath,
                       regionsParamValue == null ? "" : "regions=" + regionsParamValue,
                       response == null ? "N/A" : response.getStatus()
               );
           }
           if (response != null) {
               response.close();
           }
       }
    }
    

    com.netflix.eureka.resources.ApplicationsResource,处理所有应用的请求操作的 Resource ( Controller )。接收全量获取请求,映射 ApplicationsResource#getContainers() 方法,实现代码如下:

     @GET
        public Response getContainers(@PathParam("version") String version,
                                      @HeaderParam(HEADER_ACCEPT) String acceptHeader,
                                      @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
                                      @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
                                      @Context UriInfo uriInfo,
                                      @Nullable @QueryParam("regions") String regionsStr) {
    
            boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
            String[] regions = null;
            if (!isRemoteRegionRequested) {
                EurekaMonitors.GET_ALL.increment();
            } else {
                regions = regionsStr.toLowerCase().split(",");
                Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
                EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
            }
    
            // Check if the server allows the access to the registry. The server can
            // restrict access if it is not
            // ready to serve traffic depending on various reasons.
            if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
                return Response.status(Status.FORBIDDEN).build();
            }
            CurrentRequestVersion.set(Version.toEnum(version));
            KeyType keyType = Key.KeyType.JSON;
            String returnMediaType = MediaType.APPLICATION_JSON;
            if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
                keyType = Key.KeyType.XML;
                returnMediaType = MediaType.APPLICATION_XML;
            }
    
            Key cacheKey = new Key(Key.EntityType.Application,
                    ResponseCacheImpl.ALL_APPS,
                    keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
            );
    
            Response response;
            if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
                response = Response.ok(responseCache.getGZIP(cacheKey))
                        .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
                        .header(HEADER_CONTENT_TYPE, returnMediaType)
                        .build();
            } else {
                response = Response.ok(responseCache.get(cacheKey))
                        .build();
            }
            CurrentRequestVersion.remove();
            return response;
        }
    

    注意:核心的代码在这里 response = Response.ok(responseCache.get(cacheKey)).build();,看上去的话,是从缓存中获取的,那就点进去看看

    image-20211009111936347

    看他有几个变量,是让一些缓存的,然后在有参构造中进行了初始化,那就找一下是在哪里初始化的。是在上下文服务初始化的时候,registry.init(peerEurekaNodes);

        @Override
        public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
            this.numberOfReplicationsLastMin.start();
            this.peerEurekaNodes = peerEurekaNodes;
            initializedResponseCache();
            //初始化的时候,会启动一个定时调度任务,15分钟
            scheduleRenewalThresholdUpdateTask();
            initRemoteRegionRegistry();
    
            try {
                Monitors.registerObject(this);
            } catch (Throwable e) {
                logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
            }
        }
    
        @Override
        public synchronized void initializedResponseCache() {
            if (responseCache == null) {
                responseCache = new ResponseCacheImpl(serverConfig, serverCodecs, this);
            }
        }
    

    也就是说全量拉取注册表信息的请求,最后会走到缓存中,至于这几个缓存怎么进行处理的,后面在详细写一下。

        public Applications getApplications() {
            boolean disableTransparentFallback = serverConfig.disableTransparentFallbackToOtherRegion();
            if (disableTransparentFallback) {
                return getApplicationsFromLocalRegionOnly();
            } else {
                return getApplicationsFromAllRemoteRegions();  // Behavior of falling back to remote region can be disabled.
            }
        }
    
    

    增量拉取

    定时任务,每隔30秒来一次,如果本地已经有缓存了,再次抓取的时候就会走增量抓取

      private void getAndUpdateDelta(Applications applications) throws Throwable {
            long currentUpdateGeneration = fetchRegistryGeneration.get();
    
            Applications delta = null;
            EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
            if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
                delta = httpResponse.getEntity();
            }
    
            if (delta == null) {
                logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                        + "Hence got the full registry.");
                //如果是null的话,就拿全量的注册表
                getAndStoreFullRegistry();
            } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
                logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
                String reconcileHashCode = "";
                if (fetchRegistryUpdateLock.tryLock()) {
                    try {
                        //更新增量的注册表
                        updateDelta(delta);
                        // 计算本地的应用集合一致性哈希码
                        reconcileHashCode = getReconcileHashCode(applications);
                    } finally {
                        fetchRegistryUpdateLock.unlock();
                    }
                } else {
                    logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
                }
                // There is a diff in number of instances for some reason
                if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
                    reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
                }
            } else {
                logger.warn("Not updating application delta as another thread is updating it already");
                logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
            }
        }
    

    这块会走EurekaHttpClient的getDelta()方法和接口,http://localhost:8080/v2/apps/delta,get请求

    // AbstractJerseyEurekaHttpClient.java
    @Override
    public EurekaHttpResponse<Applications> getDelta(String... regions) {
       return getApplicationsInternal("apps/delta", regions);
    }
    

    会来到ApplicationsResource#getContainerDifferential,在eureka server端,会走多级缓存的机制,缓存的Key,ALL_APPS_DELTA,不同的就是在那个readWriteCacheMap的从注册表获取数据那里是不一样的,registry.getApplicationDeltasFromMultipleRegions()获取增量的注册表,就是从上一次拉取注册表之后,有变化的注册表

        @Path("delta")
        @GET
        public Response getContainerDifferential(
                @PathParam("version") String version,
                @HeaderParam(HEADER_ACCEPT) String acceptHeader,
                @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
                @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
                @Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) {
    
            boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
    
            // If the delta flag is disabled in discovery or if the lease expiration
            // has been disabled, redirect clients to get all instances
            if ((serverConfig.shouldDisableDelta()) || (!registry.shouldAllowAccess(isRemoteRegionRequested))) {
                return Response.status(Status.FORBIDDEN).build();
            }
    
            String[] regions = null;
            if (!isRemoteRegionRequested) {
                EurekaMonitors.GET_ALL_DELTA.increment();
            } else {
                regions = regionsStr.toLowerCase().split(",");
                Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
                EurekaMonitors.GET_ALL_DELTA_WITH_REMOTE_REGIONS.increment();
            }
    
            CurrentRequestVersion.set(Version.toEnum(version));
            KeyType keyType = Key.KeyType.JSON;
            String returnMediaType = MediaType.APPLICATION_JSON;
            if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
                keyType = Key.KeyType.XML;
                returnMediaType = MediaType.APPLICATION_XML;
            }
    
            Key cacheKey = new Key(Key.EntityType.Application,
                    ResponseCacheImpl.ALL_APPS_DELTA,
                    keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
            );
    
            final Response response;
    
            if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
                 response = Response.ok(responseCache.getGZIP(cacheKey))
                        .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
                        .header(HEADER_CONTENT_TYPE, returnMediaType)
                        .build();
            } else {
                response = Response.ok(responseCache.get(cacheKey)).build();
            }
    
            CurrentRequestVersion.remove();
            return response;
        }
    
        private Value generatePayload(Key key) {
            Stopwatch tracer = null;
            try {
                String payload;
                switch (key.getEntityType()) {
                    case Application:
                        boolean isRemoteRegionRequested = key.hasRegions();
    
                        if (ALL_APPS.equals(key.getName())) {
                            if (isRemoteRegionRequested) {
                                tracer = serializeAllAppsWithRemoteRegionTimer.start();
                                payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
                            } else {
                                tracer = serializeAllAppsTimer.start();
                                payload = getPayLoad(key, registry.getApplications());
                            }
                        } else if (ALL_APPS_DELTA.equals(key.getName())) {
                            if (isRemoteRegionRequested) {
                                tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
                                versionDeltaWithRegions.incrementAndGet();
                                versionDeltaWithRegionsLegacy.incrementAndGet();
                                payload = getPayLoad(key,
                                        registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
                            } else {
                                tracer = serializeDeltaAppsTimer.start();
                                versionDelta.incrementAndGet();
                                versionDeltaLegacy.incrementAndGet();
                                payload = getPayLoad(key, registry.getApplicationDeltas());
                            }
                        } else {
                            tracer = serializeOneApptimer.start();
                            payload = getPayLoad(key, registry.getApplication(key.getName()));
                        }
                        break;
                    case VIP:
                    case SVIP:
                        tracer = serializeViptimer.start();
                        payload = getPayLoad(key, getApplicationsForVip(key, registry));
                        break;
                    default:
                        logger.error("Unidentified entity type: {} found in the cache key.", key.getEntityType());
                        payload = "";
                        break;
                }
                return new Value(payload);
            } finally {
                if (tracer != null) {
                    tracer.stop();
                }
            }
        }
    

    recentlyChangedQueue,代表的含义是,最近有变化的服务实例,比如说,新注册、下线的,或者是别的什么什么,在Registry构造的时候,有一个定时调度的任务,默认是30秒一次,看一下,服务实例的变更记录,是否在队列里停留了超过180s(3分钟),如果超过了3分钟,就会从队列里将这个服务实例变更记录给移除掉。也就是说,这个queue,就保留最近3分钟的服务实例变更记录。delta,增量。

        public Applications getApplicationDeltasFromMultipleRegions(String[] remoteRegions) {
            if (null == remoteRegions) {
                remoteRegions = allKnownRemoteRegions; // null means all remote regions.
            }
    
            boolean includeRemoteRegion = remoteRegions.length != 0;
    
            if (includeRemoteRegion) {
                GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS_DELTA.increment();
            } else {
                GET_ALL_CACHE_MISS_DELTA.increment();
            }
    
            Applications apps = new Applications();
            apps.setVersion(responseCache.getVersionDeltaWithRegions().get());
            Map<String, Application> applicationInstancesMap = new HashMap<String, Application>();
            write.lock();
            try {
                //获取最近3分钟变化的注册表
                Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator();
                logger.debug("The number of elements in the delta queue is :{}", this.recentlyChangedQueue.size());
                while (iter.hasNext()) {
                    Lease<InstanceInfo> lease = iter.next().getLeaseInfo();
                    InstanceInfo instanceInfo = lease.getHolder();
                    logger.debug("The instance id {} is found with status {} and actiontype {}",
                            instanceInfo.getId(), instanceInfo.getStatus().name(), instanceInfo.getActionType().name());
                    Application app = applicationInstancesMap.get(instanceInfo.getAppName());
                    if (app == null) {
                        app = new Application(instanceInfo.getAppName());
                        applicationInstancesMap.put(instanceInfo.getAppName(), app);
                        apps.addApplication(app);
                    }
                    app.addInstance(new InstanceInfo(decorateInstanceInfo(lease)));
                }
    
                if (includeRemoteRegion) {
                    for (String remoteRegion : remoteRegions) {
                        RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);
                        if (null != remoteRegistry) {
                            Applications remoteAppsDelta = remoteRegistry.getApplicationDeltas();
                            if (null != remoteAppsDelta) {
                                for (Application application : remoteAppsDelta.getRegisteredApplications()) {
                                    if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {
                                        Application appInstanceTillNow =
                                                apps.getRegisteredApplications(application.getName());
                                        if (appInstanceTillNow == null) {
                                            appInstanceTillNow = new Application(application.getName());
                                            apps.addApplication(appInstanceTillNow);
                                        }
                                        for (InstanceInfo instanceInfo : application.getInstances()) {
                                            appInstanceTillNow.addInstance(new InstanceInfo(instanceInfo));
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
    
                Applications allApps = getApplicationsFromMultipleRegions(remoteRegions);
                apps.setAppsHashCode(allApps.getReconcileHashCode());
                return apps;
            } finally {
                write.unlock();
            }
        }
    

    抓取到的delta的注册表,就会跟本地的注册表进行合并,完成服务实例的增删改

        /**
         * Updates the delta information fetches from the eureka server into the
         * local cache.
         *
         * @param delta
         *            the delta information received from eureka server in the last
         *            poll cycle.
         */
        private void updateDelta(Applications delta) {
            int deltaCount = 0;
            for (Application app : delta.getRegisteredApplications()) {
                for (InstanceInfo instance : app.getInstances()) {
                    Applications applications = getApplications();
                    String instanceRegion = instanceRegionChecker.getInstanceRegion(instance);
                    if (!instanceRegionChecker.isLocalRegion(instanceRegion)) {
                        Applications remoteApps = remoteRegionVsApps.get(instanceRegion);
                        if (null == remoteApps) {
                            remoteApps = new Applications();
                            remoteRegionVsApps.put(instanceRegion, remoteApps);
                        }
                        applications = remoteApps;
                    }
    
                    ++deltaCount;
                    if (ActionType.ADDED.equals(instance.getActionType())) {
                        Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                        if (existingApp == null) {
                            applications.addApplication(app);
                        }
                        logger.debug("Added instance {} to the existing apps in region {}", instance.getId(), instanceRegion);
                        applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
                    } else if (ActionType.MODIFIED.equals(instance.getActionType())) {
                        Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                        if (existingApp == null) {
                            applications.addApplication(app);
                        }
                        logger.debug("Modified instance {} to the existing apps ", instance.getId());
    
                        applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
    
                    } else if (ActionType.DELETED.equals(instance.getActionType())) {
                        Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                        if (existingApp != null) {
                            logger.debug("Deleted instance {} to the existing apps ", instance.getId());
                            existingApp.removeInstance(instance);
                            /*
                             * We find all instance list from application(The status of instance status is not only the status is UP but also other status)
                             * if instance list is empty, we remove the application.
                             */
                            if (existingApp.getInstancesAsIsFromEureka().isEmpty()) {
                                applications.removeApplication(existingApp);
                            }
                        }
                    }
                }
            }
            logger.debug("The total number of instances fetched by the delta processor : {}", deltaCount);
    
            getApplications().setVersion(delta.getVersion());
            getApplications().shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
    
            for (Applications applications : remoteRegionVsApps.values()) {
                applications.setVersion(delta.getVersion());
                applications.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
            }
        }
    

    更新完合并完以后的注册表,会计算一个hash值;delta,带了一个eureka server端的全量注册表的hash值;此时会将eureka client端的合并完的注册表的hash值,跟eureka server端的全量注册表的hash值进行一个比对;如果说不一样的话,说明本地注册表跟server端不一样了,此时就会重新从eureka server拉取全量的注册表到本地来更新到缓存里去

        private String getReconcileHashCode(Applications applications) {
            TreeMap<String, AtomicInteger> instanceCountMap = new TreeMap<String, AtomicInteger>();
            if (isFetchingRemoteRegionRegistries()) {
                for (Applications remoteApp : remoteRegionVsApps.values()) {
                    remoteApp.populateInstanceCountMap(instanceCountMap);
                }
            }
            applications.populateInstanceCountMap(instanceCountMap);
            return Applications.getReconcileHashCode(instanceCountMap);
        }
    
        public static String getReconcileHashCode(Map<String, AtomicInteger> instanceCountMap) {
            StringBuilder reconcileHashCode = new StringBuilder(75);
            for (Map.Entry<String, AtomicInteger> mapEntry : instanceCountMap.entrySet()) {
                reconcileHashCode.append(mapEntry.getKey()).append(STATUS_DELIMITER).append(mapEntry.getValue().get())
                        .append(STATUS_DELIMITER);
            }
            return reconcileHashCode.toString();
        }
    

    下面是我把部分核心代码拿出来打印的一个demo

    public class Test2 {
    
        public static void main(String[] args) {
            TreeMap<String, AtomicInteger> instanceCountMap = new TreeMap<String, AtomicInteger>();
            List<String> list = new ArrayList<>();
            list.add("UP");
            list.add("UP");
            list.add("DOWN");
            for (String info2 : list) {
                AtomicInteger instanceCount = instanceCountMap.computeIfAbsent(info2,
                        k -> new AtomicInteger(0));
                instanceCount.incrementAndGet();
            }
            String reconcileHashCode = getReconcileHashCode(instanceCountMap);
            //DOWN_1_UP_2_
            System.out.println(reconcileHashCode);
        }
    
        public static String getReconcileHashCode(Map<String, AtomicInteger> instanceCountMap) {
            StringBuilder reconcileHashCode = new StringBuilder(75);
            for (Map.Entry<String, AtomicInteger> mapEntry : instanceCountMap.entrySet()) {
                reconcileHashCode.append(mapEntry.getKey()).append("_").append(mapEntry.getValue().get())
                        .append("_");
            }
            return reconcileHashCode.toString();
        }
    }
    
  • 相关阅读:
    Dubbo架构设计及原理详解
    Zookeeper+Dubbo+SpringMVC环境搭建
    Java 延迟队列使用
    深入理解Spring Redis的使用 (九)、通过Redis 实现 分布式锁 的 BUG,以及和数据库加锁的性能测试
    深入理解Spring Redis的使用 (八)、Spring Redis实现 注解 自动缓存
    深入理解Spring Redis的使用 (七)、Spring Redis 使用 jackson序列化 以及 BaseDao代码
    深入理解Spring Redis的使用 (六)、用Spring Aop 实现注解Dao层的自动Spring Redis缓存
    深入理解Spring Redis的使用 (五)、常见问题汇总
    深入理解Spring Redis的使用 (四)、RedisTemplate执行Redis脚本
    深入理解Spring Redis的使用 (三)、使用RedisTemplate的操作类访问Redis
  • 原文地址:https://www.cnblogs.com/dalianpai/p/15385477.html
Copyright © 2011-2022 走看看