zoukankan      html  css  js  c++  java
  • Eureka获取服务列表源码解析

    在之前的文章:EurekaClient自动装配及启动流程解析中,我们提到了在类DiscoveryClient的构造方法中存在一个刷新线程和从服务端拉取注册信息的操作

    这两个就是eureka获取服务列表的两种情况:

    1. 全量获取:Eureka启动时拉取全部服务
    2. 增量获取:一个定时任务定时获取
    全量获取
    if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
             fetchRegistryFromBackup();
         }
    

    全量获取使用的fetchRegistry方法,如果使用此方法没有成功获取到的话则会执行fetchRegistryFromBackup方法使用备份方式拉取,备份拉取使用的是BackupRegistry接口的实现类,只不过eureka默认没有实现。

    继续看拉取流程

    private boolean fetchRegistry(boolean forceFullRegistryFetch) {
            Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
    
            try {
                Applications applications = getApplications();
    
                if (clientConfig.shouldDisableDelta()//禁用部分获取
                        || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                        || forceFullRegistryFetch//全部获取
                        || (applications == null)//本地没有任何实例
                        || (applications.getRegisteredApplications().size() == 0)
                        || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
                {
                    logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
                    logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
                    logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
                    logger.info("Application is null : {}", (applications == null));
                    logger.info("Registered Applications size is zero : {}",
                            (applications.getRegisteredApplications().size() == 0));
                    logger.info("Application version is -1: {}", (applications.getVersion() == -1));
                    getAndStoreFullRegistry();
                } else {
                    getAndUpdateDelta(applications);
                }
                applications.setAppsHashCode(applications.getReconcileHashCode());
                logTotalInstances();
            } catch (Throwable e) {
                logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
                return false;
            } finally {
                if (tracer != null) {
                    tracer.stop();
                }
            }
    
    1. 首先入参forceFullRegistryFetch代表的就是全量获取或者增量获取
    2. 获取本地缓存的这些实例
        private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>();
    
        public Applications getApplications() {
            return localRegionApps.get();
        }
    

    可以看到所有实例应该缓存在localRegionApps对象中
    3. 然后根据一些条件判断是否应该执行全量获取,也就是就算入参指定增量获取,但是不满足这些条件还是会进行全量获取
    4. 接着是打印当前的实例数量
    5. 最后是更新拉取到的实例的状态

    全量拉取处理
        private void getAndStoreFullRegistry() throws Throwable {
            long currentUpdateGeneration = fetchRegistryGeneration.get();
    
            logger.info("Getting all instance registry info from the eureka server");
    
            Applications apps = null;
            //发起获取
            EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
                    ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
                    : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
            if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
                apps = httpResponse.getEntity();
            }
            logger.info("The response status is {}", httpResponse.getStatusCode());
    
            if (apps == null) {
                logger.error("The application is null for some reason. Not storing this information");
            } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            //缓存结果
                localRegionApps.set(this.filterAndShuffle(apps));
                logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
            } else {
                logger.warn("Not updating applications as another thread is updating it already");
            }
        }
    

    其中调用的逻辑比较简单:

    public EurekaHttpResponse<Applications> getApplications(String... regions) {
       return getApplicationsInternal("apps/", regions);
    }
    
    private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) {
       ClientResponse response = null;
       String regionsParamValue = null;
       try {
           WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath);
           if (regions != null && regions.length > 0) {
               regionsParamValue = StringUtil.join(regions);
               webResource = webResource.queryParam("regions", regionsParamValue);
           }
           Builder requestBuilder = webResource.getRequestBuilder();
           addExtraHeaders(requestBuilder);
           response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class); // JSON
    
           Applications applications = null;
           if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) {
               applications = response.getEntity(Applications.class);
           }
           return anEurekaHttpResponse(response.getStatus(), Applications.class)
                   .headers(headersOf(response))
                   .entity(applications)
                   .build();
       } finally {
           if (logger.isDebugEnabled()) {
               logger.debug("Jersey HTTP GET {}/{}?{}; statusCode={}",
                       serviceUrl, urlPath,
                       regionsParamValue == null ? "" : "regions=" + regionsParamValue,
                       response == null ? "N/A" : response.getStatus()
               );
           }
           if (response != null) {
               response.close();
           }
       }
    }
    
    全量拉取服务端处理

    全量获取的服务端Controller在类ApplicationsResource

    @GET
        public Response getContainers(@PathParam("version") String version,
                                      @HeaderParam(HEADER_ACCEPT) String acceptHeader,@HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,@HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,@Context UriInfo uriInfo,@Nullable @QueryParam("regions") String regionsStr) {
    
            boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
            String[] regions = null;
            if (!isRemoteRegionRequested) {
                EurekaMonitors.GET_ALL.increment();
            } else {
                regions = regionsStr.toLowerCase().split(",");
                Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
                EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
            }
    
            // Check if the server allows the access to the registry. The server can
            // restrict access if it is not
            // ready to serve traffic depending on various reasons.
            if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
                return Response.status(Status.FORBIDDEN).build();
            }
            CurrentRequestVersion.set(Version.toEnum(version));
            KeyType keyType = Key.KeyType.JSON;
            String returnMediaType = MediaType.APPLICATION_JSON;
            if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
                keyType = Key.KeyType.XML;
                returnMediaType = MediaType.APPLICATION_XML;
            }
    
            Key cacheKey = new Key(Key.EntityType.Application,
                    ResponseCacheImpl.ALL_APPS,
                    keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
            );
    
            Response response;
            if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
                response = Response.ok(responseCache.getGZIP(cacheKey))
                        .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
                        .header(HEADER_CONTENT_TYPE, returnMediaType)
                        .build();
            } else {
                response = Response.ok(responseCache.get(cacheKey))
                        .build();
            }
            return response;
        }
    

    虽然这个Controller很长,但是与返回结果相关的也就这么几行

            Key cacheKey = new Key(Key.EntityType.Application,
                    ResponseCacheImpl.ALL_APPS,
                    keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
            );
    				Response response;
            if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
                response = Response.ok(responseCache.getGZIP(cacheKey))
                        .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
                        .header(HEADER_CONTENT_TYPE, returnMediaType)
                        .build();
            } else {
                response = Response.ok(responseCache.get(cacheKey))
                        .build();
            }
    

    这里有两个点,KeyResponseCacheImpl

    Key

    这个对象中包含了缓存键

        public Key(EntityType entityType, String entityName, KeyType type, Version v, EurekaAccept eurekaAccept, @Nullable String[] regions) {
            this.regions = regions;
            this.entityType = entityType;
            this.entityName = entityName;
            this.requestType = type;
            this.requestVersion = v;
            this.eurekaAccept = eurekaAccept;
            hashKey = this.entityType + this.entityName + (null != this.regions ? Arrays.toString(this.regions) : "")
                    + requestType.name() + requestVersion.name() + this.eurekaAccept.name();
        }
    

    这个hashKey最后的结果就是类似于这样的:ApplicationALL_APPSJSONV2full

    ResponseCacheImpl

    这个对象是响应缓存的实现
    当hashKey创造好之后,responseCache.getGZIP(cacheKey)就是读取缓存并压缩的方法

        public byte[] getGZIP(Key key) {
            Value payload = getValue(key, shouldUseReadOnlyResponseCache);
            if (payload == null) {
                return null;
            }
            return payload.getGzipped();
        }
    

    payload.getGzipped()是压缩的方法就不看了,看getValue

    Value getValue(final Key key, boolean useReadOnlyCache) {
            Value payload = null;
            try {
                if (useReadOnlyCache) {
                    final Value currentPayload = readOnlyCacheMap.get(key);
                    if (currentPayload != null) {
                        payload = currentPayload;
                    } else {
                        payload = readWriteCacheMap.get(key);
                        readOnlyCacheMap.put(key, payload);
                    }
                } else {
                    payload = readWriteCacheMap.get(key);
                }
            } catch (Throwable t) {
                logger.error("Cannot get value for key : {}", key, t);
            }
            return payload;
        }
    

    大致就是先从readOnlyCacheMap只读缓存中获取,如果不存在的话则从readWriteCacheMap读写缓存中获取

    缓存生成

    上面服务端处理请求时是直接从缓存中读取的,那么这个缓存又是在什么时候生成的呢?

    读写缓存

    缓存的生成在ResponseCacheImpl的构造方法中

    this.readWriteCacheMap =
                    CacheBuilder.newBuilder().initialCapacity(1000)
                            .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
                            .removalListener(new RemovalListener<Key, Value>() {
                                @Override
                                public void onRemoval(RemovalNotification<Key, Value> notification) {
                                    Key removedKey = notification.getKey();
                                    if (removedKey.hasRegions()) {
                                        Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
                                        regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
                                    }
                                }
                            })
                            .build(new CacheLoader<Key, Value>() {
                                @Override
                                public Value load(Key key) throws Exception {
                                    if (key.hasRegions()) {
                                        Key cloneWithNoRegions = key.cloneWithoutRegions();
                                        regionSpecificKeys.put(cloneWithNoRegions, key);
                                    }
                                    Value value = generatePayload(key);
                                    return value;
                                }
                            });
    
    

    可以看到读写缓存的容量是1000,而缓存的生成方法在generatePayload方法中

        private Value generatePayload(Key key) {
            Stopwatch tracer = null;
            try {
                String payload;
                switch (key.getEntityType()) {
                    case Application:
                        boolean isRemoteRegionRequested = key.hasRegions();
    
                        if (ALL_APPS.equals(key.getName())) {
                            if (isRemoteRegionRequested) {
                                tracer = serializeAllAppsWithRemoteRegionTimer.start();
                                payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
                            } else {
                                tracer = serializeAllAppsTimer.start();
                                payload = getPayLoad(key, registry.getApplications());
                            }
                        } else if (ALL_APPS_DELTA.equals(key.getName())) {
                            if (isRemoteRegionRequested) {
                                tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
                                versionDeltaWithRegions.incrementAndGet();
                                versionDeltaWithRegionsLegacy.incrementAndGet();
                                payload = getPayLoad(key,
                                        registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
                            } else {
                                tracer = serializeDeltaAppsTimer.start();
                                versionDelta.incrementAndGet();
                                versionDeltaLegacy.incrementAndGet();
                                payload = getPayLoad(key, registry.getApplicationDeltas());
                            }
                        } else {
                            tracer = serializeOneApptimer.start();
                            payload = getPayLoad(key, registry.getApplication(key.getName()));
                        }
                        break;
                    case VIP:
                    case SVIP:
                        tracer = serializeViptimer.start();
                        payload = getPayLoad(key, getApplicationsForVip(key, registry));
                        break;
                    default:
                        logger.error("Unidentified entity type: {} found in the cache key.", key.getEntityType());
                        payload = "";
                        break;
                }
                return new Value(payload);
            } finally {
                if (tracer != null) {
                    tracer.stop();
                }
            }
        }
    

    这个方法的重点在这一句上payload = getPayLoad(key, registry.getApplications());

    getApplications

    public Applications getApplications() {
            boolean disableTransparentFallback = serverConfig.disableTransparentFallbackToOtherRegion();
            if (disableTransparentFallback) {
                return getApplicationsFromLocalRegionOnly();
            } else {
                return getApplicationsFromAllRemoteRegions();  // Behavior of falling back to remote region can be disabled.
            }
        }
        
    

    这里会进入getApplicationsFromLocalRegionOnly方法

        public Applications getApplicationsFromLocalRegionOnly() {
            return getApplicationsFromMultipleRegions(EMPTY_STR_ARRAY);
        }
        public Applications getApplicationsFromMultipleRegions(String[] remoteRegions) {
    
            boolean includeRemoteRegion = null != remoteRegions && remoteRegions.length != 0;
    
            logger.debug("Fetching applications registry with remote regions: {}, Regions argument {}",
                    includeRemoteRegion, remoteRegions);
    
            if (includeRemoteRegion) {
                GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS.increment();
            } else {
                GET_ALL_CACHE_MISS.increment();
            }
            Applications apps = new Applications();
            apps.setVersion(1L);
            for (Entry<String, Map<String, Lease<InstanceInfo>>> entry : registry.entrySet()) {
                Application app = null;
    
                if (entry.getValue() != null) {
                    for (Entry<String, Lease<InstanceInfo>> stringLeaseEntry : entry.getValue().entrySet()) {
                        Lease<InstanceInfo> lease = stringLeaseEntry.getValue();
                        if (app == null) {
                            app = new Application(lease.getHolder().getAppName());
                        }
                        app.addInstance(decorateInstanceInfo(lease));
                    }
                }
                if (app != null) {
                    apps.addApplication(app);
                }
            }
            if (includeRemoteRegion) {
                for (String remoteRegion : remoteRegions) {
                    RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);
                    if (null != remoteRegistry) {
                        Applications remoteApps = remoteRegistry.getApplications();
                        for (Application application : remoteApps.getRegisteredApplications()) {
                            if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {
                                logger.info("Application {}  fetched from the remote region {}",
                                        application.getName(), remoteRegion);
    
                                Application appInstanceTillNow = apps.getRegisteredApplications(application.getName());
                                if (appInstanceTillNow == null) {
                                    appInstanceTillNow = new Application(application.getName());
                                    apps.addApplication(appInstanceTillNow);
                                }
                                for (InstanceInfo instanceInfo : application.getInstances()) {
                                    appInstanceTillNow.addInstance(instanceInfo);
                                }
                            } else {
                                logger.debug("Application {} not fetched from the remote region {} as there exists a "
                                                + "whitelist and this app is not in the whitelist.",
                                        application.getName(), remoteRegion);
                            }
                        }
                    } else {
                        logger.warn("No remote registry available for the remote region {}", remoteRegion);
                    }
                }
            }
            apps.setAppsHashCode(apps.getReconcileHashCode());
            return apps;
        }
    

    这里获取的时候分为3个部分:

    1. 第一个for循环中,根据当前服务端的租约信息获取所有的实例信息,每个实例信息使用Application对象封装,多个Application使用Applications对象封装
    2. 第二个for循环则是处理如果请求中要获取某个分区的情况
    3. 设置所有实例的hashCode,这个hashCode是用来在增量获取的时候区分返回结果的

    getPayLoad
    这里则仅仅只是一个编码

    private String getPayLoad(Key key, Applications apps) {
       // 获得编码器
       EncoderWrapper encoderWrapper = serverCodecs.getEncoder(key.getType(), key.getEurekaAccept());
       String result;
       try {
           // 编码
           result = encoderWrapper.encode(apps);
       } catch (Exception e) {
           logger.error("Failed to encode the payload for all apps", e);
           return "";
       }
       if(logger.isDebugEnabled()) {
           logger.debug("New application cache entry {} with apps hashcode {}", key.toStringCompact(), apps.getAppsHashCode());
       }
       return result;
    }
    
    只读缓存

    只读缓存是定时刷新的,同样也在ResponseCacheImpl的构造方法中

            if (shouldUseReadOnlyResponseCache) {
                timer.schedule(getCacheUpdateTask(),
                        new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
                                + responseCacheUpdateIntervalMs),
                        responseCacheUpdateIntervalMs);
            }
    

    这个刷新任务是这样的

        private TimerTask getCacheUpdateTask() {
            return new TimerTask() {
                @Override
                public void run() {
                    logger.debug("Updating the client cache from response cache");
                    for (Key key : readOnlyCacheMap.keySet()) {
                        if (logger.isDebugEnabled()) {
                            logger.debug("Updating the client cache from response cache for key : {} {} {} {}",
                                    key.getEntityType(), key.getName(), key.getVersion(), key.getType());
                        }
                        try {
                            CurrentRequestVersion.set(key.getVersion());
                            Value cacheValue = readWriteCacheMap.get(key);
                            Value currentCacheValue = readOnlyCacheMap.get(key);
                            if (cacheValue != currentCacheValue) {
                                readOnlyCacheMap.put(key, cacheValue);
                            }
                        } catch (Throwable th) {
                            logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
                        }
                    }
                }
            };
        }
    

    观察for循环里面的内容,发现只读缓存的内容都是基于读写缓存来的

    增量拉取

    增量拉取的线程调度和发送心跳的线程调度是在一个方法initScheduledTasks中执行的,代码如下:

          int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
                int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
                scheduler.schedule(
                        new TimedSupervisorTask(
                                "cacheRefresh",
                                scheduler,
                                cacheRefreshExecutor,
                                registryFetchIntervalSeconds,
                                TimeUnit.SECONDS,
                                expBackOffBound,
                                new CacheRefreshThread()
                        ),
                        registryFetchIntervalSeconds, TimeUnit.SECONDS);
    
    

    看一下线程CacheRefreshThread

     class CacheRefreshThread implements Runnable {
            public void run() {
                refreshRegistry();
            }
        }
       void refreshRegistry() {
            try {
                boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();
    
                boolean remoteRegionsModified = false;
               //省略了一部分无关代码
                //核心
                boolean success = fetchRegistry(remoteRegionsModified);
                if (success) {
                    registrySize = localRegionApps.get().size();
                    lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
                }
    
                if (logger.isDebugEnabled()) {
                    StringBuilder allAppsHashCodes = new StringBuilder();
                    allAppsHashCodes.append("Local region apps hashcode: ");
                    allAppsHashCodes.append(localRegionApps.get().getAppsHashCode());
                    allAppsHashCodes.append(", is fetching remote regions? ");
                    allAppsHashCodes.append(isFetchingRemoteRegionRegistries);
                    for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) {
                        allAppsHashCodes.append(", Remote region: ");
                        allAppsHashCodes.append(entry.getKey());
                        allAppsHashCodes.append(" , apps hashcode: ");
                        allAppsHashCodes.append(entry.getValue().getAppsHashCode());
                    }
                    logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",
                            allAppsHashCodes);
                }
            } catch (Throwable e) {
                logger.error("Cannot fetch registry from server", e);
            }        
        }
    

    核心在fetchRegistry方法,这个在上面已经说过了,只不过部分拉取获取调用的接口是getAndUpdateDelta而已

    private void getAndUpdateDelta(Applications applications) throws Throwable {
            long currentUpdateGeneration = fetchRegistryGeneration.get();
    
            Applications delta = null;
            EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
            if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
                delta = httpResponse.getEntity();
            }
    
            if (delta == null) {
                logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                        + "Hence got the full registry.");
                getAndStoreFullRegistry();
            } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
                logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
                String reconcileHashCode = "";
                if (fetchRegistryUpdateLock.tryLock()) {
                    try {
                        updateDelta(delta);
                        reconcileHashCode = getReconcileHashCode(applications);
                    } finally {
                        fetchRegistryUpdateLock.unlock();
                    }
                } else {
                    logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
                }
                // There is a diff in number of instances for some reason
                if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
                    reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
                }
            } else {
                logger.warn("Not updating application delta as another thread is updating it already");
                logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
            }
        }
    

    先看服务端的处理,然后再看如何处理结果吧

    服务端处理增量拉取
    @Path("delta")
        @GET
        public Response getContainerDifferential(
                @PathParam("version") String version,
                @HeaderParam(HEADER_ACCEPT) String acceptHeader,
                @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
                @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
                @Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) {
    
            boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
    
            // If the delta flag is disabled in discovery or if the lease expiration
            // has been disabled, redirect clients to get all instances
            if ((serverConfig.shouldDisableDelta()) || (!registry.shouldAllowAccess(isRemoteRegionRequested))) {
                return Response.status(Status.FORBIDDEN).build();
            }
    
            String[] regions = null;
            if (!isRemoteRegionRequested) {
                EurekaMonitors.GET_ALL_DELTA.increment();
            } else {
                regions = regionsStr.toLowerCase().split(",");
                Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
                EurekaMonitors.GET_ALL_DELTA_WITH_REMOTE_REGIONS.increment();
            }
    
            CurrentRequestVersion.set(Version.toEnum(version));
            KeyType keyType = Key.KeyType.JSON;
            String returnMediaType = MediaType.APPLICATION_JSON;
            if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
                keyType = Key.KeyType.XML;
                returnMediaType = MediaType.APPLICATION_XML;
            }
    
            Key cacheKey = new Key(Key.EntityType.Application,
                    ResponseCacheImpl.ALL_APPS_DELTA,
                    keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
            );
    
            if (acceptEncoding != null
                    && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
                return Response.ok(responseCache.getGZIP(cacheKey))
                        .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
                        .header(HEADER_CONTENT_TYPE, returnMediaType)
                        .build();
            } else {
                return Response.ok(responseCache.get(cacheKey))
                        .build();
            }
        }
    

    这里的处理逻辑跟全量获取大部分逻辑都是一样的,只有一些几点不同:

    1. hashKey是ApplicationALL_APPS_DELTAJSONV2full
    2. 获取实例列表的时候走的是下面的分支
    if (ALL_APPS.equals(key.getName())) {
                            if (isRemoteRegionRequested) {
                                tracer = serializeAllAppsWithRemoteRegionTimer.start();
                                payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
                            } else {
                                tracer = serializeAllAppsTimer.start();
                                payload = getPayLoad(key, registry.getApplications());
                            }
                        } else if (ALL_APPS_DELTA.equals(key.getName())) {
                            if (isRemoteRegionRequested) {
                                tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
                                versionDeltaWithRegions.incrementAndGet();
                                versionDeltaWithRegionsLegacy.incrementAndGet();
                                payload = getPayLoad(key,
                                        registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
                            } else {
                                tracer = serializeDeltaAppsTimer.start();
                                versionDelta.incrementAndGet();
                                versionDeltaLegacy.incrementAndGet();
                                payload = getPayLoad(key, registry.getApplicationDeltas());
                            }
                        }
    

    看看getApplicationDeltas方法吧

    public Applications getApplicationDeltas() {
            GET_ALL_CACHE_MISS_DELTA.increment();
            Applications apps = new Applications();
            apps.setVersion(responseCache.getVersionDelta().get());
            Map<String, Application> applicationInstancesMap = new HashMap<String, Application>();
            try {
                write.lock();
                Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator();
                logger.debug("The number of elements in the delta queue is : {}",
                        this.recentlyChangedQueue.size());
                while (iter.hasNext()) {
                    Lease<InstanceInfo> lease = iter.next().getLeaseInfo();
                    InstanceInfo instanceInfo = lease.getHolder();
                    logger.debug(
                            "The instance id {} is found with status {} and actiontype {}",
                            instanceInfo.getId(), instanceInfo.getStatus().name(), instanceInfo.getActionType().name());
                    Application app = applicationInstancesMap.get(instanceInfo
                            .getAppName());
                    if (app == null) {
                        app = new Application(instanceInfo.getAppName());
                        applicationInstancesMap.put(instanceInfo.getAppName(), app);
                        apps.addApplication(app);
                    }
                    app.addInstance(decorateInstanceInfo(lease));
                }
    
                boolean disableTransparentFallback = serverConfig.disableTransparentFallbackToOtherRegion();
    
                if (!disableTransparentFallback) {
                    Applications allAppsInLocalRegion = getApplications(false);
    
                    for (RemoteRegionRegistry remoteRegistry : this.regionNameVSRemoteRegistry.values()) {
                        Applications applications = remoteRegistry.getApplicationDeltas();
                        for (Application application : applications.getRegisteredApplications()) {
                            Application appInLocalRegistry =
                                    allAppsInLocalRegion.getRegisteredApplications(application.getName());
                            if (appInLocalRegistry == null) {
                                apps.addApplication(application);
                            }
                        }
                    }
                }
    
                Applications allApps = getApplications(!disableTransparentFallback);
                apps.setAppsHashCode(allApps.getReconcileHashCode());
                return apps;
            } finally {
                write.unlock();
            }
        }
    

    与全量获取不同的是这个最终的结果是从最近租约变更记录队列recentlyChangedQueue里来的,其他的流程则差不多

    处理增量拉取结果

    结果的处理代码

    if (delta == null) {
                logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                        + "Hence got the full registry.");
                getAndStoreFullRegistry();
            } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
                logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
                String reconcileHashCode = "";
                if (fetchRegistryUpdateLock.tryLock()) {
                    try {
                        updateDelta(delta);
                        reconcileHashCode = getReconcileHashCode(applications);
                    } finally {
                        fetchRegistryUpdateLock.unlock();
                    }
                } else {
                    logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
                }
                // There is a diff in number of instances for some reason
                if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
                    reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
                }
            }
    
    updateDelta
        private void updateDelta(Applications delta) {
            int deltaCount = 0;
            for (Application app : delta.getRegisteredApplications()) {
                for (InstanceInfo instance : app.getInstances()) {
                    Applications applications = getApplications();
                    String instanceRegion = instanceRegionChecker.getInstanceRegion(instance);
                    if (!instanceRegionChecker.isLocalRegion(instanceRegion)) {
                        Applications remoteApps = remoteRegionVsApps.get(instanceRegion);
                        if (null == remoteApps) {
                            remoteApps = new Applications();
                            remoteRegionVsApps.put(instanceRegion, remoteApps);
                        }
                        applications = remoteApps;
                    }
    
                    ++deltaCount;
                    if (ActionType.ADDED.equals(instance.getActionType())) {
                        Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                        if (existingApp == null) {
                            applications.addApplication(app);
                        }
                        logger.debug("Added instance {} to the existing apps in region {}", instance.getId(), instanceRegion);
                        applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
                    } else if (ActionType.MODIFIED.equals(instance.getActionType())) {
                        Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                        if (existingApp == null) {
                            applications.addApplication(app);
                        }
                        logger.debug("Modified instance {} to the existing apps ", instance.getId());
    
                        applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
    
                    } else if (ActionType.DELETED.equals(instance.getActionType())) {
                        Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                        if (existingApp == null) {
                            applications.addApplication(app);
                        }
                        logger.debug("Deleted instance {} to the existing apps ", instance.getId());
                        applications.getRegisteredApplications(instance.getAppName()).removeInstance(instance);
                    }
                }
            }
            logger.debug("The total number of instances fetched by the delta processor : {}", deltaCount);
    
            getApplications().setVersion(delta.getVersion());
            getApplications().shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
    
            for (Applications applications : remoteRegionVsApps.values()) {
                applications.setVersion(delta.getVersion());
                applications.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
            }
        }
    

    大致处理流程为:

    1. 获取本地缓存实例
    2. 如果不存在远程拉取到的实例的分区则在remoteRegionVsApps对象中新建分区的key
    3. 根据远程实例的状态(添加、修改、删除)分别进行本地实例状态的更新
    4. 实例的过滤

    原文地址

  • 相关阅读:
    【中文分词】条件随机场CRF
    【中文分词】最大熵马尔可夫模型MEMM
    【中文分词】二阶隐马尔可夫模型2-HMM
    【中文分词】隐马尔可夫模型HMM
    Elasticsearch的CRUD:REST与Java API
    d3的比例尺和坐标轴
    webpack DllPlugin的用法
    webpack单独启动目录方法
    d3的常用方法和数据类型
    d3中的enter,exit,update概念
  • 原文地址:https://www.cnblogs.com/zhixiang-org-cn/p/11730561.html
Copyright © 2011-2022 走看看