zoukankan      html  css  js  c++  java
  • Spring cloud Eureka源码篇(八)

    一、Eureka的一些概念

    在Eureka的服务治理中,会涉及到下面一些概念:
    服务注册:Eureka Client会通过发送REST请求的方式向Eureka Server注册自己的服务,提供自身的元数据,比如ip地址、端口、运行状况指标的url、主页地址等信息。Eureka Server接收到注册请求后,就会把这些元数据信息存储在一个双层的Map中。
    服务续约:在服务注册后,Eureka Client会维护一个心跳来持续通知Eureka Server,说明服务一直处于可用状态,防止被剔除。Eureka Client在默认的情况下会每隔30秒发送一次心跳来进行服务续约。
    服务同步:Eureka Server之间会互相进行注册,构建Eureka Server集群,不同Eureka Server之间会进行服务同步,用来保证服务信息的一致性。
    获取服务:服务消费者(Eureka Client)在启动的时候,会发送一个REST请求给Eureka Server,获取上面注册的服务清单,并且缓存在Eureka Client本地,默认缓存30秒。同时,为了性能考虑,Eureka Server也会维护一份只读的服务清单缓存,该缓存每隔30秒更新一次。
    服务调用:服务消费者在获取到服务清单后,就可以根据清单中的服务列表信息,查找到其他服务的地址,从而进行远程调用。Eureka有Region和Zone的概念,一个Region可以包含多个Zone,在进行服务调用时,优先访问处于同一个Zone中的服务提供者。
    服务下线:当Eureka Client需要关闭或重启时,就不希望在这个时间段内再有请求进来,所以,就需要提前先发送REST请求给Eureka Server,告诉Eureka Server自己要下线了,Eureka Server在收到请求后,就会把该服务状态置为下线(DOWN),并把该下线事件传播出去。
    服务剔除:有时候,服务实例可能会因为网络故障等原因导致不能提供服务,而此时该实例也没有发送请求给Eureka Server来进行服务下线,所以,还需要有服务剔除的机制。Eureka Server在启动的时候会创建一个定时任务,每隔一段时间(默认60秒),从当前服务清单中把超时没有续约(默认90秒)的服务剔除。
    自我保护:既然Eureka Server会定时剔除超时没有续约的服务,那就有可能出现一种场景,网络一段时间内发生了异常,所有的服务都没能够进行续约,Eureka Server就把所有的服务都剔除了,这样显然不太合理。所以,就有了自我保护机制,当短时间内,统计续约失败的比例,如果达到一定阈值,则会触发自我保护的机制,在该机制下,Eureka Server不会剔除任何的微服务,等到正常后,再退出自我保护机制。

    从这些概念中,就可以知道大体的流程,Eureka Client向Eureka Server注册,并且维护心跳来进行续约,如果长时间不续约,就会被剔除。Eureka Server之间进行数据同步来形成集群,Eureka Client从Eureka Server获取服务列表,用来进行服务调用,Eureka Client服务重启前调用Eureka Server的接口进行下线操作。

    二、服务注册的入口

    在准备写这一篇时看了很多大佬写的文章,都有一个共同点,都是从启动类注解@EnableDiscoveryClient开始找入口的,但是有个问题就是,在我上篇的文章中我并没有用到这个注解也完成了服务的注册,这是怎么做到的呢,如果想了解原因就要从springboot说起了,服务注册是在spring boot应用启动的时候发起的。具体的执行路径暂且不看,spring cloud是一个生态,它提供了一套标准,这套标准可以通过不同的组件来实现,其中就包含服务注册/发现、熔断、负载均衡等,在spring-cloud-commons这个包中,org.springframework.cloud.client.serviceregistry 路径下,可以看到一个服务注册的接口定义 ServiceRegistry 。它就是定义了spring cloud中服务注册的一个接口。我们看一下它的类关系图,这个接口有一个唯一的实现 EurekaServiceRegistry 。表示采用的是Eureka Server作为服务注册中心。

     三、服务注册的触发路径

             有了上面的概念下面就来看下springboot是如何调用EurekaServiceRegistry 的,而EurekaServiceRegistry 在被调用时又做了啥,大家自要想想其实应该不难猜测到,服务的注册取决于服务是否已经启动好了。而在spring boot中,会等到spring 容器启动并且所有的配置都完成之后来进行注册。而这个动作在spring boot的启动方法中的refreshContext中完成。

    点击run

     进入SpringApplication的ConfigurableApplicationContext方法

     点击run进入下图

     再点击run进入SpringApplication的run方法

    public ConfigurableApplicationContext run(String... args) {
            StopWatch stopWatch = new StopWatch();
            stopWatch.start();
            ConfigurableApplicationContext context = null;
            Collection<SpringBootExceptionReporter> exceptionReporters = new ArrayList<>();
            configureHeadlessProperty();
            SpringApplicationRunListeners listeners = getRunListeners(args);
            listeners.starting();
            try {
                ApplicationArguments applicationArguments = new DefaultApplicationArguments(args);
                ConfigurableEnvironment environment = prepareEnvironment(listeners, applicationArguments);
                configureIgnoreBeanInfo(environment);
                Banner printedBanner = printBanner(environment);
                context = createApplicationContext();
                exceptionReporters = getSpringFactoriesInstances(SpringBootExceptionReporter.class,
                        new Class[] { ConfigurableApplicationContext.class }, context);
                prepareContext(context, environment, listeners, applicationArguments, printedBanner);
    //刷新上下文的方法 refreshContext(context); afterRefresh(context, applicationArguments); stopWatch.stop();
    if (this.logStartupInfo) { new StartupInfoLogger(this.mainApplicationClass).logStarted(getApplicationLog(), stopWatch); } listeners.started(context); callRunners(context, applicationArguments); } catch (Throwable ex) { handleRunFailure(context, ex, exceptionReporters, listeners); throw new IllegalStateException(ex); } try { listeners.running(context); } catch (Throwable ex) { handleRunFailure(context, ex, exceptionReporters, null); throw new IllegalStateException(ex); } return context; }

    点击refreshContext(context);看下刷新做了啥子事

        private void refreshContext(ConfigurableApplicationContext context) {
            refresh((ApplicationContext) context);
            if (this.registerShutdownHook) {
                try {
                    context.registerShutdownHook();
                }
                catch (AccessControlException ex) {
                    // Not allowed in some environments.
                }
            }
        }

    点击refresh()一直往里面走,至到下图位置,选择第一个

     进入AbstractApplicationContext的refresh()方法try里面有很多调用的前置方法,里面是spring的东西,在本篇中讲了没什么意义,下面就看跟本篇有关的一个方法finishRefresh();

        @Override
        public void refresh() throws BeansException, IllegalStateException {
            synchronized (this.startupShutdownMonitor) {
                // Prepare this context for refreshing.
                prepareRefresh();
    
                // Tell the subclass to refresh the internal bean factory.
                ConfigurableListableBeanFactory beanFactory = obtainFreshBeanFactory();
    
                // Prepare the bean factory for use in this context.
                prepareBeanFactory(beanFactory);
    
                try {
                    // Allows post-processing of the bean factory in context subclasses.
                    postProcessBeanFactory(beanFactory);
    
                    // Invoke factory processors registered as beans in the context.
                    invokeBeanFactoryPostProcessors(beanFactory);
    
                    // Register bean processors that intercept bean creation.
                    registerBeanPostProcessors(beanFactory);
    
                    // Initialize message source for this context.
                    initMessageSource();
    
                    // Initialize event multicaster for this context.
                    initApplicationEventMulticaster();
    
                    // Initialize other special beans in specific context subclasses.
                    onRefresh();
    
                    // Check for listener beans and register them.
                    registerListeners();
    
                    // Instantiate all remaining (non-lazy-init) singletons.
                    finishBeanFactoryInitialization(beanFactory);
    
                    // Last step: publish corresponding event.
                    finishRefresh();
                }
    
                catch (BeansException ex) {
                    if (logger.isWarnEnabled()) {
                        logger.warn("Exception encountered during context initialization - " +
                                "cancelling refresh attempt: " + ex);
                    }
    
                    // Destroy already created singletons to avoid dangling resources.
                    destroyBeans();
    
                    // Reset 'active' flag.
                    cancelRefresh(ex);
    
                    // Propagate exception to caller.
                    throw ex;
                }
    
                finally {
                    // Reset common introspection caches in Spring's core, since we
                    // might not ever need metadata for singleton beans anymore...
                    resetCommonCaches();
                }
            }
        }

    在finishRefresh();从名字上可以看到它是用来体现完成刷新的操作,也就是刷新完成之后要做的后置的操作。它主要做几个事情

    • 清空缓存
    • 初始化一个LifecycleProcessor,在Spring启动的时候启动bean,在spring结束的时候销毁bean
    • 调用LifecycleProcessor的onRefresh方法,启动实现了Lifecycle接口的bean
    • 发布ContextRefreshedEvent
    • 注册MBean,通过JMX进行监控和管理

    在这个方法中有两个核心东西,一个是 initLifecycleProcessor();另一个是getLifecycleProcessor().onRefresh();其中getLifecycleProcessor().onRefresh()它是调用生命周期处理器的onrefresh方法,找到SmartLifecycle接口的所有实现类并调用start方法。

        protected void finishRefresh() {
            // Clear context-level resource caches (such as ASM metadata from scanning).
            clearResourceCaches();
            //初始化生命周期的处理器
            // Initialize lifecycle processor for this context.
            initLifecycleProcessor();
    
            // Propagate refresh to lifecycle processor first.
            getLifecycleProcessor().onRefresh();
    
            // Publish the final event.
            publishEvent(new ContextRefreshedEvent(this));
    
            // Participate in LiveBeansView MBean, if active.
            LiveBeansView.registerApplicationContext(this);
        }

    点击上图中的onRefresh()进入DefaultLifecycleProcessor类的onRefresh()方法

        @Override
        public void onRefresh() {
            startBeans(true);
            this.running = true;
        }

    点击startBeans看下这个启动Bean的方法做了啥,如果在下面Bebugger的话会发现lifecycleBeans 的map集合中有我们比较关心的eurekaAutoServiceRegistration,这个方法其实很简单,就是拿到beans的集合然后循环去调用start方法

        private void startBeans(boolean autoStartupOnly) {
            Map<String, Lifecycle> lifecycleBeans = getLifecycleBeans();
            Map<Integer, LifecycleGroup> phases = new HashMap<>();
            lifecycleBeans.forEach((beanName, bean) -> {
                if (!autoStartupOnly || (bean instanceof SmartLifecycle && ((SmartLifecycle) bean).isAutoStartup())) {
                    int phase = getPhase(bean);
                    LifecycleGroup group = phases.get(phase);
                    if (group == null) {
                        group = new LifecycleGroup(phase, this.timeoutPerShutdownPhase, lifecycleBeans, autoStartupOnly);
                        phases.put(phase, group);
                    }
                    group.add(beanName, bean);
                }
            });
            if (!phases.isEmpty()) {
                List<Integer> keys = new ArrayList<>(phases.keySet());
                Collections.sort(keys);
                for (Integer key : keys) {
    // phases.
    get(key).start(); } } }

    点击上面的start()方法

            public void start() {
                if (this.members.isEmpty()) {
                    return;
                }
                if (logger.isDebugEnabled()) {
                    logger.debug("Starting beans in phase " + this.phase);
                }
                Collections.sort(this.members);
                for (LifecycleGroupMember member : this.members) {
                    doStart(this.lifecycleBeans, member.name, this.autoStartupOnly);
                }
            }

    点击上面的doStart()方法,最后调用bean.start();此时bean.start();调用的Bean的实例应该是EurekaAutoServiceRegistration

    private void doStart(Map<String, ? extends Lifecycle> lifecycleBeans, String beanName, boolean autoStartupOnly) {
            Lifecycle bean = lifecycleBeans.remove(beanName);
            if (bean != null && bean != this) {
                String[] dependenciesForBean = getBeanFactory().getDependenciesForBean(beanName);
                for (String dependency : dependenciesForBean) {
                    doStart(lifecycleBeans, dependency, autoStartupOnly);
                }
                if (!bean.isRunning() &&
                        (!autoStartupOnly || !(bean instanceof SmartLifecycle) || ((SmartLifecycle) bean).isAutoStartup())) {
                    if (logger.isTraceEnabled()) {
                        logger.trace("Starting bean '" + beanName + "' of type [" + bean.getClass().getName() + "]");
                    }
                    try {
    //最后的调用 bean.start(); }
    catch (Throwable ex) { throw new ApplicationContextException("Failed to start bean '" + beanName + "'", ex); } if (logger.isDebugEnabled()) { logger.debug("Successfully started bean '" + beanName + "'"); } } } }

    然后debugger一下发现调用的也和我们猜想的一样是EurekaAutoServiceRegistration

    四、SmartLifeCycle的演示

    在接着向下说之前,要先弄懂SmartLifeCycle这块的知识, SmartLifeCycle是一个接口,当Spring容器加载完所有的Bean并且初始化之后,会继续回调实现了SmartLifeCycle接口的类中对应的方法,比如(start)。实际上我们自己也可以拓展,比如在springboot工程的main方法同级目录下,写一个测试类,实现SmartLifeCycle接口,并且通过 @Service 声明为一个bean,因为要被spring去加载,首先得是bean。

    @Service
    public class SmartLifecycleDemo implements SmartLifecycle {
        public void start() {
            System.out.println("start");
        }
    
        public void stop() {
            System.out.println("stop");
        }
    
        public boolean isRunning() {
            System.out.println("isRunning");
            return false;
        }
    }

    接着,我们启动spring boot应用后,可以看到控制台输出了 start 字符串。有了上面的概念后接着向下走,进入EurekaAutoServiceRegistration类中,会发现这个类实现了SmartLifecycle,这个好办了,二话不说直接看他的start()方法;

        @Override
        public void start() {
            // only set the port if the nonSecurePort or securePort is 0 and this.port != 0
            if (this.port.get() != 0) {
                if (this.registration.getNonSecurePort() == 0) {
                    this.registration.setNonSecurePort(this.port.get());
                }
    
                if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) {
                    this.registration.setSecurePort(this.port.get());
                }
            }
    
            // only initialize if nonSecurePort is greater than 0 and it isn't already running
            // because of containerPortInitializer below
            if (!this.running.get() && this.registration.getNonSecurePort() > 0) {
    
                this.serviceRegistry.register(this.registration);
    
                this.context.publishEvent(new InstanceRegisteredEvent<>(this,
                        this.registration.getInstanceConfig()));
                this.running.set(true);
            }
        }

    在start方法中,我们可以看到 this.serviceRegistry.register 这个方法,它实际上就是发起服务注册的机制。此时this.serviceRegistry的实例,应该是 EurekaServiceRegistry ,这就回到了开篇说的EurekaServiceRegistry 是怎么触发调用的了, 原因是EurekaAutoServiceRegistration的构造方法中,会有一个赋值操作,而这个构造方法是在EurekaClientAutoConfiguration 这个自动装配类中被装配和初始化的,代码如下。

     五、服务的注册

    接下来跟着上面的代码分析服务注册的流程,EurekaAutoServiceRegistration.start方法;this.serviceRegistry.register(this.registration); 方法最终会调用EurekaServiceRegistry 类中的 register 方法来实现服务注册。

    @Override
        public void start() {
            // only set the port if the nonSecurePort or securePort is 0 and this.port != 0
            if (this.port.get() != 0) {
                if (this.registration.getNonSecurePort() == 0) {
                    this.registration.setNonSecurePort(this.port.get());
                }
    
                if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) {
                    this.registration.setSecurePort(this.port.get());
                }
            }
    
            // only initialize if nonSecurePort is greater than 0 and it isn't already running
            // because of containerPortInitializer below
            if (!this.running.get() && this.registration.getNonSecurePort() > 0) {
    
                this.serviceRegistry.register(this.registration);
    
                this.context.publishEvent(new InstanceRegisteredEvent<>(this,
                        this.registration.getInstanceConfig()));
                this.running.set(true);
            }
        }

    进入EurekaServiceRegistry 类中的 register 方法

    @Override
        public void register(EurekaRegistration reg) {
            maybeInitializeClient(reg);
    
            if (log.isInfoEnabled()) {
                log.info("Registering application "
                        + reg.getApplicationInfoManager().getInfo().getAppName()
                        + " with eureka with status "
                        + reg.getInstanceConfig().getInitialStatus());
            }
            //设置当前实例的状态,一旦这个实例的状态发生变化,只要状态不是DOWN,那么就会被监听器监听并且执行服务注册
            reg.getApplicationInfoManager()
                    .setInstanceStatus(reg.getInstanceConfig().getInitialStatus());
            //设置健康检查的处理
            reg.getHealthCheckHandler().ifAvailable(healthCheckHandler -> reg
                    .getEurekaClient().registerHealthCheck(healthCheckHandler));
        }

     从上述代码来看,注册方法中并没有真正调用Eureka的方法去执行注册,而是仅仅设置了一个状态以及设置健康检查处理器。继续看一下reg.getApplicationInfoManager().setInstanceStatus方法。

        public synchronized void setInstanceStatus(InstanceStatus status) {
    //缓存服务的状态,应该是存在内存中的东西,有兴趣的可以点击instanceStarusMapper进去看下 InstanceStatus next
    = instanceStatusMapper.map(status); if (next == null) { return; } InstanceStatus prev = instanceInfo.setStatus(next); if (prev != null) { for (StatusChangeListener listener : listeners.values()) { try { listener.notify(new StatusChangeEvent(prev, next)); } catch (Exception e) { logger.warn("failed to notify listener: {}", listener.getId(), e); } } } }

    在这个方法中,它会通过监听器来发布一个状态变更事件。此时listener的实例是StatusChangeListener ,也就是调用 StatusChangeListener 的notify方法。这个事件是触发一个服务状态变更,应该是有地方会监听这个事件,然后基于这个事件。这个时候我们以为找到了方向,然后点击进去一看,卞击,发现它是一个接口。而且我们发现它是静态的内部接口,还无法直接看到它的实现类。于是又往回找,因为不用想它一定是在某个地方做了初始化的工作,于是,我想找到EurekaServiceRegistry.register方法中的是什么,而且我们发现ApplicationInfoManager是来自于EurekaRegistration这个类中的属性。而reg.getApplicationInfoManager 这个实例EurekaRegistration又是在EurekaAutoServiceRegistration这个类中实例化的。那么猜想,是不是在自动装配中做了什么东西。于是找到EurekaClientAutoConfiguration这个类,果然看到了Bean的一些自动装配,其中包含 EurekaClient 、 ApplicationInfoMangager 、 EurekaRegistration 等。其中还有一个重要的类是EurekaClientConfiguration,通过名字就知道这个跟EurekaClient一定是有关系的,前面说的是跟Eureka的注册有关系,这个Client应该是跟通信有关系的,在上一篇中也说过通讯是跟client有关系的

        @Configuration(proxyBeanMethods = false)
        @ConditionalOnMissingRefreshScope
        protected static class EurekaClientConfiguration {
    
            @Autowired
            private ApplicationContext context;
    
            @Autowired
            private AbstractDiscoveryClientOptionalArgs<?> optionalArgs;
    
            @Bean(destroyMethod = "shutdown")
            @ConditionalOnMissingBean(value = EurekaClient.class,
                    search = SearchStrategy.CURRENT)
            public EurekaClient eurekaClient(ApplicationInfoManager manager,
                    EurekaClientConfig config) {
                return new CloudEurekaClient(manager, config, this.optionalArgs,
                        this.context);
            }
    
            @Bean
            @ConditionalOnMissingBean(value = ApplicationInfoManager.class,
                    search = SearchStrategy.CURRENT)
            public ApplicationInfoManager eurekaApplicationInfoManager(
                    EurekaInstanceConfig config) {
                InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
                return new ApplicationInfoManager(config, instanceInfo);
            }
    
            @Bean
            @ConditionalOnBean(AutoServiceRegistrationProperties.class)
            @ConditionalOnProperty(
                    value = "spring.cloud.service-registry.auto-registration.enabled",
                    matchIfMissing = true)
            public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient,
                    CloudEurekaInstanceConfig instanceConfig,
                    ApplicationInfoManager applicationInfoManager, @Autowired(
                            required = false) ObjectProvider<HealthCheckHandler> healthCheckHandler) {
                return EurekaRegistration.builder(instanceConfig).with(applicationInfoManager)
                        .with(eurekaClient).with(healthCheckHandler).build();
            }
    
        }

    通过上面代码看到了一个很重要的Bean在启动的时候做了自动装配,也就是CloudEurekaClient 。从名字上来看,很容易的识别并猜测出它是Eureka客户端的一个工具类,用来实现和服务端的通信以及处理。这个很多源码一贯的套路,要么在构造方法里面去做很多的初始化和一些后台执行的程序操作,要么就是通过异步事件的方式来处理。接着,看一下CloudEurekaClient的初始化过程,它的构造方法中会通过 super 调用父类的构造方法。也就是DiscoveryClient的构造。(super(applicationInfoManager, config, args);调用父类的构造方法,而CloudEurekaClient的父类是DiscoveryClient.),这转了一圈就转到了网上很多大佬写的注解进入DiscoveryClient类中来了

        public CloudEurekaClient(ApplicationInfoManager applicationInfoManager,
                EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs<?> args,
                ApplicationEventPublisher publisher) {
            super(applicationInfoManager, config, args);
            this.applicationInfoManager = applicationInfoManager;
            this.publisher = publisher;
            this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class,
                    "eurekaTransport");
            ReflectionUtils.makeAccessible(this.eurekaTransportField);
        }

    进入DiscoveryClient类,可以看到在最终的DiscoveryClient改造方法中,有非常长的代码。其实很多代码可以不需要关心,大部分都是一些初始化工作,比如初始化了几个定时任务

    • scheduler
    • heartbeatExecutor 心跳定时任务
    • cacheRefreshExecutor 定时去同步服务端的实例列表
        @Inject
        DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                        Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
            if (args != null) {
                this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
                this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
                this.eventListeners.addAll(args.getEventListeners());
                this.preRegistrationHandler = args.preRegistrationHandler;
            } else {
                this.healthCheckCallbackProvider = null;
                this.healthCheckHandlerProvider = null;
                this.preRegistrationHandler = null;
            }
            
            this.applicationInfoManager = applicationInfoManager;
            InstanceInfo myInfo = applicationInfoManager.getInfo();
    
            clientConfig = config;
            staticClientConfig = clientConfig;
            transportConfig = config.getTransportConfig();
            instanceInfo = myInfo;
            if (myInfo != null) {
                appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
            } else {
                logger.warn("Setting instanceInfo to a passed in null value");
            }
    
            this.backupRegistryProvider = backupRegistryProvider;
            this.endpointRandomizer = endpointRandomizer;
            this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
            localRegionApps.set(new Applications());
    
            fetchRegistryGeneration = new AtomicLong(0);
    
            remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
            remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));
            //是否要从eureka server上获取服务地址信息
            if (config.shouldFetchRegistry()) {
                this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
            } else {
                this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
            }
            //是否要注册到eureka server上
            if (config.shouldRegisterWithEureka()) {
                this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
            } else {
                this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
            }
    
            logger.info("Initializing Eureka in region {}", clientConfig.getRegion());
            //如果不需要注册并且不需要更新服务地址
            if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
                logger.info("Client configured to neither register nor query for data.");
                scheduler = null;
                heartbeatExecutor = null;
                cacheRefreshExecutor = null;
                eurekaTransport = null;
                instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());
    
                // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
                // to work with DI'd DiscoveryClient
                DiscoveryManager.getInstance().setDiscoveryClient(this);
                DiscoveryManager.getInstance().setEurekaClientConfig(config);
    
                initTimestampMs = System.currentTimeMillis();
                logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                        initTimestampMs, this.getApplications().size());
    
                return;  // no need to setup up an network tasks and we are done
            }
    
            try {
                // default size of 2 - 1 each for heartbeat and cacheRefresh
    //定时任务,用线程池解决 scheduler = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-%d") .setDaemon(true) .build()); //心跳 heartbeatExecutor = new ThreadPoolExecutor( 1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d") .setDaemon(true) .build() ); // use direct handoff //缓存刷新,前面说过服务提供者在注册时拉取列表时要将服务缓存到本地,是用来刷新本地缓存 cacheRefreshExecutor = new ThreadPoolExecutor( 1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d") .setDaemon(true) .build() ); // use direct handoff eurekaTransport = new EurekaTransport(); scheduleServerEndpointTask(eurekaTransport, args); AzToRegionMapper azToRegionMapper; if (clientConfig.shouldUseDnsForFetchingServiceUrls()) { azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig); } else { azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig); } if (null != remoteRegionsToFetch.get()) { azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(",")); } instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion()); } catch (Throwable e) { throw new RuntimeException("Failed to initialize DiscoveryClient!", e); } if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) { fetchRegistryFromBackup(); } // call and execute the pre registration handler before all background tasks (inc registration) is started if (this.preRegistrationHandler != null) { this.preRegistrationHandler.beforeRegistration(); } //如果需要注册到Eureka server并且是开启了初始化的时候强制注册,则调用register()发起服务注册 if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) { try { if (!register() ) { throw new IllegalStateException("Registration error at startup. Invalid server response."); } } catch (Throwable th) { logger.error("Registration error at startup: {}", th.getMessage()); throw new IllegalStateException(th); } } // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
    //初始化定时任务 initScheduledTasks(); try { Monitors.registerObject(this); } catch (Throwable e) { logger.warn("Cannot register timers", e); } // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance() // to work with DI'd DiscoveryClient DiscoveryManager.getInstance().setDiscoveryClient(this); DiscoveryManager.getInstance().setEurekaClientConfig(config); initTimestampMs = System.currentTimeMillis(); logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", initTimestampMs, this.getApplications().size()); }

     进入DiscoveryClient.initScheduledTasks;initScheduledTasks 去启动一个定时任务。

    • 如果配置了开启从注册中心刷新服务列表,则会开启cacheRefreshExecutor这个定时任务
    • 如果开启了服务注册到Eureka,则通过需要做几个事情.
      • 建立心跳检测机制
      • 通过内部类来实例化StatusChangeListener 实例状态监控接口,这个就是前面 在分析启动过程中所看到的,调用notify的方法,实际上会在这里体现。
        private void initScheduledTasks() {
    //如果配置了开启从注册中心刷新服务列表,则会开启cacheRefreshExecutor这个定时任务
    if (clientConfig.shouldFetchRegistry()) { // registry cache refresh timer int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); cacheRefreshTask = new TimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread() ); scheduler.schedule( cacheRefreshTask, registryFetchIntervalSeconds, TimeUnit.SECONDS); } //如果开启了服务注册到Eureka,则通过需要做几个事情 if (clientConfig.shouldRegisterWithEureka()) { int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs); // Heartbeat timer heartbeatTask = new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() ); scheduler.schedule( heartbeatTask, renewalIntervalInSecs, TimeUnit.SECONDS); // InstanceInfo replicator
    //初始化一个:instanceInfoReplicator instanceInfoReplicator = new InstanceInfoReplicator( this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId() { return "statusChangeListener"; } @Override public void notify(StatusChangeEvent statusChangeEvent) { if (InstanceStatus.DOWN == statusChangeEvent.getStatus() || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) { // log at warn level if DOWN was involved logger.warn("Saw local status change event {}", statusChangeEvent); } else { logger.info("Saw local status change event {}", statusChangeEvent); } instanceInfoReplicator.onDemandUpdate(); } }; //注册实例状态变化的监听 if (clientConfig.shouldOnDemandUpdateStatusChange()) { applicationInfoManager.registerStatusChangeListener(statusChangeListener); } //启动一个实例信息复制器,主要就是为了开启一个定时线程,每40秒判断实例信息是否变更,如果变更了则重新注册 instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } else { logger.info("Not registering with Eureka server per configuration"); } }

    进入instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());方法看start做了什么事,可以发现用了cas保证了线程的安全性,并在里面启动了一个任务,

        public void start(int initialDelayMs) {
            if (started.compareAndSet(false, true)) {
    //设置实例信息 instanceInfo.setIsDirty();
    // for initial register Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS); scheduledPeriodicRef.set(next); } }

     上图中有一段代码是Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);他这是将当前对象实例传递过去,我们看scheduler是一个线程池,那么他必然用到了一个线程

     所以在当前类中直接找run()方法,run方法实际上和前面自动装配所执行的服务注册方法是一样的,也就是调用 register 方法进行服务注册,并且在finally中,每30s会定时执行一下当前的run 方法进行检查。

        public void run() {
            try {
                discoveryClient.refreshInstanceInfo();
    
                Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
                if (dirtyTimestamp != null) {
    //这个是最终发起注册的方法,跟通信有关 discoveryClient.register();
    //释放 instanceInfo.unsetIsDirty(dirtyTimestamp); } }
    catch (Throwable t) { logger.warn("There was a problem with the instance info replicator", t); } finally { Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS); scheduledPeriodicRef.set(next); } }

    DiscoveryClient中的register方法进去看下;最终,我们终于找到服务注册的入口了,也就是网上大多数文章开篇就说的方法; eurekaTransport.registrationClient.register 最终调用的是 AbstractJerseyEurekaHttpClient#register(...)`, 如果有兴趣自己去看代码,就会发现去调用之前有很多绕来绕去的代码,比如工厂模式、装饰器模式等。

      boolean register() throws Throwable {
            logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
            EurekaHttpResponse<Void> httpResponse;
            try {
                httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
            } catch (Exception e) {
                logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
                throw e;
            }
            if (logger.isInfoEnabled()) {
                logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
            }
            return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
        }

    进入他的像类AbstractJerseyEurekaHttpClient

     在AbstractJerseyEurekaHttpClient的register方法中很显然的发现这里是发起了一次http请求,访问Eureka-Server的apps/${APP_NAME}接口,将当前服务实例的信息发送到Eureka Server进行保存。至此,基本上已经知道Spring Cloud Eureka 是如何在启动的时候把服务信息注册到Eureka Server上的了

       @Override
        public EurekaHttpResponse<Void> register(InstanceInfo info) {
            String urlPath = "apps/" + info.getAppName();
            ClientResponse response = null;
            try {
                Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
                addExtraHeaders(resourceBuilder);
                response = resourceBuilder
                        .header("Accept-Encoding", "gzip")
                        .type(MediaType.APPLICATION_JSON_TYPE)
                        .accept(MediaType.APPLICATION_JSON)
                        .post(ClientResponse.class, info);
                return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
            } finally {
                if (logger.isDebugEnabled()) {
                    logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                            response == null ? "N/A" : response.getStatus());
                }
                if (response != null) {
                    response.close();
                }
            }
        }

     至此,已经知道Eureka Client发起服务注册时,有两个地方会执行服务注册的任务

    • 在Spring Boot启动时,由于自动装配机制将CloudEurekaClient注入到了容器,并且执行了构造方法,而在构造方法中有一个定时任务每40s会执行一次判断,判断实例信息是否发生了变化,如果是则会发起服务注册的流程
    • 在Spring Boot启动时,通过refresh方法,最终调用StatusChangeListener.notify进行服务状态变更的监听,而这个监听的方法受到事件之后会去执行服务注册。

    六、Eureka Server收到请求之后的处理

    前面说到了服务将当前服务实例的信息发送到Eureka Server进行保存那么接下来就看Server是怎么接收请求并且存储的,请求入口在:  com.netflix.eureka.resources.ApplicationResource.addInstance() 。这里所提供的REST服务,采用的是jersey来实现的。Jersey是基于JAX-RS标准,提供REST的实现的支持。

    进入ApplicationResource.addInstance()方法,当EurekaClient调用register方法发起注册时,会调用ApplicationResource.addInstance方法。服务注册就是发送一个 POST 请求带上当前实例信息到类 ApplicationResource 的 addInstance方法进行服务注册

     @POST
        @Consumes({"application/json", "application/xml"})
        public Response addInstance(InstanceInfo info,
                                    @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
            logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
            // validate that the instanceinfo contains all the necessary required fields
            if (isBlank(info.getId())) {
                return Response.status(400).entity("Missing instanceId").build();
            } else if (isBlank(info.getHostName())) {
                return Response.status(400).entity("Missing hostname").build();
            } else if (isBlank(info.getIPAddr())) {
                return Response.status(400).entity("Missing ip address").build();
            } else if (isBlank(info.getAppName())) {
                return Response.status(400).entity("Missing appName").build();
            } else if (!appName.equals(info.getAppName())) {
                return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
            } else if (info.getDataCenterInfo() == null) {
                return Response.status(400).entity("Missing dataCenterInfo").build();
            } else if (info.getDataCenterInfo().getName() == null) {
                return Response.status(400).entity("Missing dataCenterInfo Name").build();
            }
    
            // handle cases where clients may be registering with bad DataCenterInfo with missing data
            DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
            if (dataCenterInfo instanceof UniqueIdentifier) {
                String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
                if (isBlank(dataCenterInfoId)) {
                    boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
                    if (experimental) {
                        String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                        return Response.status(400).entity(entity).build();
                    } else if (dataCenterInfo instanceof AmazonInfo) {
                        AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                        String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                        if (effectiveId == null) {
                            amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                        }
                    } else {
                        logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
                    }
                }
            }
    
            registry.register(info, "true".equals(isReplication));
            return Response.status(204).build();  // 204 to be backwards compatible
        }

     本来想debugger看看上图中if的判断,结果发现所有if判断都没进去直接进了registry.register(info, "true".equals(isReplication));方法,那进入PeerAwareInstanceRegistryImpl.register方法中

     

     先来看PeerAwareInstanceRegistryImpl的类关系图,从类关系图可以看出,PeerAwareInstanceRegistry的最顶层接口为LeaseManager与LookupService

    • 其中LookupService定义了最基本的发现示例的行为
    • LeaseManager定义了处理客户端注册,续约,注销等操作

    在 addInstance 方法中,最终调用的是 PeerAwareInstanceRegistryImpl.register 方法。

    • leaseDuration 表示租约过期时间,默认是90s,也就是当服务端超过90s没有收到客户端的心跳,则主动剔除该节点
    • 调用super.register发起节点注册
    • replicateToPeers将信息复制到Eureka Server集群中的其他机器上,同步的实现也很简单,就是获得集群中的所有节点,然后逐个发起注册
    public void register(final InstanceInfo info, final boolean isReplication) {
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
    //如果客户端有自己定义心跳超时时间,则采用客户端的时间
    leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    super.register(info, leaseDuration, isReplication);
    //复制到Eureka Server集群中的其他节点
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }

     点击super.register看他是怎么发起节点注册的,进入AbstractInstanceRegistry类中register方法;可以发现Eureka-Server的服务注册,实际上是将客户端传递过来的实例数据保存到Eureka-Server中的ConcurrentHashMap中。

      public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
            try {
                read.lock();
    //从registry中获得当前实例信息,根据appName Map
    <String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
    //增加注册次数到监控中 REGISTER.increment(isReplication);
    //如果当前appName是第一次注册,则初始化一个concurrentHashMap
    if (gMap == null) { final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>(); gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap); if (gMap == null) { gMap = gNewMap; } }
    //从gMap中查询已经存在的Lease信息,Lease是租约,实际上它把服务提供者的实例信息包装成一个lease。里面提供了对于改服务实例的租约管理 Lease
    <InstanceInfo> existingLease = gMap.get(registrant.getId()); // Retain the last dirty timestamp without overwriting it, if there is already a lease
    //当instance已经存在时,和客户端的instance的信息做比较,时间最新的那个,为有效instance信息 if (existingLease != null && (existingLease.getHolder() != null)) { Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp(); Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp(); logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted // InstanceInfo instead of the server local copy. if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) { logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" + " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant"); registrant = existingLease.getHolder(); } } else { // The lease does not exist and hence it is a new registration
    //当lease不存在时,进入到这段代码 synchronized (lock) { if (this.expectedNumberOfClientsSendingRenews > 0) { // Since the client wants to register it, increase the number of clients sending renews this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1; updateRenewsPerMinThreshold(); } } logger.debug("No previous lease information found; it is new registration"); }
    //构建一个lease Lease
    <InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration); if (existingLease != null) {
    //当原来存在lease的信息时,设置serviceUpTimestamp,保证服务启动的时间一直是第一次注册的那个 lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp()); } gMap.put(registrant.getId(), lease);
    //添加到最新的注册队列中 recentRegisteredQueue.add(
    new Pair<Long, String>( System.currentTimeMillis(), registrant.getAppName() + "(" + registrant.getId() + ")")); // This is where the initial state transfer of overridden status happens
    //检查实例状态是否发生变化,如果是并且存在,则覆盖原来的状态 if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) { logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the " + "overrides", registrant.getOverriddenStatus(), registrant.getId()); if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) { logger.info("Not found overridden id {} and hence adding it", registrant.getId()); overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus()); } } InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId()); if (overriddenStatusFromMap != null) { logger.info("Storing overridden status {} from map", overriddenStatusFromMap); registrant.setOverriddenStatus(overriddenStatusFromMap); } // Set the status based on the overridden status rules InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication); registrant.setStatusWithoutDirty(overriddenInstanceStatus); // If the lease is registered with UP status, set lease service up timestamp
    //得到instanceStatus,判断是否是UP状态 if (InstanceStatus.UP.equals(registrant.getStatus())) { lease.serviceUp(); }
    //设置注册类型为添加 registrant.setActionType(ActionType.ADDED);
    //租约变更记录队列,记录了实例的每次变化,用于注册信息的增量获取 recentlyChangedQueue.add(
    new RecentlyChangedItem(lease)); registrant.setLastUpdatedTimestamp();
    //让缓存失效 invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress()); logger.info(
    "Registered instance {}/{} with status {} (replication={})", registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication); } finally { read.unlock(); } }

    至此,服务注册在客户端和服务端的处理过程做了一个详细的分析,实际上在Eureka Server端,会把客户端的地址信息保存到ConcurrentHashMap中存储。并且服务提供者和注册中心之间,会建立一个心跳检测机制。用于监控服务提供者的健康状态。

    七、Eureka 的三级缓存设计

    Eureka Server存在三个变量:(registry、readWriteCacheMap、readOnlyCacheMap)保存服务注册信息,默认情况下定时任务每30s将readWriteCacheMap同步至readOnlyCacheMap,每60s清理超过90s未续约的节点,Eureka Client每30s从readOnlyCacheMap更新服务注册信息,而客户端服务的注册则从registry更新服务注册信息。

    7.1、多级缓存的意义

    这里为什么要设计多级缓存呢?原因很简单,就是当存在大规模的服务注册和更新时,如果只是修改一个ConcurrentHashMap数据,那么势必因为锁的存在导致竞争,影响性能。而Eureka又是AP模型,只需要满足最终可用就行。所以它在这里用到多级缓存来实现读写分离。注册方法写的时候直接写内存注册表,写完表之后主动失效读写缓存。获取注册信息接口先从只读缓存取,只读缓存没有再去读写缓存取,读写缓存没有再去内存注册表里取(不只是取,此处较复杂)。并且,读写缓存会更新回写只读缓存

    • responseCacheUpdateIntervalMs : readOnlyCacheMap 缓存更新的定时器时间间隔,默认为30秒
    • responseCacheAutoExpirationInSeconds : readWriteCacheMap 缓存过期时间,默认为 180 秒

    7.2、服务注册的缓存失效

    在AbstractInstanceRegistry.register方法的最后,会调用invalidateCache(registrant.getAppName(), registrant.getVIPAddress(),registrant.getSecureVipAddress()); 方法,使得读写缓存失效。
    点击invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());进入以下方法

        private void invalidateCache(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) {
            // invalidate cache
            responseCache.invalidate(appName, vipAddress, secureVipAddress);
        }

    点击invalidate进入ResponseCacheImpl类中的invalidate方法

        @Override
        public void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) {
            for (Key.KeyType type : Key.KeyType.values()) {
                for (Version v : Version.values()) {
                    invalidate(
                            new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.full),
                            new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.compact),
                            new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.full),
                            new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.compact),
                            new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.full),
                            new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.compact)
                    );
                    if (null != vipAddress) {
                        invalidate(new Key(Key.EntityType.VIP, vipAddress, type, v, EurekaAccept.full));
                    }
                    if (null != secureVipAddress) {
                        invalidate(new Key(Key.EntityType.SVIP, secureVipAddress, type, v, EurekaAccept.full));
                    }
                }
            }
        }

    点击invalidate进入下面

     public void invalidate(Key... keys) {
            for (Key key : keys) {
                logger.debug("Invalidating the response cache key : {} {} {} {}, {}",
                        key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());
    
                readWriteCacheMap.invalidate(key);
                Collection<Key> keysWithRegions = regionSpecificKeys.get(key);
                if (null != keysWithRegions && !keysWithRegions.isEmpty()) {
                    for (Key keysWithRegion : keysWithRegions) {
                        logger.debug("Invalidating the response cache key : {} {} {} {} {}",
                                key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());
                        readWriteCacheMap.invalidate(keysWithRegion);
                    }
                }
            }
        }

       7.3、定时同步缓存

    ResponseCacheImpl的构造方法中,会启动一个定时任务,这个任务会定时检查写缓存中的数据变化,进行更新和同步。

        private TimerTask getCacheUpdateTask() {
            return new TimerTask() {
                @Override
                public void run() {
                    logger.debug("Updating the client cache from response cache");
                    for (Key key : readOnlyCacheMap.keySet()) {
                        if (logger.isDebugEnabled()) {
                            logger.debug("Updating the client cache from response cache for key : {} {} {} {}",
                                    key.getEntityType(), key.getName(), key.getVersion(), key.getType());
                        }
                        try {
                            CurrentRequestVersion.set(key.getVersion());
                            Value cacheValue = readWriteCacheMap.get(key);
                            Value currentCacheValue = readOnlyCacheMap.get(key);
                            if (cacheValue != currentCacheValue) {
                                readOnlyCacheMap.put(key, cacheValue);
                            }
                        } catch (Throwable th) {
                            logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
                        } finally {
                            CurrentRequestVersion.remove();
                        }
                    }
                }
            };
        }

    八、服务续约

    所谓的服务续约,其实就是一种心跳检查机制。客户端会定期发送心跳来续约。简单看一下代码的实现在DiscoveryClient类initScheduledTasks方法中客户端会在 initScheduledTasks 中,创建一个心跳检测的定时任务

        /**
         * Initializes all scheduled tasks.
         */
        private void initScheduledTasks() {
            if (clientConfig.shouldFetchRegistry()) {
                // registry cache refresh timer
                int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
                int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
                cacheRefreshTask = new TimedSupervisorTask(
                        "cacheRefresh",
                        scheduler,
                        cacheRefreshExecutor,
                        registryFetchIntervalSeconds,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new CacheRefreshThread()
                );
                scheduler.schedule(
                        cacheRefreshTask,
                        registryFetchIntervalSeconds, TimeUnit.SECONDS);
            }
    
            if (clientConfig.shouldRegisterWithEureka()) {
                int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
                int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
                logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
    
                // Heartbeat timer
                heartbeatTask = new TimedSupervisorTask(
                        "heartbeat",
                        scheduler,
                        heartbeatExecutor,
                        renewalIntervalInSecs,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new HeartbeatThread()
                );
                scheduler.schedule(
                        heartbeatTask,
                        renewalIntervalInSecs, TimeUnit.SECONDS);
    
                // InstanceInfo replicator
                instanceInfoReplicator = new InstanceInfoReplicator(
                        this,
                        instanceInfo,
                        clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                        2); // burstSize
    
                statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                    @Override
                    public String getId() {
                        return "statusChangeListener";
                    }
    
                    @Override
                    public void notify(StatusChangeEvent statusChangeEvent) {
                        if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                                InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                            // log at warn level if DOWN was involved
                            logger.warn("Saw local status change event {}", statusChangeEvent);
                        } else {
                            logger.info("Saw local status change event {}", statusChangeEvent);
                        }
                        instanceInfoReplicator.onDemandUpdate();
                    }
                };
    
                if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                    applicationInfoManager.registerStatusChangeListener(statusChangeListener);
                }
    
                instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
            } else {
                logger.info("Not registering with Eureka server per configuration");
            }
        }

    进入心跳的定时任务HeartbeatThread,然后这个定时任务中,会执行一个 HearbeatThread 的线程,这个线程会定时调用renew()来做续约。

    private class HeartbeatThread implements Runnable {
      public void run() {
        if (renew()) {
          lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
       }
     }
    }
       boolean renew() {
            EurekaHttpResponse<InstanceInfo> httpResponse;
            try {
                httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
                logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
                if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
                    REREGISTER_COUNTER.increment();
                    logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
                    long timestamp = instanceInfo.setIsDirtyWithTime();
                    boolean success = register();
                    if (success) {
                        instanceInfo.unsetIsDirty(timestamp);
                    }
                    return success;
                }
                return httpResponse.getStatusCode() == Status.OK.getStatusCode();
            } catch (Throwable e) {
                logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
                return false;
            }
        }

    服务端收到心跳请求的处理,在ApplicationResource.getInstanceInfo这个接口中,会返回一个InstanceResource的实例,在该实例下,定义了一个statusUpdate的接口来更新状态

    @Path("{id}")
    public InstanceResource getInstanceInfo(@PathParam("id") String id) {
      return new InstanceResource(this, id, serverConfig, registry);
    }

    InstanceResource类statusUpdate()方法,在该方法中,我们重点关注 registry.statusUpdate 这个方法,它会调用AbstractInstanceRegistry.statusUpdate来更新指定服务提供者在服务端存储的信息中的变化。

    @PUT
    @Path("status")
    public Response statusUpdate(
      @QueryParam("value") String newStatus,
      @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
      @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
      try {
        if (registry.getInstanceByAppAndId(app.getName(), id) == null) {
          logger.warn("Instance not found: {}/{}", app.getName(), id);
          return Response.status(Status.NOT_FOUND).build();
       }
        boolean isSuccess = registry.statusUpdate(app.getName(), id,
                           
     InstanceStatus.valueOf(newStatus), lastDirtyTimestamp,
                             "true".equals(isReplication));
        if (isSuccess) {
          logger.info("Status updated: {} - {} - {}", app.getName(), id,
    newStatus);
          return Response.ok().build();
       } else {
          logger.warn("Unable to update status: {} - {} - {}", app.getName(),
    id, newStatus);
          return Response.serverError().build();
       }
     } catch (Throwable e) {
        logger.error("Error updating instance {} for status {}", id,
              newStatus);
        return Response.serverError().build();
     }
    }

    AbstractInstanceRegistry类的statusUpdate这个方法中,会拿到应用对应的实例列表,然后调用Lease.renew()去进行心跳续约。

    public boolean statusUpdate(String appName, String id,
                    InstanceStatus newStatus, String
    lastDirtyTimestamp,
                    boolean isReplication) {
      try {
        read.lock();
        // 更新状态的次数 状态统计
        STATUS_UPDATE.increment(isReplication);
        // 从本地数据里面获取实例信息,
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> lease = null;
        if (gMap != null) {
          lease = gMap.get(id);
       }
        // 实例不存在,则直接返回,表示失败
        if (lease == null) {
          return false;
    } else {
          // 执行一下lease的renew方法,里面主要是更新了这个instance的最后更新时间。
          lease.renew();
          // 获取instance实例信息
          InstanceInfo info = lease.getHolder();
          // Lease is always created with its instance info object.
          // This log statement is provided as a safeguard, in case this
    invariant is violated.
          if (info == null) {
            logger.error("Found Lease without a holder for instance id {}",
    id);
         }
          // 当instance信息不为空时,并且实例状态发生了变化
          if ((info != null) && !(info.getStatus().equals(newStatus))) {
           // 如果新状态是UP的状态,那么启动一下serviceUp() , 主要是更新服务的注册时
    间
            if (InstanceStatus.UP.equals(newStatus)) {
              lease.serviceUp();
           }
            // 将instance Id 和这个状态的映射信息放入覆盖缓存MAP里面去
            overriddenInstanceStatusMap.put(id, newStatus);
            // Set it for transfer of overridden status to replica on
            // 设置覆盖状态到实例信息里面去
            info.setOverriddenStatus(newStatus);
            long replicaDirtyTimestamp = 0;
            info.setStatusWithoutDirty(newStatus);
            if (lastDirtyTimestamp != null) {
              replicaDirtyTimestamp = Long.valueOf(lastDirtyTimestamp);
           }
            // If the replication's dirty timestamp is more than the
    existing one, just update
            // it to the replica's.
            // 如果replicaDirtyTimestamp 的时间大于instance的
    getLastDirtyTimestamp() ,则更新
           
            if (replicaDirtyTimestamp > info.getLastDirtyTimestamp()) {
              info.setLastDirtyTimestamp(replicaDirtyTimestamp);
           }
            info.setActionType(ActionType.MODIFIED);
            recentlyChangedQueue.add(new RecentlyChangedItem(lease));
            info.setLastUpdatedTimestamp();
            //更新写缓存
            invalidateCache(appName, info.getVIPAddress(),
    info.getSecureVipAddress());
         }
          return true;
       }
     } finally {
        read.unlock();
     }
    }





    九、服务发现

    继续来跟进服务的发现过程,就是客户端需要能够满足两个功能

    • 在启动的时候获取指定服务提供者的地址列表
    • Eureka server端地址发生变化时,需要动态感知

    他在两种情况下会发生更新,第一种是在DiscoveryClient构造时进行查询,另一种是定时任务每隔30s更新一次本地地址列表

    1.在DiscoveryClient构造时进行查询,构造方法中,如果当前的客户端默认开启了fetchRegistry,则会从eureka-server中拉取数据。

    DiscoveryClient(ApplicationInfoManager applicationInfoManager,
    EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
              Provider<BackupRegistry> backupRegistryProvider,
    EndpointRandomizer endpointRandomizer) {
    if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
          fetchRegistryFromBackup();
       }
    }

    进入DiscoveryClient类的fetchRegistry方法中,查询我们的服务的列表去更新

       private boolean fetchRegistry(boolean forceFullRegistryFetch) {
            Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
    
            try {
                // If the delta is disabled or if it is the first time, get all
                // applications
                Applications applications = getApplications();
    
                if (clientConfig.shouldDisableDelta()
                        || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                        || forceFullRegistryFetch
                        || (applications == null)
                        || (applications.getRegisteredApplications().size() == 0)
                        || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
                {
                    logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
                    logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
                    logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
                    logger.info("Application is null : {}", (applications == null));
                    logger.info("Registered Applications size is zero : {}",
                            (applications.getRegisteredApplications().size() == 0));
                    logger.info("Application version is -1: {}", (applications.getVersion() == -1));
                    getAndStoreFullRegistry();
                } else {
                    getAndUpdateDelta(applications);
                }
                applications.setAppsHashCode(applications.getReconcileHashCode());
                logTotalInstances();
            } catch (Throwable e) {
                logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
                return false;
            } finally {
                if (tracer != null) {
                    tracer.stop();
                }
            }
    
            // Notify about cache refresh before updating the instance remote status
            onCacheRefreshed();
    
            // Update remote status based on refreshed data held in the cache
            updateInstanceRemoteStatus();
    
            // registry was fetched successfully, so return true
            return true;
        }

    2.定时任务每隔30s更新一次本地地址列表在DiscoveryClient构造的时候,会初始化一些任务,这个在前面分析过了。其中有一个任务动态更新本地服务地址列表,叫 cacheRefreshTask 。这个任务最终执行的是CacheRefreshThread这个线程。它是一个周期性执行的任务,具体来看一下。

      private void initScheduledTasks() {
            if (clientConfig.shouldFetchRegistry()) {
                // registry cache refresh timer
                int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
                int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
    //任务 cacheRefreshTask
    = new TimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread() );
    //延时30秒执行 scheduler.schedule( cacheRefreshTask, registryFetchIntervalSeconds, TimeUnit.SECONDS); }
    if (clientConfig.shouldRegisterWithEureka()) { int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs); // Heartbeat timer heartbeatTask = new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound,
    //线程任务
    new HeartbeatThread() ); scheduler.schedule( heartbeatTask, renewalIntervalInSecs, TimeUnit.SECONDS); // InstanceInfo replicator instanceInfoReplicator = new InstanceInfoReplicator( this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId() { return "statusChangeListener"; } @Override public void notify(StatusChangeEvent statusChangeEvent) { if (InstanceStatus.DOWN == statusChangeEvent.getStatus() || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) { // log at warn level if DOWN was involved logger.warn("Saw local status change event {}", statusChangeEvent); } else { logger.info("Saw local status change event {}", statusChangeEvent); } instanceInfoReplicator.onDemandUpdate(); } }; if (clientConfig.shouldOnDemandUpdateStatusChange()) { applicationInfoManager.registerStatusChangeListener(statusChangeListener); } instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } else { logger.info("Not registering with Eureka server per configuration"); } }

    TimedSupervisorTask

    从整体上看,TimedSupervisorTask是固定间隔的周期性任务,一旦遇到超时就会将下一个周期的间隔时间调大,如果连续超时,那么每次间隔时间都会增大一倍,一直到达外部参数设定的上限为止,一旦新任务不再超时,间隔时间又会自动恢复为初始值。 一句话牛批思想


      @Override
        public void run() {
    Future
    <?> future = null; try {
    //使用Future,可以设定子线程的超时时间,这样当前线程就不用无限等待了 future
    = executor.submit(task); threadPoolLevelGauge.set((long) executor.getActiveCount());
    //指定等待子线程的最长时间 future.
    get(timeoutMillis, TimeUnit.MILLISECONDS); // block until done or timeout
    //delay是个很有用的变量,后面会用到,这里记得每次执行任务成功都会将delay重置 delay.set(timeoutMillis); threadPoolLevelGauge.set((long) executor.getActiveCount()); successCounter.increment(); } catch (TimeoutException e) { logger.warn("task supervisor timed out", e); timeoutCounter.increment(); long currentDelay = delay.get();
    //任务线程超时的时候,就把delay变量翻倍,但不会超过外部调用时设定的最大延时时间
    long newDelay = Math.min(maxDelay, currentDelay * 2);
    //设置为最新的值,考虑到多线程,所以用CAS delay.compareAndSet(currentDelay, newDelay); }
    catch (RejectedExecutionException e) {
    //一旦线程池的阻塞队列中放满了待处理任务,触发了拒绝策略,就会将调度器停掉
    if (executor.isShutdown() || scheduler.isShutdown()) { logger.warn("task supervisor shutting down, reject the task", e); } else { logger.warn("task supervisor rejected the task", e); } rejectedCounter.increment(); } catch (Throwable e) {
    //一旦出现未知的异常,就会停掉调度器
    if (executor.isShutdown() || scheduler.isShutdown()) { logger.warn("task supervisor shutting down, can't accept the task"); } else { logger.warn("task supervisor threw an exception", e); } throwableCounter.increment(); } finally {
    //这里任务要么执行完成,要么发生异常,都用cancel方法来清理任务
    if (future != null) { future.cancel(true); } //只要调度器没有停止,就再指定等待时间之后在执行一次同样的任务 if (!scheduler.isShutdown()) {
    //这里就是周期性任务的原因:只要没有停止调度器,就再创建一次性任务,执行时间是dealy值
    //假设外部调用时传入的超时时间为30秒(构造方法的入参timeout),最大间隔时间为50秒(构造方法的入参expBackOffBound)
    //如果最近一次任务没有超时,那么就在30秒后开始新任务
    //如果最近一次任务超时了,那么就在50秒后开始新任务(异常处理中有个*2的操作,*2后的60秒超过了最大间隔50秒) scheduler.schedule(
    this, delay.get(), TimeUnit.MILLISECONDS); } } }

    回退一步进入点击CacheRefreshThread然后看他的refreshRegistry方法,这段代码主要两个逻辑

    • 判断remoteRegions是否发生了变化
    • 调用fetchRegistry获取本地服务地址缓存
    @VisibleForTesting
      void refreshRegistry() {
        try {
          boolean isFetchingRemoteRegionRegistries =
    isFetchingRemoteRegionRegistries();
          boolean remoteRegionsModified = false;
          //如果部署在aws环境上,会判断最后一次远程区域更新的信息和当前远程区域信息进行比
    较,如果不想等,则更新
          String latestRemoteRegions =
    clientConfig.fetchRegistryForRemoteRegions();
          if (null != latestRemoteRegions) {
            String currentRemoteRegions = remoteRegionsToFetch.get();
            if (!latestRemoteRegions.equals(currentRemoteRegions)) {
             //判断最后一次
         }
    //从缓存中获取地址       boolean success
    = fetchRegistry(remoteRegionsModified);       if (success) {         registrySize = localRegionApps.get().size();         lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();      }      // 省略    } catch (Throwable e) {       logger.error("Cannot fetch registry from server", e);    }  }

    点击上图中的fetchRegistry看做了啥

    private boolean fetchRegistry(boolean forceFullRegistryFetch) {
      Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
      try {
        // If the delta is disabled or if it is the first time, get all
        // applications
        // 取出本地缓存的服务列表信息
        Applications applications = getApplications();
    //判断多个条件,确定是否触发全量更新,如下任一个满足都会全量更新:
          //1. 是否禁用增量更新;
          //2. 是否对某个region特别关注;
          //3. 外部调用时是否通过入参指定全量更新;
          //4. 本地还未缓存有效的服务列表信息;
        if (clientConfig.shouldDisableDelta()
          ||
    (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
    || forceFullRegistryFetch
          || (applications == null)
          || (applications.getRegisteredApplications().size() == 0)
          || (applications.getVersion() == -1)) //Client application does not
    have latest library supporting delta
       {
          //调用全量更新
          getAndStoreFullRegistry();
       } else {
          //调用增量更新
          getAndUpdateDelta(applications);
       }
        //重新计算和设置一致性hash码
        applications.setAppsHashCode(applications.getReconcileHashCode());
        logTotalInstances(); //日志打印所有应用的所有实例数之和
     } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to refresh its cache! status =
    {}", appPathIdentifier, e.getMessage(), e);
        return false;
     } finally {
        if (tracer != null) {
          tracer.stop();
       }
     }
      //将本地缓存更新的事件广播给所有已注册的监听器,注意该方法已被CloudEurekaClient类重写
      onCacheRefreshed();
      // Update remote status based on refreshed data held in the cache
      //检查刚刚更新的缓存中,有来自Eureka server的服务列表,其中包含了当前应用的状态,
      //当前实例的成员变量lastRemoteInstanceStatus,记录的是最后一次更新的当前应用状态,
      //上述两种状态在updateInstanceRemoteStatus方法中作比较 ,如果不一致,就更新
    lastRemoteInstanceStatus,并且广播对应的事件
      updateInstanceRemoteStatus();
      // registry was fetched successfully, so return true
      return true;
    }

    DiscoveryClient.getAndStoreFullRegistry

    从eureka server端获取服务注册中心的地址信息,然后更新并设置到本地缓存 localRegionApps 。

    private void getAndStoreFullRegistry() throws Throwable {
      long currentUpdateGeneration = fetchRegistryGeneration.get();
      logger.info("Getting all instance registry info from the eureka server");
      Applications apps = null;
      EurekaHttpResponse<Applications> httpResponse =
    clientConfig.getRegistryRefreshSingleVipAddress() == null
        ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
       :
    eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddre
    ss(), remoteRegionsRef.get());
      if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        apps = httpResponse.getEntity();
     }
      logger.info("The response status is {}", httpResponse.getStatusCode());
    if (apps == null) {
        logger.error("The application is null for some reason. Not storing this
    information");
     } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration,
    currentUpdateGeneration + 1)) {
        localRegionApps.set(this.filterAndShuffle(apps));
        logger.debug("Got full registry with apps hashcode {}",
    apps.getAppsHashCode());
     } else {
        logger.warn("Not updating applications as another thread is updating it
    already");
     }
    }

    十、服务端查询服务地址流程

    客户端发起服务地址的查询有两种,一种是全量、另一种是增量。对于全量查询请求,会调用Eureka-server的ApplicationsResource的getContainers方法。而对于增量请求,会调用ApplicationsResource.getContainerDifferential。
    ApplicationsResource.getContainers接收客户端发送的获取全量注册信息请求。

    @GET
    public Response getContainers(@PathParam("version") String version,
                   @HeaderParam(HEADER_ACCEPT) String acceptHeader,
                   @HeaderParam(HEADER_ACCEPT_ENCODING) String
    acceptEncoding,
                   @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT)
    String eurekaAccept,
                   @Context UriInfo uriInfo,
                   @Nullable @QueryParam("regions") String
    regionsStr) {
      boolean isRemoteRegionRequested = null != regionsStr &&
    !regionsStr.isEmpty();
      String[] regions = null;
      if (!isRemoteRegionRequested) {
        EurekaMonitors.GET_ALL.increment();
     } else {
        regions = regionsStr.toLowerCase().split(",");
        Arrays.sort(regions); // So we don't have different caches for same
    regions queried in different order.
        EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
     }
      // EurekaServer无法提供服务,返回403
      if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
        return Response.status(Status.FORBIDDEN).build();
     }
      CurrentRequestVersion.set(Version.toEnum(version));
      KeyType keyType = Key.KeyType.JSON;// 设置返回数据格式,默认JSON
      String returnMediaType = MediaType.APPLICATION_JSON;
      if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
        // 如果接收到的请求头部没有具体格式信息,则返回格式为XML
    keyType = Key.KeyType.XML;
        returnMediaType = MediaType.APPLICATION_XML;
     }
    // 构建缓存键
      Key cacheKey = new Key(Key.EntityType.Application,
                 ResponseCacheImpl.ALL_APPS,
                 keyType, CurrentRequestVersion.get(),
    EurekaAccept.fromString(eurekaAccept), regions
                );
      // 返回不同的编码类型的数据,去缓存中取数据的方法基本一致
      Response response;
      if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
        response = Response.ok(responseCache.getGZIP(cacheKey))
         .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
         .header(HEADER_CONTENT_TYPE, returnMediaType)
         .build();
     } else {
        response = Response.ok(responseCache.get(cacheKey))
         .build();
     }
      CurrentRequestVersion.remove();
      return response;
    }

    responseCache.getGZIP从缓存中读取数据。

    public byte[] getGZIP(Key key) {
      Value payload = getValue(key, shouldUseReadOnlyResponseCache);
      if (payload == null) {
        return null;
     }
      return payload.getGzipped();
    }
    Value getValue(final Key key, boolean useReadOnlyCache) {
        Value payload = null;
        try {
          if (useReadOnlyCache) {
            final Value currentPayload = readOnlyCacheMap.get(key);
            if (currentPayload != null) {
              payload = currentPayload;
           } else {
              payload = readWriteCacheMap.get(key);
              readOnlyCacheMap.put(key, payload);
           }
         } else {
            payload = readWriteCacheMap.get(key);
         }
       } catch (Throwable t) {
          logger.error("Cannot get value for key : {}", key, t);
       }
        return payload;
     }




  • 相关阅读:
    MongoDB 删除文档
    MongoDB 删除文档
    C#标记 [已弃用] 的方法
    C#标记 [已弃用] 的方法
    MySQL 正则表达式
    MySQL 正则表达式
    SQLcase when then用法
    SQLcase when then用法
    衣服尺码自定义排序sql
    衣服尺码自定义排序sql
  • 原文地址:https://www.cnblogs.com/xing1/p/14204442.html
Copyright © 2011-2022 走看看