zoukankan      html  css  js  c++  java
  • Spring Cloud Eureka(七):DiscoveryClient 源码分析

    1、本节概要

    上一节文章主要介绍了Eureka Client 的服务注册的流程,没有对服务治理进行介绍,本文目的就是从源码角度来学习服务实例的治理机制,主要包括以下内容:

    • 服务注册(register)
    • 服务续约(renew)
    • 服务下线(unregister)
    • 服务拉取(fetchRegistry)
    • 缓存刷新(refreshRegistry)

    eureka client 与 eureka server 的交互式通过REST API 来完成的,那么这时使用的HttpClient工具在,Netflix eureka和 spring cloud eureka 中是有区别的,前者使用的是 JerseyReplicationClient,后者使用的是 RestTemplateEurekaHttpClient

    2、服务注册(register)

    服务注册的方式是通过rest api 进行注册的,注册成功之后返回的正常状态码是 204

    boolean register() throws Throwable {
            logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
            EurekaHttpResponse<Void> httpResponse;
            try {
                httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
            } catch (Exception e) {
                logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
                throw e;
            }
            if (logger.isInfoEnabled()) {
                logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
            }
            return httpResponse.getStatusCode() == 204;
        }
    
    

    3、服务续约(renew)

    服务先向server端发送一个心跳,如果返回状态是 200,则表示续约成功;如果返回状态码是404,则向服务端重新注册自己。
    续约服务是由一个守护线程每隔 30秒向服务端发送心跳来完成的

     /**
         * 续约方法
         */
        boolean renew() {
            EurekaHttpResponse<InstanceInfo> httpResponse;
            try {
                httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
                logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
                if (httpResponse.getStatusCode() == 404) {
                    REREGISTER_COUNTER.increment();
                    logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
                    long timestamp = instanceInfo.setIsDirtyWithTime();
                    boolean success = register();
                    if (success) {
                        instanceInfo.unsetIsDirty(timestamp);
                    }
                    return success;
                }
                return httpResponse.getStatusCode() == 200;
            } catch (Throwable e) {
                logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
                return false;
            }
        }
    
     // 心跳定时器
                scheduler.schedule(
                        new TimedSupervisorTask(
                                "heartbeat",
                                scheduler,
                                heartbeatExecutor,
                                renewalIntervalInSecs,
                                TimeUnit.SECONDS,
                                expBackOffBound,
                                new HeartbeatThread()
                        ),
                        renewalIntervalInSecs, TimeUnit.SECONDS);
                        
        private class HeartbeatThread implements Runnable {
    
            public void run() {
                if (renew()) {
                    lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
                }
            }
        }
    
    

    4、服务下线(unregister)

    通过rest api 向server端发送取消请求,那么在server端将会注销掉该服务,前提是请求注销的服务在注册中心已经存在了。

    void unregister() {
            // It can be null if shouldRegisterWithEureka == false
            if(eurekaTransport != null && eurekaTransport.registrationClient != null) {
                try {
                    logger.info("Unregistering ...");
                    EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
                    logger.info(PREFIX + "{} - deregister  status: {}", appPathIdentifier, httpResponse.getStatusCode());
                } catch (Exception e) {
                    logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);
                }
            }
        }
    

    5、服务获取(fetchRegistry)

    如果增量拉取被禁用或是第一次拉取,则全量拉取server端已经注册的服务实例信息,否则只拉取增量服务实例数据

     private boolean fetchRegistry(boolean forceFullRegistryFetch) {
            Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
    
            try {
                // If the delta is disabled or if it is the first time, get all
                // applications
                Applications applications = getApplications();
    
                if (clientConfig.shouldDisableDelta()
                        || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                        || forceFullRegistryFetch
                        || (applications == null)
                        || (applications.getRegisteredApplications().size() == 0)
                        || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
                {
                    logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
                    logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
                    logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
                    logger.info("Application is null : {}", (applications == null));
                    logger.info("Registered Applications size is zero : {}",
                            (applications.getRegisteredApplications().size() == 0));
                    logger.info("Application version is -1: {}", (applications.getVersion() == -1));
                    //全量拉取服务实例数据
                    getAndStoreFullRegistry();
                } else {
                    //增量拉取服务实例
                    getAndUpdateDelta(applications);
                }
                applications.setAppsHashCode(applications.getReconcileHashCode());
                logTotalInstances();
            } catch (Throwable e) {
                logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
                return false;
            } finally {
                if (tracer != null) {
                    tracer.stop();
                }
            }
    
            // 刷新本地缓存
            onCacheRefreshed();
    
            // 基于缓存中的实例数据更新远程实例状态
            updateInstanceRemoteStatus();
    
            // 注册表拉取成功后返回true
            return true;
        }
    

    6、缓存刷新(refreshRegistry)

    系统默认是每隔30秒刷新本地存储的注册表

    
    private void initScheduledTasks() {
            if (clientConfig.shouldFetchRegistry()) {
                // registry cache refresh timer
                int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
                int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
                scheduler.schedule(
                        new TimedSupervisorTask(
                                "cacheRefresh",
                                scheduler,
                                cacheRefreshExecutor,
                                registryFetchIntervalSeconds,
                                TimeUnit.SECONDS,
                                expBackOffBound,
                                new CacheRefreshThread()
                        ),
                        registryFetchIntervalSeconds, TimeUnit.SECONDS);
            }
            ...................................
    }
    
    class CacheRefreshThread implements Runnable {
            public void run() {
                refreshRegistry();
            }
        }
    
        @VisibleForTesting
        void refreshRegistry() {
            try {
                boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();
    
                boolean remoteRegionsModified = false;
                // This makes sure that a dynamic change to remote regions to fetch is honored.
                String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
                if (null != latestRemoteRegions) {
                    String currentRemoteRegions = remoteRegionsToFetch.get();
                    if (!latestRemoteRegions.equals(currentRemoteRegions)) {
                        // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
                        synchronized (instanceRegionChecker.getAzToRegionMapper()) {
                            if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
                                String[] remoteRegions = latestRemoteRegions.split(",");
                                remoteRegionsRef.set(remoteRegions);
                                instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
                                remoteRegionsModified = true;
                            } else {
                                logger.info("Remote regions to fetch modified concurrently," +
                                        " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
                            }
                        }
                    } else {
                        // Just refresh mapping to reflect any DNS/Property change
                        instanceRegionChecker.getAzToRegionMapper().refreshMapping();
                    }
                }
    
                boolean success = fetchRegistry(remoteRegionsModified);
                if (success) {
                    registrySize = localRegionApps.get().size();
                    lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
                }
    
                if (logger.isDebugEnabled()) {
                    StringBuilder allAppsHashCodes = new StringBuilder();
                    allAppsHashCodes.append("Local region apps hashcode: ");
                    allAppsHashCodes.append(localRegionApps.get().getAppsHashCode());
                    allAppsHashCodes.append(", is fetching remote regions? ");
                    allAppsHashCodes.append(isFetchingRemoteRegionRegistries);
                    for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) {
                        allAppsHashCodes.append(", Remote region: ");
                        allAppsHashCodes.append(entry.getKey());
                        allAppsHashCodes.append(" , apps hashcode: ");
                        allAppsHashCodes.append(entry.getValue().getAppsHashCode());
                    }
                    logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",
                            allAppsHashCodes);
                }
            } catch (Throwable e) {
                logger.error("Cannot fetch registry from server", e);
            }
        }
    

    文末总结:

    1、Eureka client从注册中心拉取服务列表,然后自身会做缓存

    2、作为服务消费者,就是从这些缓存信息中获取的服务提供者的信息

    3、增量更新的服务以30秒为周期循环更新

    4、增量更新数据在服务端保存时间为3分钟,因此Eureka client取得的数据虽然是增量更新,仍然可能和30秒前取的数据一样,所以Eureka client要自己来处理重复信息

    5、Eureka client的增量更新,其实获取的是Eureka server最近三分钟内的变更,如果Eureka client有超过三分钟没有做增量更新的话(例如网络问题),这就造成了Eureka server和Eureka client之间的数据不一致。正常情况下,Eureka client多次增量更新后,最终的服务列表数据应该Eureka server保持一致,但如果期间发生异常,可能导致和Eureka server的数据不一致,为了暴露这个问题,Eureka server每次返回的增量更新数据中,会带有一致性哈希码,Eureka client用本地服务列表数据算出的一致性哈希码应该和Eureka server返回的一致,若不一致就证明增量更新出了问题导致Eureka client和Eureka server上的服务列表信息不一致了,此时需要全量更新

  • 相关阅读:
    Nios学习笔记3——流水灯实验
    Nios学习笔记2——流水灯实验
    Nios学习笔记1——流水灯实验
    转:摄像头camera 7660/7670/7225/9650以及程序流程(一)
    fpga 扇入扇出
    门控时钟的使用
    门控时钟与多扇出问题解决方案
    为所欲为——教你什么才是真正的任意分频
    SDRAM时序--读高手进阶,终极内存技术指南
    FPGA你必须知道的那些事儿
  • 原文地址:https://www.cnblogs.com/liukaifeng/p/10052587.html
Copyright © 2011-2022 走看看