zoukankan      html  css  js  c++  java
  • spring cloud eureka


    代码版本:Dalston.SR4

    Lease

    public class Lease<T> {
    
        enum Action {
            Register, Cancel, Renew
        };
        
        public static final int DEFAULT_DURATION_IN_SECS = 90;
        
        private T holder;
        private long evictionTimestamp;
        private long registrationTimestamp;
        private long serviceUpTimestamp;
        
        private volatile long lastUpdateTimestamp;
        private long duration;
        
        public Lease(T r, int durationInSecs) {
            holder = r;
            registrationTimestamp = System.currentTimeMillis();
            lastUpdateTimestamp = registrationTimestamp;
            //durationInSecs为秒单位, 换算成毫秒
            duration = (durationInSecs * 1000);
        
        }
        
        // 客户端续约时,更新最后的更新时间 , 用当前系统加上过期的时间
        public void renew() {
            lastUpdateTimestamp = System.currentTimeMillis() + duration;
        
        }
    
       // 服务下线时,更新服务下线时间
        public void cancel() {
            if (evictionTimestamp <= 0) {
                evictionTimestamp = System.currentTimeMillis();
            }
        }
    
    
        public void serviceUp() {
            if (serviceUpTimestamp == 0) {
                serviceUpTimestamp = System.currentTimeMillis();
            }
        }
    
    
        public void setServiceUpTimestamp(long serviceUpTimestamp) {
            this.serviceUpTimestamp = serviceUpTimestamp;
        }
    
    
        public boolean isExpired() {
            return isExpired(0l);
        }
    
    
        public boolean isExpired(long additionalLeaseMs) {
            return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
        }
    }
    
    1. Eureka-Server最终处理注册信息的时候,都会转化为这个对象来处理。
    2. DEFAULT_DURATION_IN_SECS : 租约过期的时间常量,默认90秒,也就说90秒没有心跳过来,sever将会自动剔除该节点
    3. holder :这个租约是属于谁的, 目前占用这个属性的是instanceInfo,也就是客户端实例信息。
    4. evictionTimestamp :租约是啥时候过期的,当服务下线的时候,会过来更新这个时间戳
    5. registrationTimestamp :租约的注册时间
    6. serviceUpTimestamp :服务启动时间 ,当客户端在注册的时候,instanceInfo的status 为UP的时候,则更新这个时间戳
    7. lastUpdateTimestamp :最后更新时间,每次续约的时候,都会更新这个时间戳,在判断实例是否过期时,需要用到这个属性。
    8. duration:过期时间,毫秒单位

    Client

    @EnableDiscoveryClient

    入口为注解@EnableDiscoveryClient:

    @Target({ElementType.TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Inherited
    @Import({EnableDiscoveryClientImportSelector.class})
    public @interface EnableDiscoveryClient {
        boolean autoRegister() default true;
    }
    

    该注解通过import导入了EurekaDiscoveryClientConfiguration实例,该实例内部实例化了一个Marker类。
    接下来通过spring boot自动配置会实例EurekaClientAutoConfiguration类,该类就是我们要找的目标,但是该类有个启动条件@ConditionalOnBean({Marker.class})。
    现在我们知道了@EnableDiscoveryClient注解是如何开启eureka客户端的。

    EurekaClientAutoConfiguration:该类是eureka的核心类,是在netflix-eureka-client包下的。
    EurekaClientAutoConfiguration会配置一个DiscoveryClient实例到容器,这个就是我们想要的,该类封装了客户端所有功能。
    那么客户端有哪些功能呢?
    1、向server注册服务/取消服务
    2、向server续约服务
    3、向server查询服务列表

    返回顶部

    DicoveryClient初始化

    先看一下DiscoveryClient属性:

    @Singleton
    public class DiscoveryClient implements EurekaClient {
        private static final Logger logger = LoggerFactory.getLogger(com.netflix.discovery.DiscoveryClient.class);
        public static final String HTTP_X_DISCOVERY_ALLOW_REDIRECT = "X-Discovery-AllowRedirect";
        private static final String VALUE_DELIMITER = ",";
        private static final String COMMA_STRING = ",";
        /**
         * @deprecated
         */
        @Deprecated
        private static EurekaClientConfig staticClientConfig;
        private static final String PREFIX = "DiscoveryClient_";
        private final Counter RECONCILE_HASH_CODES_MISMATCH;
        private final Timer FETCH_REGISTRY_TIMER;
        private final Counter REREGISTER_COUNTER;
        private final ScheduledExecutorService scheduler;
        //用于心跳续约任务的线程池
        private final ThreadPoolExecutor heartbeatExecutor;
        //用于获取服务列表任务的线程池
        private final ThreadPoolExecutor cacheRefreshExecutor;
        private final Provider<HealthCheckHandler> healthCheckHandlerProvider;
        private final Provider<HealthCheckCallback> healthCheckCallbackProvider;
        private final AtomicReference<Applications> localRegionApps;
        private final Lock fetchRegistryUpdateLock;
        private final AtomicLong fetchRegistryGeneration;
        private final ApplicationInfoManager applicationInfoManager;
        private final InstanceInfo instanceInfo;
        private final AtomicReference<String> remoteRegionsToFetch;
        private final AtomicReference<String[]> remoteRegionsRef;
        private final InstanceRegionChecker instanceRegionChecker;
        private final EndpointUtils.ServiceUrlRandomizer urlRandomizer;
        private final Provider<BackupRegistry> backupRegistryProvider;
        //负责发起远程调用
        private final com.netflix.discovery.DiscoveryClient.EurekaTransport eurekaTransport;
        private volatile HealthCheckHandler healthCheckHandler;
        private volatile Map<String, Applications> remoteRegionVsApps;
        private volatile InstanceInfo.InstanceStatus lastRemoteInstanceStatus;
        private final CopyOnWriteArraySet<EurekaEventListener> eventListeners;
        private String appPathIdentifier;
        private ApplicationInfoManager.StatusChangeListener statusChangeListener;
        private InstanceInfoReplicator instanceInfoReplicator;
        private volatile int registrySize;
        private volatile long lastSuccessfulRegistryFetchTimestamp;
        private volatile long lastSuccessfulHeartbeatTimestamp;
        private final ThresholdLevelsMetric heartbeatStalenessMonitor;
        private final ThresholdLevelsMetric registryStalenessMonitor;
        private final AtomicBoolean isShutdown;
        protected final EurekaClientConfig clientConfig;
        protected final EurekaTransportConfig transportConfig;
        private final long initTimestampMs;
    }
    

    DiscoveryClient构造器:

        @Inject
        DiscoveryClient(ApplicationInfoManager applicationInfoManager,
                        EurekaClientConfig config,
                        AbstractDiscoveryClientOptionalArgs args,
                        Provider<BackupRegistry> backupRegistryProvider) {
    
    
            // 创建各种Executor 和 eurekaTransport、instanceRegionChecker
            try {
                // 执行定时任务的定时器,定时线程名为 DiscoveryClient-%d
                // 在定时器中用于定时执行TimedSupervisorTask监督任务,监督任务会强制超时 和 记录监控数据
                scheduler = Executors.newScheduledThreadPool(3,
                        new ThreadFactoryBuilder()
                                .setNameFormat("DiscoveryClient-%d")
                                .setDaemon(true)
                                .build());
    
                // 执行heartbeat心跳任务的执行器,默认最大线程数=2,线程名为:DiscoveryClient-HeartbeatExecutor-%d
                heartbeatExecutor = new ThreadPoolExecutor(
                        1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                        new SynchronousQueue<Runnable>(),
                        new ThreadFactoryBuilder()
                                .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                                .setDaemon(true)
                                .build()
                );  
    
                // 执行服务列表缓存刷新的执行器,默认最大线程数=2,线程名为:DiscoveryClient-CacheRefreshExecutor-%d
                cacheRefreshExecutor = new ThreadPoolExecutor(
                        1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                        new SynchronousQueue<Runnable>(),
                        new ThreadFactoryBuilder()
                                .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                                .setDaemon(true)
                                .build()
                );  
    
                eurekaTransport = new EurekaTransport();
                // 初始化eurekaTransport在服务注册,获取服务列表时的client
                scheduleServerEndpointTask(eurekaTransport, args);
    
                instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
            } catch (Throwable e) {
                throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
            }
    
            // 如果需要从eureka server获取服务列表,并且尝试fetchRegistry(false)失败,调用BackupRegistry
            if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
                fetchRegistryFromBackup();
            }
    
            // 【重点】初始化所有定时任务
            initScheduledTasks();
    
        }
    
        private void initScheduledTasks() {
            // 1、如果要从Eureka Server获取服务列表
            if (clientConfig.shouldFetchRegistry()) {
                // 从eureka服务器获取注册表信息的频率(默认30s)
                // 同时也是单次获取服务列表的超时时间
                int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
                // 如果缓存刷新超时,下一次执行的delay最大是registryFetchIntervalSeconds的几倍(默认10),
                // 默认每次执行是上一次的2倍
                int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
    
                /**
                 * 【#### 执行CacheRefreshThread,服务列表缓存刷新任务 ####】
                 * 执行TimedSupervisorTask监督任务的定时器,具体执行器为cacheRefreshExecutor,任务为CacheRefreshThread
                 */
                scheduler.schedule(
                        new TimedSupervisorTask(
                                "cacheRefresh",               //监控名
                                scheduler,
                                cacheRefreshExecutor,
                                registryFetchIntervalSeconds, //指定具体任务的超时时间
                                TimeUnit.SECONDS,
                                expBackOffBound,
                                new CacheRefreshThread()
                        ),
                        registryFetchIntervalSeconds, TimeUnit.SECONDS);
            }
    
    
            // 2、如果要注册到Eureka Server
            if (clientConfig.shouldRegisterWithEureka()) {
                // 续租的时间间隔(默认30s)
                int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
                // 如果心跳任务超时,下一次执行的delay最大是renewalIntervalInSecs的几倍(默认10),默认每次执行是上一次的2倍
                int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
                logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);
    
                // Heartbeat timer
                /**
                 * 【#### 执行HeartbeatThread,发送心跳数据 ####】
                 * 执行TimedSupervisorTask监督任务的定时器,具体执行器为heartbeatExecutor,任务为HeartbeatThread
                 */
                scheduler.schedule(
                        new TimedSupervisorTask(
                                "heartbeat",
                                scheduler,
                                heartbeatExecutor,
                                renewalIntervalInSecs,
                                TimeUnit.SECONDS,
                                expBackOffBound,
                                new HeartbeatThread()
                        ),
                        renewalIntervalInSecs, TimeUnit.SECONDS);
    
                // InstanceInfo replicator
                /**
                 * 【#### InstanceInfo复制器 ####】
                 * 启动后台定时任务scheduler,线程名为 DiscoveryClient-InstanceInfoReplicator-%d
                 * 默认每30s执行一次定时任务,查看Instance信息(DataCenterInfo、LeaseInfo、InstanceStatus)是否有变化
                 * 如果有变化,执行 discoveryClient.register()
                 */
                instanceInfoReplicator = new InstanceInfoReplicator(
                        this,            //当前DiscoveryClient
                        instanceInfo,    //当前实例信息
                        clientConfig.getInstanceInfoReplicationIntervalSeconds(),//InstanceInfo的复制间隔(默认30s)
                        2); // burstSize
    
                /**
                 * 【StatusChangeListener 状态改变监听器】
                 */
                statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                    @Override
                    public String getId() {
                        return "statusChangeListener";
                    }
    
                    @Override
                    public void notify(StatusChangeEvent statusChangeEvent) {
                        if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                                InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                            // log at warn level if DOWN was involved
                            logger.warn("Saw local status change event {}", statusChangeEvent);
                        } else {
                            logger.info("Saw local status change event {}", statusChangeEvent);
                        }
    
                        //使用InstanceInfo复制器 scheduler.submit()一个Runnable任务
                        //后台马上执行 discoveryClient.register()
                        instanceInfoReplicator.onDemandUpdate();
                    }
                };
    
                /**
                 * 是否关注Instance状态变化,使用后台线程将状态同步到eureka server(默认true)
                 * 调用 ApplicationInfoManager#setInstanceStatus(status) 会触发
                 * 将 StatusChangeListener 注册到 ApplicationInfoManager
                 */
                if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                    applicationInfoManager.registerStatusChangeListener(statusChangeListener);
                }
    
                // 启动InstanceInfo复制器
                instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
            }
            // 当前服务实例不注册到Eureka Server
            else {
                logger.info("Not registering with Eureka server per configuration");
            }
        }
    
    1. 客户端的几种功能都是定时触发的,所以在构造器中初始化了三种定时任务
      • InstanceInfoReplicator:更新并复制本地实例状态到Server端 (注册/取消服务)
      • TimedSupervisorTask -- heartbeat:向server发送心跳续约 (续约)
      • TimedSupervisorTask -- cacheRefresh:刷新服务列表 (获取服务列表)
    2. 初始化状态改变监听器:当InstanceInfo改变时会监听,
    3. 初始化EurekaTransport。这个是向server发起远程调用的辅助类

    返回顶部

    更新本地服务到server

    有两种场景会发起注册:

    1. 当应用启动的时候,如果应用开启了自动注册(默认开启), 那么在自动配置类加载的时候,会通过EurekaAutoServiceRegistration实例化的时候,去改变instance的status, 最终被监听器监听到,执行服务注册的代码
    2. 主要应用于启动之后,当应用的信息发生改变之后,每40每秒执行一次的线程,检测到了,也会自动去注册一次。

    第一种场景代码省略,看下第二种场景的定时任务

    // InstanceInfoReplicator#run()
    public void run() {
        try {
            /**
             * 刷新 InstanceInfo
             * 1、刷新 DataCenterInfo
             * 2、刷新 LeaseInfo 租约信息
             * 3、根据HealthCheckHandler获取InstanceStatus,并更新,如果状态发生变化会触发所有StatusChangeListener
             */
            discoveryClient.refreshInstanceInfo();
    
            // 如果isInstanceInfoDirty=true,返回dirtyTimestamp,否则是null
            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
            if (dirtyTimestamp != null) {
                discoveryClient.register();  //发起注册
                instanceInfo.unsetIsDirty(dirtyTimestamp);  //isInstanceInfoDirty置为false
            }
        } 
        catch (Throwable t) {
            logger.warn("There was a problem with the instance info replicator", t);
        } 
        finally { // 继续下次任务
            Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }
    
    // DiscoveryClient#register()
    boolean register() throws Throwable {
        logger.info(PREFIX + appPathIdentifier + ": registering service...");
        EurekaHttpResponse<Void> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        } catch (Exception e) {
            logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
            throw e;
        }
        if (logger.isInfoEnabled()) {
            logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
        }
        return httpResponse.getStatusCode() == 204;
    }
    
    1. 先刷新InstanceInfo,刷新后如果发现有脏数据,即实例发生了变更,还未同步给Server的数据,就通过register方法发起注册
    2. 注册是通过eurekaTransport实现,eurekaTransport是在eureka中专门负责远程请求的。

    先看下EurekaTransport什么样:

    //DiscoveryClient.EurekaTransport
    private static final class EurekaTransport {
        private ClosableResolver bootstrapResolver;
    
        //负责传输消息的客户端工厂(底层用于和Server交互的http框架是 Jersey,此处的工厂就和Jersey相关)
        private TransportClientFactory transportClientFactory;
    
        //负责注册、续约相关
        private EurekaHttpClient registrationClient;
        private EurekaHttpClientFactory registrationClientFactory;
    
        //负责获取Server端服务列表
        private EurekaHttpClient queryClient;
        private EurekaHttpClientFactory queryClientFactory;
    }
    

    1、EurekaTransport是DiscoveryClient的内部类
    2、内部包含以下属性:
    TransportClientFactory:负责传输消息的客户端工厂(底层用于和Server交互的http框架是Jersey)
    registrationClient:负责注册、续约相关
    queryClient:负责获取Server端服务列表
    3、在DiscoveryClient的构造器中对EurekaTransport进行的初始化,初始化大约实现了下面这个逻辑:
    采用工厂模式+代理模式实现了对其所有实现类的逐层调用,由外到内大致如下:
    SessionedEurekaHttpClient: 强制在一定时间间隔后重连EurekaHttpClient,防止永远只连接特定Eureka Server,反过来保证了在Server端集群拓扑发生变化时的负载重分配
    RetryableEurekaHttpClient: 带有重试功能,默认最多3次,在配置的所有候选Server地址中尝试请求,成功重用,失败会重试另一Server,并维护隔离清单,下次跳过,当隔离数量达到阈值,清空隔离清单,重新开始
    RedirectingEurekaHttpClient: Server端返回302重定向时,客户端shutdown原EurekaHttpClient,根据response header中的Location新建EurekaHttpClient
    MetricsCollectingEurekaHttpClient: 统计收集Metrics信息
    JerseyApplicationClient: AbstractJerseyEurekaHttpClient的子类
    AbstractJerseyEurekaHttpClient: 底层实现通过Jersery注册、发心跳等的核心类
    jerseyClient: Jersery客户端
    也就是发起一次调用,要经过上边这些所有的实现类。

    心跳续约

    30秒一次

    private class HeartbeatThread implements Runnable {
     
        public void run() {
            if (renew()) {
                // 更新最后一次心跳的时间
                lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
            }
        }
    }
    // 续约的主方法
    boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
            if (httpResponse.getStatusCode() == 404) {
                REREGISTER_COUNTER.increment();
                logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
                return register();
            }
            return httpResponse.getStatusCode() == 200;
        } catch (Throwable e) {
            logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
            return false;
        }
    }
    
    1. 发送数据格式:apps/ + appName + /' + id

    获取注册信息

    class CacheRefreshThread implements Runnable {
        public void run() {
            // 刷新注册信息
            refreshRegistry();
        }
    }
    void refreshRegistry() {
        try {
            boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();
    
            boolean remoteRegionsModified = false;
            
            // 判断是否需要全量获取 , remoteRegionsModified  这个值来决定
            String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
            if (null != latestRemoteRegions) {
                String currentRemoteRegions = remoteRegionsToFetch.get();
                if (!latestRemoteRegions.equals(currentRemoteRegions)) {
                    // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
                    synchronized (instanceRegionChecker.getAzToRegionMapper()) {
                        if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
                            String[] remoteRegions = latestRemoteRegions.split(",");
                            remoteRegionsRef.set(remoteRegions);
                            instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
                            remoteRegionsModified = true;
                        } else {
                            logger.info("Remote regions to fetch modified concurrently," +
                                    " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
                        }
                    }
                } else {
                    // Just refresh mapping to reflect any DNS/Property change
                    instanceRegionChecker.getAzToRegionMapper().refreshMapping();
                }
            }
            // 获取注册信息
            boolean success = fetchRegistry(remoteRegionsModified);
            if (success) {
                registrySize = localRegionApps.get().size();
                lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
            }
            // 日志输出 , 省略。。
            
        } catch (Throwable e) {
            logger.error("Cannot fetch registry from server", e);
        }        
    }
    
    private boolean fetchRegistry(boolean forceFullRegistryFetch) {
        Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
    
        try {
            // 获取本地的缓存信息 , 也就是客户端注册信息
            Applications applications = getApplications();
            
            // 判断是否需要全量获取
            if (clientConfig.shouldDisableDelta()
                    || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                    || forceFullRegistryFetch
                    || (applications == null)
                    || (applications.getRegisteredApplications().size() == 0)
                    || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
            {
                logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
                logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
                logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
                logger.info("Application is null : {}", (applications == null));
                logger.info("Registered Applications size is zero : {}",
                        (applications.getRegisteredApplications().size() == 0));
                logger.info("Application version is -1: {}", (applications.getVersion() == -1));
                getAndStoreFullRegistry();//入口1
            } else {
                // 增量获取
                getAndUpdateDelta(applications);//入口2
            }
            applications.setAppsHashCode(applications.getReconcileHashCode());
            logTotalInstances();
        } catch (Throwable e) {
            logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e);
            return false;
        } finally {
            if (tracer != null) {
                tracer.stop();
            }
        }
    
        // 发布缓存刷新事件。
        onCacheRefreshed();
    
        // 更新本地应用的状态
        updateInstanceRemoteStatus();
    
        // registry was fetched successfully, so return true
        return true;
    }
    
    private void getAndStoreFullRegistry() throws Throwable {
        long currentUpdateGeneration = fetchRegistryGeneration.get();
    
        logger.info("Getting all instance registry info from the eureka server");
    
        Applications apps = null;
        // 发送HTTP请求,去服务端获取注册信息
        EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
                ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
                : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            apps = httpResponse.getEntity();
        }
        logger.info("The response status is {}", httpResponse.getStatusCode());
    
        if (apps == null) {
            logger.error("The application is null for some reason. Not storing this information");
        } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            // 设置到本地缓存里面去
            localRegionApps.set(this.filterAndShuffle(apps));
            logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
        } else {
            logger.warn("Not updating applications as another thread is updating it already");
        }
    }
    
    private void getAndUpdateDelta(Applications applications) throws Throwable {
        long currentUpdateGeneration = fetchRegistryGeneration.get();
    
        Applications delta = null;
        // 增量获取信息
        EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            delta = httpResponse.getEntity();
        }
    
        if (delta == null) {
            // 增量获取为空,则全量返回
            logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                    + "Hence got the full registry.");
            getAndStoreFullRegistry();
        } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
            String reconcileHashCode = "";
            //这里设置原子锁的原因是怕某次调度网络请求时间过长,导致同一时间有多线程拉取到增量信息并发修改
            if (fetchRegistryUpdateLock.tryLock()) {
                try {
                    // 将获取到的增量信息和本地缓存信息合并。 
                    updateDelta(delta);
                    reconcileHashCode = getReconcileHashCode(applications);
                } finally {
                    // 释放锁
                    fetchRegistryUpdateLock.unlock();
                }
            } else {
                logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
            }
            // ( HashCode 不一致|| 打印增量和全量的差异 )= true 重新去全量获取
            if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
                reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
            }
        } else {
            logger.warn("Not updating application delta as another thread is updating it already");
            logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
        }
    }
    
    private void updateDelta(Applications delta) {
        int deltaCount = 0;
        // 循环拉取过来的应用列表
        for (Application app : delta.getRegisteredApplications()) {
            // 循环这个应用里面的实例(有多个实例代表是集群的。)
            for (InstanceInfo instance : app.getInstances()) {
                // 获取本地的注册应用列表
                Applications applications = getApplications();
                String instanceRegion = instanceRegionChecker.getInstanceRegion(instance);
                if (!instanceRegionChecker.isLocalRegion(instanceRegion)) {
                    Applications remoteApps = remoteRegionVsApps.get(instanceRegion);
                    if (null == remoteApps) {
                        remoteApps = new Applications();
                        remoteRegionVsApps.put(instanceRegion, remoteApps);
                    }
                    applications = remoteApps;
                }
                
                ++deltaCount;
                if (ActionType.ADDED.equals(instance.getActionType())) {// 添加事件
                    //根据AppName 获取本地的数据,看这个应用是否存在
                    Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                    if (existingApp == null) {
                        // 不存在,则加到本地的应用里面去
                        applications.addApplication(app);
                    }
                    logger.debug("Added instance {} to the existing apps in region {}", instance.getId(), instanceRegion);
                    // 为本地这个应用添加这个实例
                    applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
                } else if (ActionType.MODIFIED.equals(instance.getActionType())) { // 修改事件
                    //根据AppName 获取本地的数据,看这个应用是否存在
                    Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                    if (existingApp == null) {
                        // 不存在,则加到本地的应用里面去
                        applications.addApplication(app);
                    }
                    logger.debug("Modified instance {} to the existing apps ", instance.getId());
                    // 为本地这个应用添加这个实例
                    applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
    
                } else if (ActionType.DELETED.equals(instance.getActionType())) {  // 删除事件
                    Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                    if (existingApp == null) {
                        applications.addApplication(app);
                    }
                    logger.debug("Deleted instance {} to the existing apps ", instance.getId());
                    // 移除这个实例
                    applications.getRegisteredApplications(instance.getAppName()).removeInstance(instance);
                }
            }
        }
        logger.debug("The total number of instances fetched by the delta processor : {}", deltaCount);
    
        getApplications().setVersion(delta.getVersion());
        getApplications().shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
    
        for (Applications applications : remoteRegionVsApps.values()) {
            applications.setVersion(delta.getVersion());
            applications.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
        }
    }
    
    1. 定时任务30秒执行一次
    2. 判断是否需要全量获取,根据配置判断,根据本地缓存是否为空判断
    3. 先获取本地缓存的实例信息
    4. 全量获取
      • 发送HTTP请求,去服务端获取注册信息
      • 设置到本地缓存里面
    5. 增量获取
      • 先增量获取,如果没获取到,在全量获取
      • 将请求过来的增量数据和本地的数据做合并
      • 从服务端获取了最近这段时间,新注册新来的客户端信息,有过修改的,被删除的, 这三大类的实例信息然后通过覆盖本地的数据,移除数据,来达到数据合并的需求
    6. 发布缓存刷新事件
    7. 更新本地应用的状态

    Server

    注解@EnableEurekaServer引出自动配置类EurekaServerAutoConfiguration

    开启注解@EnableEurekaServer,和客户端作用是一样的,都是通过Marker类开启自动配置类EurekaServerAutoConfiguration。

    接下来看下EurekaServerAutoConfiguration

    
    @SpringBootApplication
    public class Ads2Application {
        public static void main(String[] args) {
            SpringApplication.run(Ads2Application.class,args);
        }
    }
    @Configuration
    //Eureka Server初始化的配置类
    //入口1
    @Import({EurekaServerInitializerConfiguration.class})
    @ConditionalOnBean({EurekaServerMarkerConfiguration.Marker.class})//Marker类
    //实例注册相关属性和仪表盘相关属性
    @EnableConfigurationProperties({EurekaDashboardProperties.class, InstanceRegistryProperties.class})
    @PropertySource({"classpath:/eureka/server.properties"})
    public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {
        private static String[] EUREKA_PACKAGES = new String[]{"com.netflix.discovery", "com.netflix.eureka"};
        @Autowired
        private ApplicationInfoManager applicationInfoManager;
        @Autowired
        private EurekaServerConfig eurekaServerConfig;
        @Autowired
        private EurekaClientConfig eurekaClientConfig;
        @Autowired
        private EurekaClient eurekaClient;
        @Autowired
        private InstanceRegistryProperties instanceRegistryProperties;
    
        //在注册实例时会考虑集群情况下其它Node相关操作的注册器
        @Bean
        public PeerAwareInstanceRegistry peerAwareInstanceRegistry(ServerCodecs serverCodecs) {
            this.eurekaClient.getApplications();
            return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig, serverCodecs, this.eurekaClient, this.instanceRegistryProperties.getExpectedNumberOfRenewsPerMin(), this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
        }
    
        @Bean
        @ConditionalOnMissingBean
        //用来管理PeerEurekaNode的帮助类
        public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry, ServerCodecs serverCodecs) {
            return new PeerEurekaNodes(registry, this.eurekaServerConfig, this.eurekaClientConfig, serverCodecs, this.applicationInfoManager);
        }
    
        @Bean
        //入口2
        //Eureka Server上下文初始化
        //因为netflix设计的EurekaServerContext接口本身包含很多成员变量,
        // 如PeerEurekaNodes管理对等节点、PeerAwareInstanceRegistry考虑对等节点的实例注册器等,
        // 在Eureka Server上下文初始化时会对这些组件初始化,还会启动一些定时线程
        public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs, PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
            return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs, registry, peerEurekaNodes, this.applicationInfoManager);
        }
    
        @Bean
        //Eureka Server启动引导,会在Spring容器refresh()完毕时由EurekaServerInitializerConfiguration#run()
        // 方法真正调用eurekaServerBootstrap.contextInitialized()初始化,其中initEurekaEnvironment()、
        // initEurekaServerContext() Eureka Server启动分析重点
        //入口
        public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry, EurekaServerContext serverContext) {
            return new EurekaServerBootstrap(this.applicationInfoManager, this.eurekaClientConfig, this.eurekaServerConfig, registry, serverContext);
        }
    
        @Bean
        //注册 Jersey filter
        //所有/eureka的请求都需要经过Jersery Filter,其处理类是com.sun.jersey.spi.container.servlet.ServletContainer,
        // 其既是Filter,也是Servlet,包含Jersey的处理逻辑。
        public FilterRegistrationBean jerseyFilterRegistration(Application eurekaJerseyApp) {
            FilterRegistrationBean bean = new FilterRegistrationBean();
            bean.setFilter(new ServletContainer(eurekaJerseyApp));
            bean.setOrder(2147483647);
            bean.setUrlPatterns(Collections.singletonList("/eureka/*"));
            return bean;
        }
    
        @Bean
        public Application jerseyApplication(Environment environment, ResourceLoader resourceLoader) {
            ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(false, environment);
            provider.addIncludeFilter(new AnnotationTypeFilter(Path.class));
            provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class));
            Set<Class<?>> classes = new HashSet();
            String[] var5 = EUREKA_PACKAGES;
            int var6 = var5.length;
    
            for(int var7 = 0; var7 < var6; ++var7) {
                String basePackage = var5[var7];
                Set<BeanDefinition> beans = provider.findCandidateComponents(basePackage);
                Iterator var10 = beans.iterator();
    
                while(var10.hasNext()) {
                    BeanDefinition bd = (BeanDefinition)var10.next();
                    Class<?> cls = ClassUtils.resolveClassName(bd.getBeanClassName(), resourceLoader.getClassLoader());
                    classes.add(cls);
                }
            }
    
            Map<String, Object> propsAndFeatures = new HashMap();
            propsAndFeatures.put("com.sun.jersey.config.property.WebPageContentRegex", "/eureka/(fonts|images|css|js)/.*");
            DefaultResourceConfig rc = new DefaultResourceConfig(classes);
            rc.setPropertiesAndFeatures(propsAndFeatures);
            return rc;
        }
    
        @Configuration
        protected static class EurekaServerConfigBeanConfiguration {
            protected EurekaServerConfigBeanConfiguration() {
            }
    
            @Bean
            @ConditionalOnMissingBean
            //注入Eureka Server配置类,
            // netflix的默认实现类是DefaultEurekaServerConfig,spring cloud的默认实现类是EurekaServerConfigBean
            public EurekaServerConfig eurekaServerConfig(EurekaClientConfig clientConfig) {
                EurekaServerConfigBean server = new EurekaServerConfigBean();
                if (clientConfig.shouldRegisterWithEureka()) {
                    server.setRegistrySyncRetries(5);
                }
    
                return server;
            }
        }
    }
    
    1. 初始化DefaultEurekaServerContext,即Eureka Server上下文
    2. 初始化EurekaServerBootstrap

    接下来分别看一下这两个类的初始化过程

    返回顶部

    初始化DefaultEurekaServerContext

    public interface EurekaServerContext {
    
        void initialize() throws Exception;
    
        void shutdown() throws Exception;
    
        EurekaServerConfig getServerConfig();
    
        PeerEurekaNodes getPeerEurekaNodes();
    
        ServerCodecs getServerCodecs();
    
        PeerAwareInstanceRegistry getRegistry();
    
        ApplicationInfoManager getApplicationInfoManager();
    
    }
    
    //DefaultEurekaServerContext
    @PostConstruct
    @Override
    public void initialize() throws Exception {
        logger.info("Initializing ...");
        peerEurekaNodes.start();//入口1
        registry.init(peerEurekaNodes);//入口2
        logger.info("Initialized");
    }
    
    public void start() {
        // 后台运行的单线程定时任务执行器,定时线程名:Eureka-PeerNodesUpdater
        taskExecutor = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                        thread.setDaemon(true);
                        return thread;
                    }
                }
        );
        
        try {
            // 解析Eureka Server URL,并更新PeerEurekaNodes列表
            updatePeerEurekaNodes(resolvePeerUrls());
            
            // 启动定时执行任务peersUpdateTask(定时默认10min,由peerEurekaNodesUpdateIntervalMs配置)
            Runnable peersUpdateTask = new Runnable() {
                @Override
                public void run() {
                    try {
                        // 定时任务中仍然是 解析Eureka Server URL,并更新PeerEurekaNodes列表
                        updatePeerEurekaNodes(resolvePeerUrls());//入口1+入口2
                    } catch (Throwable e) {
                        logger.error("Cannot update the replica Nodes", e);
                    }
    
                }
            };
            taskExecutor.scheduleWithFixedDelay(
                    peersUpdateTask,
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    TimeUnit.MILLISECONDS
            );
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        
        // 打印对等体节点(应该没有当前节点自己)
        for (PeerEurekaNode node : peerEurekaNodes) {
            logger.info("Replica node URL:  " + node.getServiceUrl());
        }
    }
    
    protected List<String> resolvePeerUrls() {
        // 当前Eureka Server自己的InstanceInfo信息
        InstanceInfo myInfo = applicationInfoManager.getInfo();
        // 当前Eureka Server所在的zone,默认是 defaultZone
        String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);
        // 获取配置的service-url
        List<String> replicaUrls = EndpointUtils
                .getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));
    
        // 遍历service-url,排除自己
        int idx = 0;
        while (idx < replicaUrls.size()) {
            if (isThisMyUrl(replicaUrls.get(idx))) {//入口
                replicaUrls.remove(idx);
            } else {
                idx++;
            }
        }
        return replicaUrls;
    }
    
    
    public boolean isThisMyUrl(String url) {
        return isInstanceURL(url, applicationInfoManager.getInfo());
    }
    
    
    public boolean isInstanceURL(String url, InstanceInfo instance) {
        // 根据配置项的url获取host主机信息
        String hostName = hostFromUrl(url); 
        
        // 根据当前Eureka Server的Instance实例信息获取host主机信息
        String myInfoComparator = instance.getHostName();
        
        // 如果eureka.client.transport.applicationsResolverUseIp==true,即按照IP解析URL
        // 那么将当前Eureka Server的Instance实例信息转换为IP
        if (clientConfig.getTransportConfig().applicationsResolverUseIp()) {
            myInfoComparator = instance.getIPAddr();
        }
        
        // 比较配置项的hostName 和 当前Eureka Server的Instance实例信息
        return hostName != null && hostName.equals(myInfoComparator);
    }
    
    // PeerEurekaNodes#updatePeerEurekaNodes()
    // newPeerUrls为本次要更新的Eureka对等体URL列表
    protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
        if (newPeerUrls.isEmpty()) {
            logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
            return;
        }
    
        // 计算 原peerEurekaNodeUrls - 新newPeerUrls 的差集,就是多余可shutdown节点
        Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
        toShutdown.removeAll(newPeerUrls);
        
        // 计算 新newPeerUrls - 原peerEurekaNodeUrls 的差集,就是需要新增节点
        Set<String> toAdd = new HashSet<>(newPeerUrls);
        toAdd.removeAll(peerEurekaNodeUrls);
    
        if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change 没有变更
            return;
        }
    
        // Remove peers no long available
        List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);
    
        // shutDown多余节点
        if (!toShutdown.isEmpty()) {
            logger.info("Removing no longer available peer nodes {}", toShutdown);
            int i = 0;
            while (i < newNodeList.size()) {
                PeerEurekaNode eurekaNode = newNodeList.get(i);
                if (toShutdown.contains(eurekaNode.getServiceUrl())) {
                    newNodeList.remove(i);
                    eurekaNode.shutDown();
                } else {
                    i++;
                }
            }
        }
    
        // Add new peers
        // 添加新的peerEurekaNode - createPeerEurekaNode()
        if (!toAdd.isEmpty()) {
            logger.info("Adding new peer nodes {}", toAdd);
            for (String peerUrl : toAdd) {
                newNodeList.add(createPeerEurekaNode(peerUrl));
            }
        }
    
        this.peerEurekaNodes = newNodeList;
        this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
    }
    
    // PeerAwareInstanceRegistryImpl#init()
    @Override
    public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
        // 【重要】启动用于统计最后xx毫秒续约情况的定时线程
        this.numberOfReplicationsLastMin.start();
        
        this.peerEurekaNodes = peerEurekaNodes;
        
        // 【重要】初始化ResponseCache: 对客户端查询服务列表信息的缓存(所有服务列表、增量修改、单个应用)
        // 默认responseCacheUpdateIntervalMs=30s
        initializedResponseCache();
        
        // 【重要】定期更新续约阀值的任务,默认900s执行一次
        //  调用 PeerAwareInstanceRegistryImpl#updateRenewalThreshold()
        scheduleRenewalThresholdUpdateTask();
        
        // 初始化 远程区域注册 相关信息(默认没有远程Region,都是使用us-east-1)
        initRemoteRegionRegistry();
    
        try {
            Monitors.registerObject(this);
        } catch (Throwable e) {
            logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
        }
    }
    
    // MeasuredRate#start()
    public synchronized void start() {
        if (!isActive) {
            timer.schedule(new TimerTask() {
    
                @Override
                public void run() {
                    try {
                        // Zero out the current bucket.
                        // 将当前的桶的统计数据放到lastBucket,当前桶置为0
                        lastBucket.set(currentBucket.getAndSet(0));
                    } catch (Throwable e) {
                        logger.error("Cannot reset the Measured Rate", e);
                    }
                }
            }, sampleInterval, sampleInterval);
    
            isActive = true;
        }
    }
    
    /**
     * Returns the count in the last sample interval.
     * 返回上一分钟的统计数
     */
    public long getCount() {
        return lastBucket.get();
    }
    
    /**
     * Increments the count in the current sample interval.
     * 增加当前桶的计数,在以下2个场景有调用:
     * AbstractInstanceRegistry#renew() - 续约
     * PeerAwareInstanceRegistryImpl#replicateToPeers() - 
     */
    public void increment() {
        currentBucket.incrementAndGet();
    }
    
    // ResponseCacheImpl构造
    private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();
    private final LoadingCache<Key, Value>  readWriteCacheMap;
    
    ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
        this.serverConfig = serverConfig;
        this.serverCodecs = serverCodecs;
        // 根据配置eureka.server.useReadOnlyResponseCache判断,是否使用只读ResponseCache,默认true
        // 由于ResponseCache维护这一个可读可写的readWriteCacheMap,还有一个只读的readOnlyCacheMap
        // 此配置控制在get()应用数据时,是去只读Map读,还是读写Map读,应该只读Map是定期更新的
        this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
        this.registry = registry;
    
        // eureka.server.responseCacheUpdateIntervalMs缓存更新频率,默认30s
        long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
        
        // 创建读写Map,com.google.common.cache.LoadingCache
        // 可以设置初始值,数据写入过期时间,删除监听器等
        this.readWriteCacheMap =
                CacheBuilder.newBuilder().initialCapacity(1000)
                        .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
                        .removalListener(new RemovalListener<Key, Value>() {
                            @Override
                            public void onRemoval(RemovalNotification<Key, Value> notification) {
                                Key removedKey = notification.getKey();
                                if (removedKey.hasRegions()) {
                                    Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
                                    regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
                                }
                            }
                        })
                        .build(new CacheLoader<Key, Value>() {
                            @Override
                            public Value load(Key key) throws Exception {
                                if (key.hasRegions()) {
                                    Key cloneWithNoRegions = key.cloneWithoutRegions();
                                    regionSpecificKeys.put(cloneWithNoRegions, key);
                                }
                                Value value = generatePayload(key);
                                return value;
                            }
                        });
    
        // 如果启用只读缓存,那么每隔responseCacheUpdateIntervalMs=30s,执行getCacheUpdateTask()
        if (shouldUseReadOnlyResponseCache) {
            timer.schedule(getCacheUpdateTask(),
                    new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
                            + responseCacheUpdateIntervalMs),
                    responseCacheUpdateIntervalMs);
        }
    
        try {
            Monitors.registerObject(this);
        } catch (Throwable e) {
            logger.warn("Cannot register the JMX monitor for the InstanceRegistry", e);
        }
    }
    
    1. 初始化对等节点信息,也就是更新集群节点信息。先执行一遍,然后在开启个定时任务定时执行(10分钟一次),具体如何执行:
      • 获取server所在zone下所有配置的service-url
      • 遍历将自己过滤掉,过滤规则
        • 如果eureka.client.transport.applicationsResolverUseIp==true,即按照IP解析URL,那么将当前server的host转换成ip,然后挨个比较
        • 如果没配置则按照hostName进行比较
    2. 根据上面获取到的最新的server列表,和server就的server列表进行差集计算,多余的就是需要添加的。反过来差集则是需要剔除掉的
    3. 启动用于统计最后xx毫秒续约情况的定时线程
    4. 初始化ResponseCache
      • ResponseCache是对客户端查询服务列表信息的缓存
      • 默认responseCacheUpdateIntervalMs=30s,默认30s更新一次
    5. 定期更新续约阀值的任务,默认900s执行一次
    6. 初始化远程区域注册相关信息(默认没有远程Region,都是使用us-east-1)

    返回顶部

    EurekaServerBootstrap初始化

    在spring容器初始化完毕后调用

    // EurekaServerBootstrap#contextInitialized()
    public void contextInitialized(ServletContext context) {
    	try {
    		initEurekaEnvironment();    
    		initEurekaServerContext(); //入口
    
    		context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
    	}
    	catch (Throwable e) {
    		log.error("Cannot bootstrap eureka server :", e);
    		throw new RuntimeException("Cannot bootstrap eureka server :", e);
    	}
    }
    
    
    // EurekaServerBootstrap#initEurekaServerContext()
    protected void initEurekaServerContext() throws Exception {
    	// For backward compatibility
    	JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
    			XStream.PRIORITY_VERY_HIGH);
    	XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
    			XStream.PRIORITY_VERY_HIGH);
    
        // 是否为AWS环境
    	if (isAws(this.applicationInfoManager.getInfo())) {
    		this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
    				this.eurekaClientConfig, this.registry, this.applicationInfoManager);
    		this.awsBinder.start();
    	}
    
        // 将serverContext由Holder保管
    	EurekaServerContextHolder.initialize(this.serverContext);
    
    	log.info("Initialized server context");
    
    	// Copy registry from neighboring eureka node
        // 从相邻的eureka节点拷贝注册列表信息
    	int registryCount = this.registry.syncUp();//入口1
    	this.registry.openForTraffic(this.applicationInfoManager, registryCount);入口2
    
    	// Register all monitoring statistics.
    	EurekaMonitors.registerAllStats();
    }
    
    public int syncUp() {
        // Copy entire entry from neighboring DS node
        // 获取到的注册节点数量
        int count = 0;
        // 如果count==0 , 那么默认重试5次(前提是开启了register-with-eureka = true,否则为0)
        for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
            if (i > 0) {
                try {
                    // 从第二次开始,每次默认沉睡30秒
                    Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
                } catch (InterruptedException e) {
                    logger.warn("Interrupted during registry transfer..");
                    break;
                }
            }
            // 从本地内存里面获取注册实例信息
            Applications apps = eurekaClient.getApplications();
            for (Application app : apps.getRegisteredApplications()) {
                for (InstanceInfo instance : app.getInstances()) {
                    try {
                        // 判断是否可以注册
                        if (isRegisterable(instance)) {
                            // 注册到当前Eureka Server里面
                            register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                            count++;
                        }
                    } catch (Throwable t) {
                        logger.error("During DS init copy", t);
                    }
                }
            }
        }
        return count;
    }
    
    // InstanceRegistry#openForTraffic()
    @Override
    public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
        // 如果count==0,即没有从相邻eureka节点得到服务列表,如单机启动模式,defaultOpenForTrafficCount=1
    	super.openForTraffic(applicationInfoManager,
    			count == 0 ? this.defaultOpenForTrafficCount : count);
    }
    
    
    // PeerAwareInstanceRegistryImpl#openForTraffic()
    @Override
    public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
        // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
        // 每分钟期待的续约数(默认30s续约,60s就是2次)
        this.expectedNumberOfRenewsPerMin = count * 2; 
        
        // 每分钟续约的阀值:85% * expectedNumberOfRenewsPerMin
        this.numberOfRenewsPerMinThreshold =
                (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
        logger.info("Got " + count + " instances from neighboring DS node");
        logger.info("Renew threshold is: " + numberOfRenewsPerMinThreshold);
        
        this.startupTime = System.currentTimeMillis();
        if (count > 0) { //可count默认值是1,那么peerInstancesTransferEmptyOnStartup始终不会是true
                         //在PeerAwareInstanceRegistryImpl#shouldAllowAccess(boolean)方法有用
            this.peerInstancesTransferEmptyOnStartup = false;
        }
        
        DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
        boolean isAws = Name.Amazon == selfName;
        if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
            logger.info("Priming AWS connections for all replicas..");
            primeAwsReplicas(applicationInfoManager);
        }
        
        logger.info("Changing status to UP");
        applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
        
        // 开启新的【EvictionTask】
        super.postInit();
    }
    
    protected void postInit() {
        renewsLastMin.start(); //统计上一分钟续约数的监控Timer
        
        if (evictionTaskRef.get() != null) {
            evictionTaskRef.get().cancel();
        }
        evictionTaskRef.set(new EvictionTask());
        evictionTimer.schedule(evictionTaskRef.get(),
                serverConfig.getEvictionIntervalTimerInMs(),  //默认60s
                serverConfig.getEvictionIntervalTimerInMs());
    }
    

    这一步做的事情是集群启动同步

    1. 从相邻的eureka节点拷贝注册列表信息,循环,最多重试RegistrySyncRetries次(默认 5)
    2. openForTraffic:允许开始与客户端的数据传输,即开始作为Server服务

    返回顶部

    Server处理注册请求

    服务端注册实现类有两种,单机和集群,直接看集群的。

    客户端注册实例到server,首先就是进入下面这个方法

    // PeerAwareInstanceRegistryImpl#register()
        /**
         * 注册有关InstanceInfo信息,并将此信息复制到所有对等的eureka节点
         * 如果这是来自其他节点的复制事件,则不会继续复制它
         *
         * @param info
         *            the {@link InstanceInfo} to be registered and replicated.
         * @param isReplication
         *            true if this is a replication event from other replica nodes,
         *            false otherwise.
         */
        @Override
        public void register(final InstanceInfo info, final boolean isReplication) {
            //租约的过期时间,默认90秒,也就是说当服务端超过90秒没有收到客户端的心跳,则主动剔除该节点
            int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS; 
    
            // 如果当前Instance实例的租约信息中有leaseDuration持续时间,使用实例的leaseDuration(也就是以客户端为准)
            if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
                leaseDuration = info.getLeaseInfo().getDurationInSecs();
            }
    
            // 【 当前Eureka Server注册实例信息 】
            super.register(info, leaseDuration, isReplication);
    
            // 【 将注册实例信息复制到集群中其它节点 】
            replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
        }
    
    1. 将服务实例注册到当前server
    2. 将服务实例同步到其他集群节点

    将服务实例注册到自身

    public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        try {
            read.lock(); //读锁
            
            // registry是保存所有应用实例信息的Map:ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>
            // 从registry中获取当前appName的所有实例信息
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            
            REGISTER.increment(isReplication); //注册统计+1
            
            // 如果当前appName实例信息为空,新建Map
            if (gMap == null) {
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
            }
            
            // 获取实例的Lease租约信息
            Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
            // Retain the last dirty timestamp without overwriting it, if there is already a lease
            // 如果已经有租约,则保留最后一个脏时间戳而不覆盖它
            // (比较当前请求实例租约 和 已有租约 的LastDirtyTimestamp,选择靠后的)
            if (existingLease != null && (existingLease.getHolder() != null)) {
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                            " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                    logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                    registrant = existingLease.getHolder();
                }
            } 
            else {
                // The lease does not exist and hence it is a new registration
                // 如果之前不存在实例的租约,说明是新实例注册
                // expectedNumberOfRenewsPerMin期待的每分钟续约数+2(因为30s一个)
                // 并更新numberOfRenewsPerMinThreshold每分钟续约阀值(85%)
                synchronized (lock) {
                    if (this.expectedNumberOfRenewsPerMin > 0) {
                        // Since the client wants to cancel it, reduce the threshold
                        // (1
                        // for 30 seconds, 2 for a minute)
                        this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                        this.numberOfRenewsPerMinThreshold =
                                (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                    }
                }
                logger.debug("No previous lease information found; it is new registration");
            }
            
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
            if (existingLease != null) {
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            gMap.put(registrant.getId(), lease); //当前实例信息放到维护注册信息的Map
            
            // 同步维护最近注册队列
            synchronized (recentRegisteredQueue) {
                recentRegisteredQueue.add(new Pair<Long, String>(
                        System.currentTimeMillis(),
                        registrant.getAppName() + "(" + registrant.getId() + ")"));
            }
            
            // This is where the initial state transfer of overridden status happens
            // 如果当前实例已经维护了OverriddenStatus,将其也放到此Eureka Server的overriddenInstanceStatusMap中
            if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
                logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                                + "overrides", registrant.getOverriddenStatus(), registrant.getId());
                if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                    logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                    overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                }
            }
            InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
            if (overriddenStatusFromMap != null) {
                logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                registrant.setOverriddenStatus(overriddenStatusFromMap);
            }
    
            // Set the status based on the overridden status rules
            // 根据overridden status规则,设置状态
            InstanceStatus overriddenInstanceStatus 
                = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
            registrant.setStatusWithoutDirty(overriddenInstanceStatus);
    
            // If the lease is registered with UP status, set lease service up timestamp
            // 如果租约以UP状态注册,设置租赁服务时间戳
            if (InstanceStatus.UP.equals(registrant.getStatus())) {
                lease.serviceUp();
            }
            
            registrant.setActionType(ActionType.ADDED); //ActionType为 ADD
            recentlyChangedQueue.add(new RecentlyChangedItem(lease)); //维护recentlyChangedQueue
            registrant.setLastUpdatedTimestamp(); //更新最后更新时间
            
            // 使当前应用的ResponseCache失效
            invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
            logger.info("Registered instance {}/{} with status {} (replication={})",
                    registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
        } finally {
            read.unlock(); //读锁
        }
    }
    
    1. 维护当前Instance实例的Lease租约信息,并且更新注册实例到Map中。Server通过Map来维护注册信息。
    2. 如果是新注册(有可能是续约和注册),expectedNumberOfRenewsPerMin期待的每分钟续约数+2, 并更新numberOfRenewsPerMinThreshold每分钟续约阀值
      • 介绍一下这两个参数的作用:用这两个参数可以实现enrueka的自我保护机制,开启自我保护机制,当发生网络抖动,client向服务端续约少于阀值,触发自我保护机制,server就不会剔除任何实例服务。
        expectedNumberOfRenewsPerMin :每分钟最大的续约数量,由于客户端是每30秒续约一次,一分钟就是续约2次, count代表的是客户端数量。
        计算出一个总数据: 客户端数量*2
        numberOfRenewsPerMinThreshold : 每分钟最小续约数量, 使用expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold()。
        serverConfig.getRenewalPercentThreshold()的默认值为0.85 , 也就是说每分钟的续约数量要大于85% 。
        Eureka的自我保护机制,都是围绕这两个变量来实现的, 如果每分钟的续约数量小于numberOfRenewsPerMinThreshold , 就会开启自动保护机制。在此期间,不会再主动剔除任何一个客户端。
    3. 更新ResponseCache。为了并发性能,增加了一个guava的二级缓存。
      guava二级缓存:可以理解维护了两个Map,ReadOnlyMap和ReadWriteMap。
      当有get请求的时候,先查ReadOnlyMap,没有在查ReadWriteMap,如果ReadWriteMap也没有,ReadWriteMap内部就会取存储注册信息的map去加载。

    将服务实例同步到其他集群节点

    // PeerAwareInstanceRegistryImpl#replicateToPeers()
    /**
     * Replicates all eureka actions to peer eureka nodes except for replication
     * traffic to this node.
     */
    private void replicateToPeers(Action action, String appName, String id,
                                  InstanceInfo info /* optional */,
                                  InstanceStatus newStatus /* optional */, boolean isReplication) {
        Stopwatch tracer = action.getTimer().start();
        try {
            // 如果是复制操作(针对当前节点,false)
            if (isReplication) {
                numberOfReplicationsLastMin.increment();
            }
            
            // If it is a replication already, do not replicate again as this will create a poison replication
            // 如果它已经是复制,请不要再次复制,直接return
            if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                return;
            }
    
            // 遍历集群所有节点(除当前节点外)
            for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                // If the url represents this host, do not replicate to yourself.
                if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                    continue;
                }
                
                // 复制Instance实例操作到某个node节点
                replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
            }
        } 
        finally {
            tracer.stop();
        }
    }
    
    
    
    // PeerAwareInstanceRegistryImpl#replicateInstanceActionsToPeers()
    /**
     * Replicates all instance changes to peer eureka nodes except for
     * replication traffic to this node.
     *
     */
    private void replicateInstanceActionsToPeers(Action action, String appName,
                                                 String id, InstanceInfo info, InstanceStatus newStatus,
                                                 PeerEurekaNode node) {
        try {
            InstanceInfo infoFromRegistry = null;
            CurrentRequestVersion.set(Version.V2);
            switch (action) {
                case Cancel:  //取消
                    node.cancel(appName, id);
                    break;
                case Heartbeat:  //心跳
                    InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                    break;
                case Register:  //注册
                    node.register(info);//入口
                    break;
                case StatusUpdate:  //状态更新
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                    break;
                case DeleteStatusOverride:  //删除OverrideStatus
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.deleteStatusOverride(appName, id, infoFromRegistry);
                    break;
            }
        } catch (Throwable t) {
            logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
        }
    }
    
    
    
    // PeerEurekaNode#register()
    /**
     * Sends the registration information of {@link InstanceInfo} receiving by
     * this node to the peer node represented by this class.
     *
     * @param info
     *            the instance information {@link InstanceInfo} of any instance
     *            that is send to this instance.
     * @throws Exception
     */
    public void register(final InstanceInfo info) throws Exception {
        // 当前时间 + 30s后 过期
        long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
        
        // 提交相同的操作到批量复制任务处理
        batchingDispatcher.process(
                taskId("register", info),
                new InstanceReplicationTask(targetHost, Action.Register, info, overriddenStatus:null, replicateInstanceInfo:true) {
                    public EurekaHttpResponse<Void> execute() {
                        return replicationClient.register(info);
                    }
                },
                expiryTime
        );
    }
    
    1. 其实和client注册是一样的,server收集自己的实例信息,然后作为一个client向集群其他server发起注册

    返回顶部

    server处理续约请求

    //InstanceResource
    @PUT
    public Response renewLease(
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
            @QueryParam("overriddenstatus") String overriddenStatus,
            @QueryParam("status") String status,
            @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
        boolean isFromReplicaNode = "true".equals(isReplication);
        // 续约
        boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
        // 续约失败
        // Not found in the registry, immediately ask for a register
        if (!isSuccess) {
            logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
            return Response.status(Status.NOT_FOUND).build();
        }
        // Check if we need to sync based on dirty time stamp, the client
        // instance might have changed some value
        Response response = null;
        // 比较lastDirtyTimestamp 
        if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
            // 比较lastDirtyTimestamp的大小,这个还是比较重要的
            response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
            if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
                    && (overriddenStatus != null)
                    && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
                    && isFromReplicaNode) {
                registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
            }
        } else {
            response = Response.ok().build();
        }
        logger.debug("Found (Renew): {} - {}; reply status={}" + app.getName(), id, response.getStatus());
        return response;
    }
     
     
     
     
     
     
    private Response validateDirtyTimestamp(Long lastDirtyTimestamp,
                                            boolean isReplication) {
        // 获取本机的instance实例信息
        InstanceInfo appInfo = registry.getInstanceByAppAndId(app.getName(), id, false);
        if (appInfo != null) {
            //如果lastDirtyTimestamp不为空,并且lastDirtyTimestamp和本地的不相等
            if ((lastDirtyTimestamp != null) && (!lastDirtyTimestamp.equals(appInfo.getLastDirtyTimestamp()))) {
                Object[] args = {id, appInfo.getLastDirtyTimestamp(), lastDirtyTimestamp, isReplication};
                // lastDirtyTimestamp>本地的时间,则认为当前实例是无效的,返回404错误,客户端重新发起注册
                if (lastDirtyTimestamp > appInfo.getLastDirtyTimestamp()) {
                    logger.debug(
                            "Time to sync, since the last dirty timestamp differs -"
                                    + " ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}",
                            args);
                    return Response.status(Status.NOT_FOUND).build();
                } else if (appInfo.getLastDirtyTimestamp() > lastDirtyTimestamp) {
                    // 如果是集群同步请求,本地的时间,大于客户端传过来的时间,则返回 “冲突” 这个状态回去,以本地的时间大的为准
                    if (isReplication) {
                        logger.debug(
                                "Time to sync, since the last dirty timestamp differs -"
                                        + " ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}",
                                args);
                        return Response.status(Status.CONFLICT).entity(appInfo).build();
                    } else {
                        return Response.ok().build();
                    }
                }
            }
     
        }
        return Response.ok().build();
    }
    
    //PeerAwareInstanceRegistryImpl.java
    public boolean renew(final String appName, final String id, final boolean isReplication) {
        // 执行续约操作
        if (super.renew(appName, id, isReplication)) {
            // 同步Eureka-Server集群
            replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
            return true;
        }
        return false;
    }
    
    
    //AbstractInstanceRegistry.java
    public boolean renew(String appName, String id, boolean isReplication) {
        // 增加续约次数到统计枚举
        RENEW.increment(isReplication);
        // 从Eureka-Server端本地的CurrentHashMap中,通过appName获取Lease信息
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToRenew = null;
        if (gMap != null) {
            leaseToRenew = gMap.get(id);
        }
        // lease为空,lease在第一次注册的时候会创建,为空,则表示从来没有注册过,租约不存在
        if (leaseToRenew == null) {
            RENEW_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
            return false;
        } else {
            // 获取lease里面的instance信息
            InstanceInfo instanceInfo = leaseToRenew.getHolder();
            if (instanceInfo != null) {
                // touchASGCache(instanceInfo.getASGName());
                // 一系列状态判断,目前还不是很清楚,但是不影响主流程
                InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                        instanceInfo, leaseToRenew, isReplication);
                if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                    logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                            + "; re-register required", instanceInfo.getId());
                    RENEW_NOT_FOUND.increment(isReplication);
                    return false;
                }
                if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                    Object[] args = {
                            instanceInfo.getStatus().name(),
                            instanceInfo.getOverriddenStatus().name(),
                            instanceInfo.getId()
                    };
                    logger.info(
                            "The instance status {} is different from overridden instance status {} for instance {}. "
                                    + "Hence setting the status to overridden status", args);
                    instanceInfo.setStatus(overriddenInstanceStatus);
                }
            }
            // 设置每分钟的续约次数
            renewsLastMin.increment();
            // 续约
            leaseToRenew.renew();
            return true;
        }
    }
    
    //Lease.java
    public void renew() {
        lastUpdateTimestamp = System.currentTimeMillis() + duration;
     
    }
    
    1. 首先根据实例名称获取续约信息对象lease,如果lease为null,表示之前没注册过,直接返回false
    2. lease不为null,更新lease的最后更新时间戳,更新lease的每分钟续约次数。
    3. 对lastDirtyTimestamp进行验证
      • lastDirtyTimestamp是客户端实例信息发生变化的时间,server收到实例变更时要比较当前时间不能小于lastDirtyTimestamp。

    返回顶部

    实例自动过期

    当客户端心跳超时的时候,server会有个定时任务对该类的实例进行下线

    protected void initEurekaServerContext() throws Exception {
       // ....省略N多代码
       // 服务刚刚启动的时候,去其他服务节点同步客户端的数量。
       int registryCount = this.registry.syncUp();
       // 这个方法里面计算expectedNumberOfRenewsPerMin的值 , 重点在这里面,这里启动了清理任务的定时器
       this.registry.openForTraffic(this.applicationInfoManager, registryCount);
     
       // Register all monitoring statistics.
       EurekaMonitors.registerAllStats();
    }
    
    
    @Override
    public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
         // ...省略N多代码
        // 开启定时清理过期客户端的定时器
        super.postInit();
    }
    
    
    protected void postInit() {
        renewsLastMin.start();//入口
        if (evictionTaskRef.get() != null) {
            evictionTaskRef.get().cancel();
        }
        evictionTaskRef.set(new EvictionTask());
        // 设置定时器
        evictionTimer.schedule(evictionTaskRef.get(),
                serverConfig.getEvictionIntervalTimerInMs(),
                serverConfig.getEvictionIntervalTimerInMs());//入口2
    }
    
    public synchronized void start() {
        if (!isActive) {
            timer.schedule(new TimerTask() {
    
                @Override
                public void run() {
                    try {
                        // 进行清0
                        lastBucket.set(currentBucket.getAndSet(0));
                    } catch (Throwable e) {
                        logger.error("Cannot reset the Measured Rate", e);
                    }
                }
            }, sampleInterval, sampleInterval);
    
            isActive = true;
        }
    }
    
    //EvictionTask 
    class EvictionTask extends TimerTask {
    
        private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l);
    
        @Override
        public void run() {
            try {
                // 获取延迟秒数,就是延迟几秒下线
                long compensationTimeMs = getCompensationTimeMs();
                logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
                evict(compensationTimeMs);
            } catch (Throwable e) {
                logger.error("Could not run the evict task", e);
            }
        }
    }
    
    
    public void evict(long additionalLeaseMs) {
        logger.debug("Running the evict task");
        // 判断是否开启自我保护机制
        if (!isLeaseExpirationEnabled()) {
            logger.debug("DS: lease expiration is currently disabled.");
            return;
        }
    
        
        List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
        // 循环遍历本地CurrentHashMap中的实例信息
        for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
            Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
            if (leaseMap != null) {
                for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                    Lease<InstanceInfo> lease = leaseEntry.getValue();
                    // 判断是否过期,此处为重点,里面有判断实例过期的依据
                    if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                        expiredLeases.add(lease);
                    }
                }
            }
        }
    
        // 获取注册的实例数量
        int registrySize = (int) getLocalRegistrySize();
        // serverConfig.getRenewalPercentThreshold() 为0.85 , 主要是为了避免开启自动保护机制。 所以会逐步过期
        int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
        // 可以过期的数量
        int evictionLimit = registrySize - registrySizeThreshold;
        // 取最小值,在过期数量和可以过期的数量中间取最小值。
        int toEvict = Math.min(expiredLeases.size(), evictionLimit);
        if (toEvict > 0) {
            logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
            // 随机过期
            Random random = new Random(System.currentTimeMillis());
            for (int i = 0; i < toEvict; i++) {
                // Pick a random item (Knuth shuffle algorithm)
                int next = i + random.nextInt(expiredLeases.size() - i);
                Collections.swap(expiredLeases, i, next);
                Lease<InstanceInfo> lease = expiredLeases.get(i);
    
                String appName = lease.getHolder().getAppName();
                String id = lease.getHolder().getId();
                // 写入过期监控
                EXPIRED.increment();
                // 服务下线
                logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
                internalCancel(appName, id, false);
            }
        }
    }
    
    1. server用lastBucket维护了客户端总续约次数,开启个定时任务,没分钟把这个值清0一次,通过这种方式实现滑动窗口
    2. 在开启个定时任务来清理过期实例的,serverConfig.getEvictionIntervalTimerInMs() : 默认为60秒 , 可配置
      • 获取延迟下线
      • 自我保护判断
      • 获取map内的所有实例信息,循环进行判断是否过期
      • 获取总实例数量、计算出可过期数量(总数量*0.85,即自我保护阀值)、过期数量,在后两个中取个最小值,目的是防止触发自我保护机制。
      • 随机进行过期下线
        随机下线+分批下线:一个实例有10台服务器,本次需要下线4台,因为自我保护机制,一次不能下线超过百分85,所以本次只能下线2台,所以本次在这4台中随机下线两台,剩下的在下个周期在继续。

    返回顶部

    接受获取注册信息请求

    @GET
    public Response getContainers(@PathParam("version") String version,
                                  @HeaderParam(HEADER_ACCEPT) String acceptHeader,
                                  @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
                                  @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
                                  @Context UriInfo uriInfo,
                                  @Nullable @QueryParam("regions") String regionsStr) {
        
        // 获取注册列表的区域
        boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
        String[] regions = null;
        if (!isRemoteRegionRequested) {
            EurekaMonitors.GET_ALL.increment();
        } else {
            regions = regionsStr.toLowerCase().split(",");
            Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
            EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
        }
    
        // 判断是否可以访问
        if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
            return Response.status(Status.FORBIDDEN).build();
        }
        // 设置API版本
        CurrentRequestVersion.set(Version.toEnum(version));
        // 默认key的类型为JSON
        KeyType keyType = Key.KeyType.JSON;
        // 默认设置返回类型为JSON
        String returnMediaType = MediaType.APPLICATION_JSON;
        // 如果Accept为空,或者不包含JSON字符串(表示客户端可能不接收JSON类型),则设置返回XML类型的
        if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
            keyType = Key.KeyType.XML;
            returnMediaType = MediaType.APPLICATION_XML;
        }
        // 构建缓存KEY 
        Key cacheKey = new Key(Key.EntityType.Application,
                ResponseCacheImpl.ALL_APPS,
                keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
        );
        // 获取缓存信息,返回给客户端
        Response response;
        // 判断请求接收类型是否是gzip ,如果是,则返回gzip的流出去
        if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
            response = Response.ok(responseCache.getGZIP(cacheKey))
                    .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
                    .header(HEADER_CONTENT_TYPE, returnMediaType)
                    .build();
        } else {
            response = Response.ok(responseCache.get(cacheKey))
                    .build();
        }
        return response;
    }
    
    public Applications getApplications() {
        boolean disableTransparentFallback = serverConfig.disableTransparentFallbackToOtherRegion();
        if (disableTransparentFallback) {
            return getApplicationsFromLocalRegionOnly();
        } else {
            return getApplicationsFromAllRemoteRegions();  // Behavior of falling back to remote region can be disabled.
        }
    }
    
    public Applications getApplicationsFromAllRemoteRegions() {
        return getApplicationsFromMultipleRegions(allKnownRemoteRegions);
    }
    public Applications getApplicationsFromMultipleRegions(String[] remoteRegions) {
    
        boolean includeRemoteRegion = null != remoteRegions && remoteRegions.length != 0;
    
        logger.debug("Fetching applications registry with remote regions: {}, Regions argument {}",
                includeRemoteRegion, Arrays.toString(remoteRegions));
        // 默认为false
        if (includeRemoteRegion) {
            GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS.increment();
        } else {
            GET_ALL_CACHE_MISS.increment();
        }
        Applications apps = new Applications();
        apps.setVersion(1L);
        // 循环该类中的CurrentHashMap, 这个MAP中,存储的是所有的客户端注册的实例信息
        // KEY 为客户端的名称,value为客户端的集群机器信息。
        for (Entry<String, Map<String, Lease<InstanceInfo>>> entry : registry.entrySet()) {
            Application app = null;
            // 
            if (entry.getValue() != null) {
                // 获取Lease信息,里面有每个实例的instance信息,分装成Application实体
                for (Entry<String, Lease<InstanceInfo>> stringLeaseEntry : entry.getValue().entrySet()) {
                    Lease<InstanceInfo> lease = stringLeaseEntry.getValue();
                    if (app == null) {
                        app = new Application(lease.getHolder().getAppName());
                    }
                    app.addInstance(decorateInstanceInfo(lease));
                }
            }
            if (app != null) {
                //放入 Applications里面去
                apps.addApplication(app);
            }
        }
       // 。。。。省略N多代码
        apps.setAppsHashCode(apps.getReconcileHashCode());
        return apps;
    }
    
    1. 把服务端本地的CurrentHashMap里面存储的客户端信息,封装成Application实体,然后返回
    @Path("delta")
    @GET
    public Response getContainerDifferential(
            @PathParam("version") String version,
            @HeaderParam(HEADER_ACCEPT) String acceptHeader,
            @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
            @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
            @Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) {
        // ..... 省略N多代码
        Key cacheKey = new Key(Key.EntityType.Application,
            ResponseCacheImpl.ALL_APPS_DELTA,
            keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
        );
        // ..... 省略N多代码
        if (acceptEncoding != null
            && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
         return Response.ok(responseCache.getGZIP(cacheKey))
                .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
                .header(HEADER_CONTENT_TYPE, returnMediaType)
                .build();
        } else {
            return Response.ok(responseCache.get(cacheKey))
                .build();
        }
    }
    
    public Applications getApplicationDeltas() {
        GET_ALL_CACHE_MISS_DELTA.increment();
        // 最近变化过的应用,初始化一个实体
        Applications apps = new Applications();
        // 增量获取的版本号
        apps.setVersion(responseCache.getVersionDelta().get());
        Map<String, Application> applicationInstancesMap = new HashMap<String, Application>();
        try {
            // 上写锁
            write.lock();
            // 最近产生过变化的客户端,都在这个队列里面
            Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator();
            logger.debug("The number of elements in the delta queue is :"
                    + this.recentlyChangedQueue.size());
            // 循环队列
            while (iter.hasNext()) {
                // 获取队列中的lease信息,这里面封装的就是客户端的实例信息
                Lease<InstanceInfo> lease = iter.next().getLeaseInfo();
                InstanceInfo instanceInfo = lease.getHolder();
                Object[] args = {instanceInfo.getId(),
                        instanceInfo.getStatus().name(),
                        instanceInfo.getActionType().name()};
                logger.debug(
                        "The instance id %s is found with status %s and actiontype %s",
                        args);
                Application app = applicationInstancesMap.get(instanceInfo
                        .getAppName());
                if (app == null) {
                    // 组装成一个Application实体,同时放入Applications里面去
                    app = new Application(instanceInfo.getAppName());
                    applicationInstancesMap.put(instanceInfo.getAppName(), app);
                    apps.addApplication(app);
                }
                app.addInstance(decorateInstanceInfo(lease));
            }
    
            boolean disableTransparentFallback = serverConfig.disableTransparentFallbackToOtherRegion();
            // 暂时没看明白这里的作用(苦笑。。)
            if (!disableTransparentFallback) {
                Applications allAppsInLocalRegion = getApplications(false);
    
                for (RemoteRegionRegistry remoteRegistry : this.regionNameVSRemoteRegistry.values()) {
                    Applications applications = remoteRegistry.getApplicationDeltas();
                    for (Application application : applications.getRegisteredApplications()) {
                        Application appInLocalRegistry =
                                allAppsInLocalRegion.getRegisteredApplications(application.getName());
                        if (appInLocalRegistry == null) {
                            apps.addApplication(application);
                        }
                    }
                }
            }
            // 获取全量的注册信息
            Applications allApps = getApplications(!disableTransparentFallback);
            // 设置HashCode 
            apps.setAppsHashCode(allApps.getReconcileHashCode());
            return apps;
        } finally {
            write.unlock();
        }
    }
    
    private TimerTask getDeltaRetentionTask() {
        return new TimerTask() {
    
            @Override
            public void run() {
                Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator();
                while (it.hasNext()) {
                    // 最后更新时间小于当前时间-3分钟,那么就会被移除
                    if (it.next().getLastUpdateTime() <
                            System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {
                        it.remove();
                    } else {
                        break;
                    }
                }
            }
    
        };
    }
    
    1. 上面主要用到了一个租约变化的队列, 这里面在客户端发生变化时,都会在这里面加入一条信息, 如: 注册,下线,过期等操作,
      租约变化队列里面的数据默认保存3分钟,会有一个定时器没30秒清理一次。
    2. retentionTimeInMSInDeltaQueue : 客户端保持增量信息缓存的时间,从而保证不会丢失这些信息,单位为毫秒,默认为3 * 60 * 1000
      获取到了这些变化的客户端信息,返回Eureka Clien 之后,通过集合合并,就可以得到最新的缓存数据了。

    返回顶部

  • 相关阅读:
    HashMap和Hashtable及HashSet的区别
    Android获取系统的时间
    Android的布局属性
    ListView 在代码里设置margin
    如何用Vue自己实现一个message提示插件
    JS获取最近三个月日期范围
    css实现表单label文字两端对齐
    my utils
    Vue 路由&组件懒加载(按需加载)
    C# 通过window消息控制指定控件的scroll滚动
  • 原文地址:https://www.cnblogs.com/yanhui007/p/12595691.html
Copyright © 2011-2022 走看看