一、Eureka的一些概念
在Eureka的服务治理中,会涉及到下面一些概念:
服务注册:Eureka Client会通过发送REST请求的方式向Eureka Server注册自己的服务,提供自身的元数据,比如ip地址、端口、运行状况指标的url、主页地址等信息。Eureka Server接收到注册请求后,就会把这些元数据信息存储在一个双层的Map中。
服务续约:在服务注册后,Eureka Client会维护一个心跳来持续通知Eureka Server,说明服务一直处于可用状态,防止被剔除。Eureka Client在默认的情况下会每隔30秒发送一次心跳来进行服务续约。
服务同步:Eureka Server之间会互相进行注册,构建Eureka Server集群,不同Eureka Server之间会进行服务同步,用来保证服务信息的一致性。
获取服务:服务消费者(Eureka Client)在启动的时候,会发送一个REST请求给Eureka Server,获取上面注册的服务清单,并且缓存在Eureka Client本地,默认缓存30秒。同时,为了性能考虑,Eureka Server也会维护一份只读的服务清单缓存,该缓存每隔30秒更新一次。
服务调用:服务消费者在获取到服务清单后,就可以根据清单中的服务列表信息,查找到其他服务的地址,从而进行远程调用。Eureka有Region和Zone的概念,一个Region可以包含多个Zone,在进行服务调用时,优先访问处于同一个Zone中的服务提供者。
服务下线:当Eureka Client需要关闭或重启时,就不希望在这个时间段内再有请求进来,所以,就需要提前先发送REST请求给Eureka Server,告诉Eureka Server自己要下线了,Eureka Server在收到请求后,就会把该服务状态置为下线(DOWN),并把该下线事件传播出去。
服务剔除:有时候,服务实例可能会因为网络故障等原因导致不能提供服务,而此时该实例也没有发送请求给Eureka Server来进行服务下线,所以,还需要有服务剔除的机制。Eureka Server在启动的时候会创建一个定时任务,每隔一段时间(默认60秒),从当前服务清单中把超时没有续约(默认90秒)的服务剔除。
自我保护:既然Eureka Server会定时剔除超时没有续约的服务,那就有可能出现一种场景,网络一段时间内发生了异常,所有的服务都没能够进行续约,Eureka Server就把所有的服务都剔除了,这样显然不太合理。所以,就有了自我保护机制,当短时间内,统计续约失败的比例,如果达到一定阈值,则会触发自我保护的机制,在该机制下,Eureka Server不会剔除任何的微服务,等到正常后,再退出自我保护机制。
从这些概念中,就可以知道大体的流程,Eureka Client向Eureka Server注册,并且维护心跳来进行续约,如果长时间不续约,就会被剔除。Eureka Server之间进行数据同步来形成集群,Eureka Client从Eureka Server获取服务列表,用来进行服务调用,Eureka Client服务重启前调用Eureka Server的接口进行下线操作。
二、服务注册的入口
在准备写这一篇时看了很多大佬写的文章,都有一个共同点,都是从启动类注解@EnableDiscoveryClient开始找入口的,但是有个问题就是,在我上篇的文章中我并没有用到这个注解也完成了服务的注册,这是怎么做到的呢,如果想了解原因就要从springboot说起了,服务注册是在spring boot应用启动的时候发起的。具体的执行路径暂且不看,spring cloud是一个生态,它提供了一套标准,这套标准可以通过不同的组件来实现,其中就包含服务注册/发现、熔断、负载均衡等,在spring-cloud-commons这个包中,org.springframework.cloud.client.serviceregistry 路径下,可以看到一个服务注册的接口定义 ServiceRegistry 。它就是定义了spring cloud中服务注册的一个接口。我们看一下它的类关系图,这个接口有一个唯一的实现 EurekaServiceRegistry 。表示采用的是Eureka Server作为服务注册中心。
三、服务注册的触发路径
有了上面的概念下面就来看下springboot是如何调用EurekaServiceRegistry 的,而EurekaServiceRegistry 在被调用时又做了啥,大家自要想想其实应该不难猜测到,服务的注册取决于服务是否已经启动好了。而在spring boot中,会等到spring 容器启动并且所有的配置都完成之后来进行注册。而这个动作在spring boot的启动方法中的refreshContext中完成。
点击run
进入SpringApplication的ConfigurableApplicationContext方法
点击run进入下图
再点击run进入SpringApplication的run方法
public ConfigurableApplicationContext run(String... args) { StopWatch stopWatch = new StopWatch(); stopWatch.start(); ConfigurableApplicationContext context = null; Collection<SpringBootExceptionReporter> exceptionReporters = new ArrayList<>(); configureHeadlessProperty(); SpringApplicationRunListeners listeners = getRunListeners(args); listeners.starting(); try { ApplicationArguments applicationArguments = new DefaultApplicationArguments(args); ConfigurableEnvironment environment = prepareEnvironment(listeners, applicationArguments); configureIgnoreBeanInfo(environment); Banner printedBanner = printBanner(environment); context = createApplicationContext(); exceptionReporters = getSpringFactoriesInstances(SpringBootExceptionReporter.class, new Class[] { ConfigurableApplicationContext.class }, context); prepareContext(context, environment, listeners, applicationArguments, printedBanner);
//刷新上下文的方法 refreshContext(context); afterRefresh(context, applicationArguments); stopWatch.stop(); if (this.logStartupInfo) { new StartupInfoLogger(this.mainApplicationClass).logStarted(getApplicationLog(), stopWatch); } listeners.started(context); callRunners(context, applicationArguments); } catch (Throwable ex) { handleRunFailure(context, ex, exceptionReporters, listeners); throw new IllegalStateException(ex); } try { listeners.running(context); } catch (Throwable ex) { handleRunFailure(context, ex, exceptionReporters, null); throw new IllegalStateException(ex); } return context; }
点击refreshContext(context);看下刷新做了啥子事
private void refreshContext(ConfigurableApplicationContext context) { refresh((ApplicationContext) context); if (this.registerShutdownHook) { try { context.registerShutdownHook(); } catch (AccessControlException ex) { // Not allowed in some environments. } } }
点击refresh()一直往里面走,至到下图位置,选择第一个
进入AbstractApplicationContext的refresh()方法try里面有很多调用的前置方法,里面是spring的东西,在本篇中讲了没什么意义,下面就看跟本篇有关的一个方法finishRefresh();
@Override public void refresh() throws BeansException, IllegalStateException { synchronized (this.startupShutdownMonitor) { // Prepare this context for refreshing. prepareRefresh(); // Tell the subclass to refresh the internal bean factory. ConfigurableListableBeanFactory beanFactory = obtainFreshBeanFactory(); // Prepare the bean factory for use in this context. prepareBeanFactory(beanFactory); try { // Allows post-processing of the bean factory in context subclasses. postProcessBeanFactory(beanFactory); // Invoke factory processors registered as beans in the context. invokeBeanFactoryPostProcessors(beanFactory); // Register bean processors that intercept bean creation. registerBeanPostProcessors(beanFactory); // Initialize message source for this context. initMessageSource(); // Initialize event multicaster for this context. initApplicationEventMulticaster(); // Initialize other special beans in specific context subclasses. onRefresh(); // Check for listener beans and register them. registerListeners(); // Instantiate all remaining (non-lazy-init) singletons. finishBeanFactoryInitialization(beanFactory); // Last step: publish corresponding event. finishRefresh(); } catch (BeansException ex) { if (logger.isWarnEnabled()) { logger.warn("Exception encountered during context initialization - " + "cancelling refresh attempt: " + ex); } // Destroy already created singletons to avoid dangling resources. destroyBeans(); // Reset 'active' flag. cancelRefresh(ex); // Propagate exception to caller. throw ex; } finally { // Reset common introspection caches in Spring's core, since we // might not ever need metadata for singleton beans anymore... resetCommonCaches(); } } }
在finishRefresh();从名字上可以看到它是用来体现完成刷新的操作,也就是刷新完成之后要做的后置的操作。它主要做几个事情
- 清空缓存
- 初始化一个LifecycleProcessor,在Spring启动的时候启动bean,在spring结束的时候销毁bean
- 调用LifecycleProcessor的onRefresh方法,启动实现了Lifecycle接口的bean
- 发布ContextRefreshedEvent
- 注册MBean,通过JMX进行监控和管理
在这个方法中有两个核心东西,一个是 initLifecycleProcessor();另一个是getLifecycleProcessor().onRefresh();其中getLifecycleProcessor().onRefresh()它是调用生命周期处理器的onrefresh方法,找到SmartLifecycle接口的所有实现类并调用start方法。
protected void finishRefresh() { // Clear context-level resource caches (such as ASM metadata from scanning). clearResourceCaches(); //初始化生命周期的处理器 // Initialize lifecycle processor for this context. initLifecycleProcessor(); // Propagate refresh to lifecycle processor first. getLifecycleProcessor().onRefresh(); // Publish the final event. publishEvent(new ContextRefreshedEvent(this)); // Participate in LiveBeansView MBean, if active. LiveBeansView.registerApplicationContext(this); }
点击上图中的onRefresh()进入DefaultLifecycleProcessor类的onRefresh()方法
@Override public void onRefresh() { startBeans(true); this.running = true; }
点击startBeans看下这个启动Bean的方法做了啥,如果在下面Bebugger的话会发现lifecycleBeans 的map集合中有我们比较关心的eurekaAutoServiceRegistration,这个方法其实很简单,就是拿到beans的集合然后循环去调用start方法
private void startBeans(boolean autoStartupOnly) { Map<String, Lifecycle> lifecycleBeans = getLifecycleBeans(); Map<Integer, LifecycleGroup> phases = new HashMap<>(); lifecycleBeans.forEach((beanName, bean) -> { if (!autoStartupOnly || (bean instanceof SmartLifecycle && ((SmartLifecycle) bean).isAutoStartup())) { int phase = getPhase(bean); LifecycleGroup group = phases.get(phase); if (group == null) { group = new LifecycleGroup(phase, this.timeoutPerShutdownPhase, lifecycleBeans, autoStartupOnly); phases.put(phase, group); } group.add(beanName, bean); } }); if (!phases.isEmpty()) { List<Integer> keys = new ArrayList<>(phases.keySet()); Collections.sort(keys); for (Integer key : keys) {
// phases.get(key).start(); } } }
点击上面的start()方法
public void start() { if (this.members.isEmpty()) { return; } if (logger.isDebugEnabled()) { logger.debug("Starting beans in phase " + this.phase); } Collections.sort(this.members); for (LifecycleGroupMember member : this.members) { doStart(this.lifecycleBeans, member.name, this.autoStartupOnly); } }
点击上面的doStart()方法,最后调用bean.start();此时bean.start();调用的Bean的实例应该是EurekaAutoServiceRegistration
private void doStart(Map<String, ? extends Lifecycle> lifecycleBeans, String beanName, boolean autoStartupOnly) { Lifecycle bean = lifecycleBeans.remove(beanName); if (bean != null && bean != this) { String[] dependenciesForBean = getBeanFactory().getDependenciesForBean(beanName); for (String dependency : dependenciesForBean) { doStart(lifecycleBeans, dependency, autoStartupOnly); } if (!bean.isRunning() && (!autoStartupOnly || !(bean instanceof SmartLifecycle) || ((SmartLifecycle) bean).isAutoStartup())) { if (logger.isTraceEnabled()) { logger.trace("Starting bean '" + beanName + "' of type [" + bean.getClass().getName() + "]"); } try {
//最后的调用 bean.start(); } catch (Throwable ex) { throw new ApplicationContextException("Failed to start bean '" + beanName + "'", ex); } if (logger.isDebugEnabled()) { logger.debug("Successfully started bean '" + beanName + "'"); } } } }
然后debugger一下发现调用的也和我们猜想的一样是EurekaAutoServiceRegistration
四、SmartLifeCycle的演示
在接着向下说之前,要先弄懂SmartLifeCycle这块的知识, SmartLifeCycle是一个接口,当Spring容器加载完所有的Bean并且初始化之后,会继续回调实现了SmartLifeCycle接口的类中对应的方法,比如(start)。实际上我们自己也可以拓展,比如在springboot工程的main方法同级目录下,写一个测试类,实现SmartLifeCycle接口,并且通过 @Service 声明为一个bean,因为要被spring去加载,首先得是bean。
@Service public class SmartLifecycleDemo implements SmartLifecycle { public void start() { System.out.println("start"); } public void stop() { System.out.println("stop"); } public boolean isRunning() { System.out.println("isRunning"); return false; } }
接着,我们启动spring boot应用后,可以看到控制台输出了 start 字符串。有了上面的概念后接着向下走,进入EurekaAutoServiceRegistration类中,会发现这个类实现了SmartLifecycle,这个好办了,二话不说直接看他的start()方法;
@Override public void start() { // only set the port if the nonSecurePort or securePort is 0 and this.port != 0 if (this.port.get() != 0) { if (this.registration.getNonSecurePort() == 0) { this.registration.setNonSecurePort(this.port.get()); } if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) { this.registration.setSecurePort(this.port.get()); } } // only initialize if nonSecurePort is greater than 0 and it isn't already running // because of containerPortInitializer below if (!this.running.get() && this.registration.getNonSecurePort() > 0) { this.serviceRegistry.register(this.registration); this.context.publishEvent(new InstanceRegisteredEvent<>(this, this.registration.getInstanceConfig())); this.running.set(true); } }
在start方法中,我们可以看到 this.serviceRegistry.register 这个方法,它实际上就是发起服务注册的机制。此时this.serviceRegistry的实例,应该是 EurekaServiceRegistry ,这就回到了开篇说的EurekaServiceRegistry 是怎么触发调用的了, 原因是EurekaAutoServiceRegistration的构造方法中,会有一个赋值操作,而这个构造方法是在EurekaClientAutoConfiguration 这个自动装配类中被装配和初始化的,代码如下。
五、服务的注册
接下来跟着上面的代码分析服务注册的流程,EurekaAutoServiceRegistration.start方法;this.serviceRegistry.register(this.registration); 方法最终会调用EurekaServiceRegistry 类中的 register 方法来实现服务注册。
@Override public void start() { // only set the port if the nonSecurePort or securePort is 0 and this.port != 0 if (this.port.get() != 0) { if (this.registration.getNonSecurePort() == 0) { this.registration.setNonSecurePort(this.port.get()); } if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) { this.registration.setSecurePort(this.port.get()); } } // only initialize if nonSecurePort is greater than 0 and it isn't already running // because of containerPortInitializer below if (!this.running.get() && this.registration.getNonSecurePort() > 0) { this.serviceRegistry.register(this.registration); this.context.publishEvent(new InstanceRegisteredEvent<>(this, this.registration.getInstanceConfig())); this.running.set(true); } }
进入EurekaServiceRegistry 类中的 register 方法
@Override public void register(EurekaRegistration reg) { maybeInitializeClient(reg); if (log.isInfoEnabled()) { log.info("Registering application " + reg.getApplicationInfoManager().getInfo().getAppName() + " with eureka with status " + reg.getInstanceConfig().getInitialStatus()); } //设置当前实例的状态,一旦这个实例的状态发生变化,只要状态不是DOWN,那么就会被监听器监听并且执行服务注册 reg.getApplicationInfoManager() .setInstanceStatus(reg.getInstanceConfig().getInitialStatus()); //设置健康检查的处理 reg.getHealthCheckHandler().ifAvailable(healthCheckHandler -> reg .getEurekaClient().registerHealthCheck(healthCheckHandler)); }
从上述代码来看,注册方法中并没有真正调用Eureka的方法去执行注册,而是仅仅设置了一个状态以及设置健康检查处理器。继续看一下reg.getApplicationInfoManager().setInstanceStatus方法。
public synchronized void setInstanceStatus(InstanceStatus status) {
//缓存服务的状态,应该是存在内存中的东西,有兴趣的可以点击instanceStarusMapper进去看下 InstanceStatus next = instanceStatusMapper.map(status); if (next == null) { return; } InstanceStatus prev = instanceInfo.setStatus(next); if (prev != null) { for (StatusChangeListener listener : listeners.values()) { try { listener.notify(new StatusChangeEvent(prev, next)); } catch (Exception e) { logger.warn("failed to notify listener: {}", listener.getId(), e); } } } }
在这个方法中,它会通过监听器来发布一个状态变更事件。此时listener的实例是StatusChangeListener ,也就是调用 StatusChangeListener 的notify方法。这个事件是触发一个服务状态变更,应该是有地方会监听这个事件,然后基于这个事件。这个时候我们以为找到了方向,然后点击进去一看,卞击,发现它是一个接口。而且我们发现它是静态的内部接口,还无法直接看到它的实现类。于是又往回找,因为不用想它一定是在某个地方做了初始化的工作,于是,我想找到EurekaServiceRegistry.register方法中的是什么,而且我们发现ApplicationInfoManager是来自于EurekaRegistration这个类中的属性。而reg.getApplicationInfoManager 这个实例EurekaRegistration又是在EurekaAutoServiceRegistration这个类中实例化的。那么猜想,是不是在自动装配中做了什么东西。于是找到EurekaClientAutoConfiguration这个类,果然看到了Bean的一些自动装配,其中包含 EurekaClient 、 ApplicationInfoMangager 、 EurekaRegistration 等。其中还有一个重要的类是EurekaClientConfiguration,通过名字就知道这个跟EurekaClient一定是有关系的,前面说的是跟Eureka的注册有关系,这个Client应该是跟通信有关系的,在上一篇中也说过通讯是跟client有关系的
@Configuration(proxyBeanMethods = false) @ConditionalOnMissingRefreshScope protected static class EurekaClientConfiguration { @Autowired private ApplicationContext context; @Autowired private AbstractDiscoveryClientOptionalArgs<?> optionalArgs; @Bean(destroyMethod = "shutdown") @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT) public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config) { return new CloudEurekaClient(manager, config, this.optionalArgs, this.context); } @Bean @ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT) public ApplicationInfoManager eurekaApplicationInfoManager( EurekaInstanceConfig config) { InstanceInfo instanceInfo = new InstanceInfoFactory().create(config); return new ApplicationInfoManager(config, instanceInfo); } @Bean @ConditionalOnBean(AutoServiceRegistrationProperties.class) @ConditionalOnProperty( value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true) public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient, CloudEurekaInstanceConfig instanceConfig, ApplicationInfoManager applicationInfoManager, @Autowired( required = false) ObjectProvider<HealthCheckHandler> healthCheckHandler) { return EurekaRegistration.builder(instanceConfig).with(applicationInfoManager) .with(eurekaClient).with(healthCheckHandler).build(); } }
通过上面代码看到了一个很重要的Bean在启动的时候做了自动装配,也就是CloudEurekaClient 。从名字上来看,很容易的识别并猜测出它是Eureka客户端的一个工具类,用来实现和服务端的通信以及处理。这个很多源码一贯的套路,要么在构造方法里面去做很多的初始化和一些后台执行的程序操作,要么就是通过异步事件的方式来处理。接着,看一下CloudEurekaClient的初始化过程,它的构造方法中会通过 super 调用父类的构造方法。也就是DiscoveryClient的构造。(super(applicationInfoManager, config, args);调用父类的构造方法,而CloudEurekaClient的父类是DiscoveryClient.),这转了一圈就转到了网上很多大佬写的注解进入DiscoveryClient类中来了
public CloudEurekaClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs<?> args, ApplicationEventPublisher publisher) { super(applicationInfoManager, config, args); this.applicationInfoManager = applicationInfoManager; this.publisher = publisher; this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class, "eurekaTransport"); ReflectionUtils.makeAccessible(this.eurekaTransportField); }
进入DiscoveryClient类,可以看到在最终的DiscoveryClient改造方法中,有非常长的代码。其实很多代码可以不需要关心,大部分都是一些初始化工作,比如初始化了几个定时任务
- scheduler
- heartbeatExecutor 心跳定时任务
- cacheRefreshExecutor 定时去同步服务端的实例列表
@Inject DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) { if (args != null) { this.healthCheckHandlerProvider = args.healthCheckHandlerProvider; this.healthCheckCallbackProvider = args.healthCheckCallbackProvider; this.eventListeners.addAll(args.getEventListeners()); this.preRegistrationHandler = args.preRegistrationHandler; } else { this.healthCheckCallbackProvider = null; this.healthCheckHandlerProvider = null; this.preRegistrationHandler = null; } this.applicationInfoManager = applicationInfoManager; InstanceInfo myInfo = applicationInfoManager.getInfo(); clientConfig = config; staticClientConfig = clientConfig; transportConfig = config.getTransportConfig(); instanceInfo = myInfo; if (myInfo != null) { appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId(); } else { logger.warn("Setting instanceInfo to a passed in null value"); } this.backupRegistryProvider = backupRegistryProvider; this.endpointRandomizer = endpointRandomizer; this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo); localRegionApps.set(new Applications()); fetchRegistryGeneration = new AtomicLong(0); remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions()); remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(",")); //是否要从eureka server上获取服务地址信息 if (config.shouldFetchRegistry()) { this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L}); } else { this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC; } //是否要注册到eureka server上 if (config.shouldRegisterWithEureka()) { this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L}); } else { this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC; } logger.info("Initializing Eureka in region {}", clientConfig.getRegion()); //如果不需要注册并且不需要更新服务地址 if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) { logger.info("Client configured to neither register nor query for data."); scheduler = null; heartbeatExecutor = null; cacheRefreshExecutor = null; eurekaTransport = null; instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion()); // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance() // to work with DI'd DiscoveryClient DiscoveryManager.getInstance().setDiscoveryClient(this); DiscoveryManager.getInstance().setEurekaClientConfig(config); initTimestampMs = System.currentTimeMillis(); logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", initTimestampMs, this.getApplications().size()); return; // no need to setup up an network tasks and we are done } try { // default size of 2 - 1 each for heartbeat and cacheRefresh
//定时任务,用线程池解决 scheduler = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-%d") .setDaemon(true) .build()); //心跳 heartbeatExecutor = new ThreadPoolExecutor( 1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d") .setDaemon(true) .build() ); // use direct handoff //缓存刷新,前面说过服务提供者在注册时拉取列表时要将服务缓存到本地,是用来刷新本地缓存 cacheRefreshExecutor = new ThreadPoolExecutor( 1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d") .setDaemon(true) .build() ); // use direct handoff eurekaTransport = new EurekaTransport(); scheduleServerEndpointTask(eurekaTransport, args); AzToRegionMapper azToRegionMapper; if (clientConfig.shouldUseDnsForFetchingServiceUrls()) { azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig); } else { azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig); } if (null != remoteRegionsToFetch.get()) { azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(",")); } instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion()); } catch (Throwable e) { throw new RuntimeException("Failed to initialize DiscoveryClient!", e); } if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) { fetchRegistryFromBackup(); } // call and execute the pre registration handler before all background tasks (inc registration) is started if (this.preRegistrationHandler != null) { this.preRegistrationHandler.beforeRegistration(); } //如果需要注册到Eureka server并且是开启了初始化的时候强制注册,则调用register()发起服务注册 if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) { try { if (!register() ) { throw new IllegalStateException("Registration error at startup. Invalid server response."); } } catch (Throwable th) { logger.error("Registration error at startup: {}", th.getMessage()); throw new IllegalStateException(th); } } // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
//初始化定时任务 initScheduledTasks(); try { Monitors.registerObject(this); } catch (Throwable e) { logger.warn("Cannot register timers", e); } // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance() // to work with DI'd DiscoveryClient DiscoveryManager.getInstance().setDiscoveryClient(this); DiscoveryManager.getInstance().setEurekaClientConfig(config); initTimestampMs = System.currentTimeMillis(); logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", initTimestampMs, this.getApplications().size()); }
进入DiscoveryClient.initScheduledTasks;initScheduledTasks 去启动一个定时任务。
- 如果配置了开启从注册中心刷新服务列表,则会开启cacheRefreshExecutor这个定时任务
- 如果开启了服务注册到Eureka,则通过需要做几个事情.
- 建立心跳检测机制
- 通过内部类来实例化StatusChangeListener 实例状态监控接口,这个就是前面 在分析启动过程中所看到的,调用notify的方法,实际上会在这里体现。
private void initScheduledTasks() {
//如果配置了开启从注册中心刷新服务列表,则会开启cacheRefreshExecutor这个定时任务 if (clientConfig.shouldFetchRegistry()) { // registry cache refresh timer int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); cacheRefreshTask = new TimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread() ); scheduler.schedule( cacheRefreshTask, registryFetchIntervalSeconds, TimeUnit.SECONDS); } //如果开启了服务注册到Eureka,则通过需要做几个事情 if (clientConfig.shouldRegisterWithEureka()) { int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs); // Heartbeat timer heartbeatTask = new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() ); scheduler.schedule( heartbeatTask, renewalIntervalInSecs, TimeUnit.SECONDS); // InstanceInfo replicator
//初始化一个:instanceInfoReplicator instanceInfoReplicator = new InstanceInfoReplicator( this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId() { return "statusChangeListener"; } @Override public void notify(StatusChangeEvent statusChangeEvent) { if (InstanceStatus.DOWN == statusChangeEvent.getStatus() || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) { // log at warn level if DOWN was involved logger.warn("Saw local status change event {}", statusChangeEvent); } else { logger.info("Saw local status change event {}", statusChangeEvent); } instanceInfoReplicator.onDemandUpdate(); } }; //注册实例状态变化的监听 if (clientConfig.shouldOnDemandUpdateStatusChange()) { applicationInfoManager.registerStatusChangeListener(statusChangeListener); } //启动一个实例信息复制器,主要就是为了开启一个定时线程,每40秒判断实例信息是否变更,如果变更了则重新注册 instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } else { logger.info("Not registering with Eureka server per configuration"); } }
进入instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());方法看start做了什么事,可以发现用了cas保证了线程的安全性,并在里面启动了一个任务,
public void start(int initialDelayMs) { if (started.compareAndSet(false, true)) {
//设置实例信息 instanceInfo.setIsDirty(); // for initial register Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS); scheduledPeriodicRef.set(next); } }
上图中有一段代码是Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);他这是将当前对象实例传递过去,我们看scheduler是一个线程池,那么他必然用到了一个线程
所以在当前类中直接找run()方法,run方法实际上和前面自动装配所执行的服务注册方法是一样的,也就是调用 register 方法进行服务注册,并且在finally中,每30s会定时执行一下当前的run 方法进行检查。
public void run() { try { discoveryClient.refreshInstanceInfo(); Long dirtyTimestamp = instanceInfo.isDirtyWithTime(); if (dirtyTimestamp != null) {
//这个是最终发起注册的方法,跟通信有关 discoveryClient.register();
//释放 instanceInfo.unsetIsDirty(dirtyTimestamp); } } catch (Throwable t) { logger.warn("There was a problem with the instance info replicator", t); } finally { Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS); scheduledPeriodicRef.set(next); } }
DiscoveryClient中的register方法进去看下;最终,我们终于找到服务注册的入口了,也就是网上大多数文章开篇就说的方法; eurekaTransport.registrationClient.register 最终调用的是 AbstractJerseyEurekaHttpClient#register(...)`, 如果有兴趣自己去看代码,就会发现去调用之前有很多绕来绕去的代码,比如工厂模式、装饰器模式等。
boolean register() throws Throwable { logger.info(PREFIX + "{}: registering service...", appPathIdentifier); EurekaHttpResponse<Void> httpResponse; try { httpResponse = eurekaTransport.registrationClient.register(instanceInfo); } catch (Exception e) { logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e); throw e; } if (logger.isInfoEnabled()) { logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode()); } return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode(); }
进入他的像类AbstractJerseyEurekaHttpClient
在AbstractJerseyEurekaHttpClient的register方法中很显然的发现这里是发起了一次http请求,访问Eureka-Server的apps/${APP_NAME}接口,将当前服务实例的信息发送到Eureka Server进行保存。至此,基本上已经知道Spring Cloud Eureka 是如何在启动的时候把服务信息注册到Eureka Server上的了
@Override public EurekaHttpResponse<Void> register(InstanceInfo info) { String urlPath = "apps/" + info.getAppName(); ClientResponse response = null; try { Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder(); addExtraHeaders(resourceBuilder); response = resourceBuilder .header("Accept-Encoding", "gzip") .type(MediaType.APPLICATION_JSON_TYPE) .accept(MediaType.APPLICATION_JSON) .post(ClientResponse.class, info); return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build(); } finally { if (logger.isDebugEnabled()) { logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(), response == null ? "N/A" : response.getStatus()); } if (response != null) { response.close(); } } }
至此,已经知道Eureka Client发起服务注册时,有两个地方会执行服务注册的任务
- 在Spring Boot启动时,由于自动装配机制将CloudEurekaClient注入到了容器,并且执行了构造方法,而在构造方法中有一个定时任务每40s会执行一次判断,判断实例信息是否发生了变化,如果是则会发起服务注册的流程
- 在Spring Boot启动时,通过refresh方法,最终调用StatusChangeListener.notify进行服务状态变更的监听,而这个监听的方法受到事件之后会去执行服务注册。
六、Eureka Server收到请求之后的处理
前面说到了服务将当前服务实例的信息发送到Eureka Server进行保存那么接下来就看Server是怎么接收请求并且存储的,请求入口在: com.netflix.eureka.resources.ApplicationResource.addInstance() 。这里所提供的REST服务,采用的是jersey来实现的。Jersey是基于JAX-RS标准,提供REST的实现的支持。
进入ApplicationResource.addInstance()方法,当EurekaClient调用register方法发起注册时,会调用ApplicationResource.addInstance方法。服务注册就是发送一个 POST 请求带上当前实例信息到类 ApplicationResource 的 addInstance方法进行服务注册
@POST @Consumes({"application/json", "application/xml"}) public Response addInstance(InstanceInfo info, @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) { logger.debug("Registering instance {} (replication={})", info.getId(), isReplication); // validate that the instanceinfo contains all the necessary required fields if (isBlank(info.getId())) { return Response.status(400).entity("Missing instanceId").build(); } else if (isBlank(info.getHostName())) { return Response.status(400).entity("Missing hostname").build(); } else if (isBlank(info.getIPAddr())) { return Response.status(400).entity("Missing ip address").build(); } else if (isBlank(info.getAppName())) { return Response.status(400).entity("Missing appName").build(); } else if (!appName.equals(info.getAppName())) { return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build(); } else if (info.getDataCenterInfo() == null) { return Response.status(400).entity("Missing dataCenterInfo").build(); } else if (info.getDataCenterInfo().getName() == null) { return Response.status(400).entity("Missing dataCenterInfo Name").build(); } // handle cases where clients may be registering with bad DataCenterInfo with missing data DataCenterInfo dataCenterInfo = info.getDataCenterInfo(); if (dataCenterInfo instanceof UniqueIdentifier) { String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId(); if (isBlank(dataCenterInfoId)) { boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId")); if (experimental) { String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id"; return Response.status(400).entity(entity).build(); } else if (dataCenterInfo instanceof AmazonInfo) { AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo; String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId); if (effectiveId == null) { amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId()); } } else { logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass()); } } } registry.register(info, "true".equals(isReplication)); return Response.status(204).build(); // 204 to be backwards compatible }
本来想debugger看看上图中if的判断,结果发现所有if判断都没进去直接进了registry.register(info, "true".equals(isReplication));方法,那进入PeerAwareInstanceRegistryImpl.register方法中
先来看PeerAwareInstanceRegistryImpl的类关系图,从类关系图可以看出,PeerAwareInstanceRegistry的最顶层接口为LeaseManager与LookupService
- 其中LookupService定义了最基本的发现示例的行为
- LeaseManager定义了处理客户端注册,续约,注销等操作
在 addInstance 方法中,最终调用的是 PeerAwareInstanceRegistryImpl.register 方法。
- leaseDuration 表示租约过期时间,默认是90s,也就是当服务端超过90s没有收到客户端的心跳,则主动剔除该节点
- 调用super.register发起节点注册
-
replicateToPeers将信息复制到Eureka Server集群中的其他机器上,同步的实现也很简单,就是获得集群中的所有节点,然后逐个发起注册
public void register(final InstanceInfo info, final boolean isReplication) {
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
//如果客户端有自己定义心跳超时时间,则采用客户端的时间
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
super.register(info, leaseDuration, isReplication);
//复制到Eureka Server集群中的其他节点
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
点击super.register看他是怎么发起节点注册的,进入AbstractInstanceRegistry类中register方法;可以发现Eureka-Server的服务注册,实际上是将客户端传递过来的实例数据保存到Eureka-Server中的ConcurrentHashMap中。
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) { try { read.lock();
//从registry中获得当前实例信息,根据appName Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
//增加注册次数到监控中 REGISTER.increment(isReplication);
//如果当前appName是第一次注册,则初始化一个concurrentHashMap if (gMap == null) { final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>(); gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap); if (gMap == null) { gMap = gNewMap; } }
//从gMap中查询已经存在的Lease信息,Lease是租约,实际上它把服务提供者的实例信息包装成一个lease。里面提供了对于改服务实例的租约管理 Lease<InstanceInfo> existingLease = gMap.get(registrant.getId()); // Retain the last dirty timestamp without overwriting it, if there is already a lease
//当instance已经存在时,和客户端的instance的信息做比较,时间最新的那个,为有效instance信息 if (existingLease != null && (existingLease.getHolder() != null)) { Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp(); Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp(); logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted // InstanceInfo instead of the server local copy. if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) { logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" + " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant"); registrant = existingLease.getHolder(); } } else { // The lease does not exist and hence it is a new registration
//当lease不存在时,进入到这段代码 synchronized (lock) { if (this.expectedNumberOfClientsSendingRenews > 0) { // Since the client wants to register it, increase the number of clients sending renews this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1; updateRenewsPerMinThreshold(); } } logger.debug("No previous lease information found; it is new registration"); }
//构建一个lease Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration); if (existingLease != null) {
//当原来存在lease的信息时,设置serviceUpTimestamp,保证服务启动的时间一直是第一次注册的那个 lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp()); } gMap.put(registrant.getId(), lease);
//添加到最新的注册队列中 recentRegisteredQueue.add(new Pair<Long, String>( System.currentTimeMillis(), registrant.getAppName() + "(" + registrant.getId() + ")")); // This is where the initial state transfer of overridden status happens
//检查实例状态是否发生变化,如果是并且存在,则覆盖原来的状态 if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) { logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the " + "overrides", registrant.getOverriddenStatus(), registrant.getId()); if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) { logger.info("Not found overridden id {} and hence adding it", registrant.getId()); overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus()); } } InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId()); if (overriddenStatusFromMap != null) { logger.info("Storing overridden status {} from map", overriddenStatusFromMap); registrant.setOverriddenStatus(overriddenStatusFromMap); } // Set the status based on the overridden status rules InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication); registrant.setStatusWithoutDirty(overriddenInstanceStatus); // If the lease is registered with UP status, set lease service up timestamp
//得到instanceStatus,判断是否是UP状态 if (InstanceStatus.UP.equals(registrant.getStatus())) { lease.serviceUp(); }
//设置注册类型为添加 registrant.setActionType(ActionType.ADDED);
//租约变更记录队列,记录了实例的每次变化,用于注册信息的增量获取 recentlyChangedQueue.add(new RecentlyChangedItem(lease)); registrant.setLastUpdatedTimestamp();
//让缓存失效 invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress()); logger.info("Registered instance {}/{} with status {} (replication={})", registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication); } finally { read.unlock(); } }
至此,服务注册在客户端和服务端的处理过程做了一个详细的分析,实际上在Eureka Server端,会把客户端的地址信息保存到ConcurrentHashMap中存储。并且服务提供者和注册中心之间,会建立一个心跳检测机制。用于监控服务提供者的健康状态。
七、Eureka 的三级缓存设计
Eureka Server存在三个变量:(registry、readWriteCacheMap、readOnlyCacheMap)保存服务注册信息,默认情况下定时任务每30s将readWriteCacheMap同步至readOnlyCacheMap,每60s清理超过90s未续约的节点,Eureka Client每30s从readOnlyCacheMap更新服务注册信息,而客户端服务的注册则从registry更新服务注册信息。
7.1、多级缓存的意义
这里为什么要设计多级缓存呢?原因很简单,就是当存在大规模的服务注册和更新时,如果只是修改一个ConcurrentHashMap数据,那么势必因为锁的存在导致竞争,影响性能。而Eureka又是AP模型,只需要满足最终可用就行。所以它在这里用到多级缓存来实现读写分离。注册方法写的时候直接写内存注册表,写完表之后主动失效读写缓存。获取注册信息接口先从只读缓存取,只读缓存没有再去读写缓存取,读写缓存没有再去内存注册表里取(不只是取,此处较复杂)。并且,读写缓存会更新回写只读缓存
- responseCacheUpdateIntervalMs : readOnlyCacheMap 缓存更新的定时器时间间隔,默认为30秒
- responseCacheAutoExpirationInSeconds : readWriteCacheMap 缓存过期时间,默认为 180 秒
7.2、服务注册的缓存失效
在AbstractInstanceRegistry.register方法的最后,会调用invalidateCache(registrant.getAppName(), registrant.getVIPAddress(),registrant.getSecureVipAddress()); 方法,使得读写缓存失效。
点击invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());进入以下方法
private void invalidateCache(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) { // invalidate cache responseCache.invalidate(appName, vipAddress, secureVipAddress); }
点击invalidate进入ResponseCacheImpl类中的invalidate方法
@Override public void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) { for (Key.KeyType type : Key.KeyType.values()) { for (Version v : Version.values()) { invalidate( new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.full), new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.compact), new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.full), new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.compact), new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.full), new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.compact) ); if (null != vipAddress) { invalidate(new Key(Key.EntityType.VIP, vipAddress, type, v, EurekaAccept.full)); } if (null != secureVipAddress) { invalidate(new Key(Key.EntityType.SVIP, secureVipAddress, type, v, EurekaAccept.full)); } } } }
点击invalidate进入下面
public void invalidate(Key... keys) { for (Key key : keys) { logger.debug("Invalidating the response cache key : {} {} {} {}, {}", key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept()); readWriteCacheMap.invalidate(key); Collection<Key> keysWithRegions = regionSpecificKeys.get(key); if (null != keysWithRegions && !keysWithRegions.isEmpty()) { for (Key keysWithRegion : keysWithRegions) { logger.debug("Invalidating the response cache key : {} {} {} {} {}", key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept()); readWriteCacheMap.invalidate(keysWithRegion); } } } }
7.3、定时同步缓存
ResponseCacheImpl的构造方法中,会启动一个定时任务,这个任务会定时检查写缓存中的数据变化,进行更新和同步。
private TimerTask getCacheUpdateTask() { return new TimerTask() { @Override public void run() { logger.debug("Updating the client cache from response cache"); for (Key key : readOnlyCacheMap.keySet()) { if (logger.isDebugEnabled()) { logger.debug("Updating the client cache from response cache for key : {} {} {} {}", key.getEntityType(), key.getName(), key.getVersion(), key.getType()); } try { CurrentRequestVersion.set(key.getVersion()); Value cacheValue = readWriteCacheMap.get(key); Value currentCacheValue = readOnlyCacheMap.get(key); if (cacheValue != currentCacheValue) { readOnlyCacheMap.put(key, cacheValue); } } catch (Throwable th) { logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th); } finally { CurrentRequestVersion.remove(); } } } }; }
八、服务续约
所谓的服务续约,其实就是一种心跳检查机制。客户端会定期发送心跳来续约。简单看一下代码的实现在DiscoveryClient类initScheduledTasks方法中客户端会在 initScheduledTasks 中,创建一个心跳检测的定时任务
/** * Initializes all scheduled tasks. */ private void initScheduledTasks() { if (clientConfig.shouldFetchRegistry()) { // registry cache refresh timer int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); cacheRefreshTask = new TimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread() ); scheduler.schedule( cacheRefreshTask, registryFetchIntervalSeconds, TimeUnit.SECONDS); } if (clientConfig.shouldRegisterWithEureka()) { int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs); // Heartbeat timer heartbeatTask = new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() ); scheduler.schedule( heartbeatTask, renewalIntervalInSecs, TimeUnit.SECONDS); // InstanceInfo replicator instanceInfoReplicator = new InstanceInfoReplicator( this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId() { return "statusChangeListener"; } @Override public void notify(StatusChangeEvent statusChangeEvent) { if (InstanceStatus.DOWN == statusChangeEvent.getStatus() || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) { // log at warn level if DOWN was involved logger.warn("Saw local status change event {}", statusChangeEvent); } else { logger.info("Saw local status change event {}", statusChangeEvent); } instanceInfoReplicator.onDemandUpdate(); } }; if (clientConfig.shouldOnDemandUpdateStatusChange()) { applicationInfoManager.registerStatusChangeListener(statusChangeListener); } instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } else { logger.info("Not registering with Eureka server per configuration"); } }
进入心跳的定时任务HeartbeatThread,然后这个定时任务中,会执行一个 HearbeatThread 的线程,这个线程会定时调用renew()来做续约。
private class HeartbeatThread implements Runnable { public void run() { if (renew()) { lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis(); } } }
boolean renew() { EurekaHttpResponse<InstanceInfo> httpResponse; try { httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null); logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode()); if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) { REREGISTER_COUNTER.increment(); logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName()); long timestamp = instanceInfo.setIsDirtyWithTime(); boolean success = register(); if (success) { instanceInfo.unsetIsDirty(timestamp); } return success; } return httpResponse.getStatusCode() == Status.OK.getStatusCode(); } catch (Throwable e) { logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e); return false; } }
服务端收到心跳请求的处理,在ApplicationResource.getInstanceInfo这个接口中,会返回一个InstanceResource的实例,在该实例下,定义了一个statusUpdate的接口来更新状态
@Path("{id}") public InstanceResource getInstanceInfo(@PathParam("id") String id) { return new InstanceResource(this, id, serverConfig, registry); }
InstanceResource类statusUpdate()方法,在该方法中,我们重点关注 registry.statusUpdate 这个方法,它会调用AbstractInstanceRegistry.statusUpdate来更新指定服务提供者在服务端存储的信息中的变化。
@PUT @Path("status") public Response statusUpdate( @QueryParam("value") String newStatus, @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication, @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) { try { if (registry.getInstanceByAppAndId(app.getName(), id) == null) { logger.warn("Instance not found: {}/{}", app.getName(), id); return Response.status(Status.NOT_FOUND).build(); } boolean isSuccess = registry.statusUpdate(app.getName(), id, InstanceStatus.valueOf(newStatus), lastDirtyTimestamp, "true".equals(isReplication)); if (isSuccess) { logger.info("Status updated: {} - {} - {}", app.getName(), id, newStatus); return Response.ok().build(); } else { logger.warn("Unable to update status: {} - {} - {}", app.getName(), id, newStatus); return Response.serverError().build(); } } catch (Throwable e) { logger.error("Error updating instance {} for status {}", id, newStatus); return Response.serverError().build(); } }
AbstractInstanceRegistry类的statusUpdate这个方法中,会拿到应用对应的实例列表,然后调用Lease.renew()去进行心跳续约。
public boolean statusUpdate(String appName, String id, InstanceStatus newStatus, String lastDirtyTimestamp, boolean isReplication) { try { read.lock(); // 更新状态的次数 状态统计 STATUS_UPDATE.increment(isReplication); // 从本地数据里面获取实例信息, Map<String, Lease<InstanceInfo>> gMap = registry.get(appName); Lease<InstanceInfo> lease = null; if (gMap != null) { lease = gMap.get(id); } // 实例不存在,则直接返回,表示失败 if (lease == null) { return false; } else { // 执行一下lease的renew方法,里面主要是更新了这个instance的最后更新时间。 lease.renew(); // 获取instance实例信息 InstanceInfo info = lease.getHolder(); // Lease is always created with its instance info object. // This log statement is provided as a safeguard, in case this invariant is violated. if (info == null) { logger.error("Found Lease without a holder for instance id {}", id); } // 当instance信息不为空时,并且实例状态发生了变化 if ((info != null) && !(info.getStatus().equals(newStatus))) { // 如果新状态是UP的状态,那么启动一下serviceUp() , 主要是更新服务的注册时 间 if (InstanceStatus.UP.equals(newStatus)) { lease.serviceUp(); } // 将instance Id 和这个状态的映射信息放入覆盖缓存MAP里面去 overriddenInstanceStatusMap.put(id, newStatus); // Set it for transfer of overridden status to replica on // 设置覆盖状态到实例信息里面去 info.setOverriddenStatus(newStatus); long replicaDirtyTimestamp = 0; info.setStatusWithoutDirty(newStatus); if (lastDirtyTimestamp != null) { replicaDirtyTimestamp = Long.valueOf(lastDirtyTimestamp); } // If the replication's dirty timestamp is more than the existing one, just update // it to the replica's. // 如果replicaDirtyTimestamp 的时间大于instance的 getLastDirtyTimestamp() ,则更新 if (replicaDirtyTimestamp > info.getLastDirtyTimestamp()) { info.setLastDirtyTimestamp(replicaDirtyTimestamp); } info.setActionType(ActionType.MODIFIED); recentlyChangedQueue.add(new RecentlyChangedItem(lease)); info.setLastUpdatedTimestamp(); //更新写缓存 invalidateCache(appName, info.getVIPAddress(), info.getSecureVipAddress()); } return true; } } finally { read.unlock(); } }
九、服务发现
继续来跟进服务的发现过程,就是客户端需要能够满足两个功能
- 在启动的时候获取指定服务提供者的地址列表
- Eureka server端地址发生变化时,需要动态感知
他在两种情况下会发生更新,第一种是在DiscoveryClient构造时进行查询,另一种是定时任务每隔30s更新一次本地地址列表
1.在DiscoveryClient构造时进行查询,构造方法中,如果当前的客户端默认开启了fetchRegistry,则会从eureka-server中拉取数据。
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) { if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) { fetchRegistryFromBackup(); } }
进入DiscoveryClient类的fetchRegistry方法中,查询我们的服务的列表去更新
private boolean fetchRegistry(boolean forceFullRegistryFetch) { Stopwatch tracer = FETCH_REGISTRY_TIMER.start(); try { // If the delta is disabled or if it is the first time, get all // applications Applications applications = getApplications(); if (clientConfig.shouldDisableDelta() || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress())) || forceFullRegistryFetch || (applications == null) || (applications.getRegisteredApplications().size() == 0) || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta { logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta()); logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress()); logger.info("Force full registry fetch : {}", forceFullRegistryFetch); logger.info("Application is null : {}", (applications == null)); logger.info("Registered Applications size is zero : {}", (applications.getRegisteredApplications().size() == 0)); logger.info("Application version is -1: {}", (applications.getVersion() == -1)); getAndStoreFullRegistry(); } else { getAndUpdateDelta(applications); } applications.setAppsHashCode(applications.getReconcileHashCode()); logTotalInstances(); } catch (Throwable e) { logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e); return false; } finally { if (tracer != null) { tracer.stop(); } } // Notify about cache refresh before updating the instance remote status onCacheRefreshed(); // Update remote status based on refreshed data held in the cache updateInstanceRemoteStatus(); // registry was fetched successfully, so return true return true; }
2.定时任务每隔30s更新一次本地地址列表在DiscoveryClient构造的时候,会初始化一些任务,这个在前面分析过了。其中有一个任务动态更新本地服务地址列表,叫 cacheRefreshTask 。这个任务最终执行的是CacheRefreshThread这个线程。它是一个周期性执行的任务,具体来看一下。
private void initScheduledTasks() { if (clientConfig.shouldFetchRegistry()) { // registry cache refresh timer int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
//任务 cacheRefreshTask = new TimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread() );
//延时30秒执行 scheduler.schedule( cacheRefreshTask, registryFetchIntervalSeconds, TimeUnit.SECONDS); } if (clientConfig.shouldRegisterWithEureka()) { int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs); // Heartbeat timer heartbeatTask = new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound,
//线程任务 new HeartbeatThread() ); scheduler.schedule( heartbeatTask, renewalIntervalInSecs, TimeUnit.SECONDS); // InstanceInfo replicator instanceInfoReplicator = new InstanceInfoReplicator( this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId() { return "statusChangeListener"; } @Override public void notify(StatusChangeEvent statusChangeEvent) { if (InstanceStatus.DOWN == statusChangeEvent.getStatus() || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) { // log at warn level if DOWN was involved logger.warn("Saw local status change event {}", statusChangeEvent); } else { logger.info("Saw local status change event {}", statusChangeEvent); } instanceInfoReplicator.onDemandUpdate(); } }; if (clientConfig.shouldOnDemandUpdateStatusChange()) { applicationInfoManager.registerStatusChangeListener(statusChangeListener); } instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } else { logger.info("Not registering with Eureka server per configuration"); } }
TimedSupervisorTask
从整体上看,TimedSupervisorTask是固定间隔的周期性任务,一旦遇到超时就会将下一个周期的间隔时间调大,如果连续超时,那么每次间隔时间都会增大一倍,一直到达外部参数设定的上限为止,一旦新任务不再超时,间隔时间又会自动恢复为初始值。 一句话牛批思想
@Override public void run() {
Future<?> future = null; try {
//使用Future,可以设定子线程的超时时间,这样当前线程就不用无限等待了 future = executor.submit(task); threadPoolLevelGauge.set((long) executor.getActiveCount());
//指定等待子线程的最长时间 future.get(timeoutMillis, TimeUnit.MILLISECONDS); // block until done or timeout
//delay是个很有用的变量,后面会用到,这里记得每次执行任务成功都会将delay重置 delay.set(timeoutMillis); threadPoolLevelGauge.set((long) executor.getActiveCount()); successCounter.increment(); } catch (TimeoutException e) { logger.warn("task supervisor timed out", e); timeoutCounter.increment(); long currentDelay = delay.get();
//任务线程超时的时候,就把delay变量翻倍,但不会超过外部调用时设定的最大延时时间 long newDelay = Math.min(maxDelay, currentDelay * 2);
//设置为最新的值,考虑到多线程,所以用CAS delay.compareAndSet(currentDelay, newDelay); } catch (RejectedExecutionException e) {
//一旦线程池的阻塞队列中放满了待处理任务,触发了拒绝策略,就会将调度器停掉 if (executor.isShutdown() || scheduler.isShutdown()) { logger.warn("task supervisor shutting down, reject the task", e); } else { logger.warn("task supervisor rejected the task", e); } rejectedCounter.increment(); } catch (Throwable e) {
//一旦出现未知的异常,就会停掉调度器 if (executor.isShutdown() || scheduler.isShutdown()) { logger.warn("task supervisor shutting down, can't accept the task"); } else { logger.warn("task supervisor threw an exception", e); } throwableCounter.increment(); } finally {
//这里任务要么执行完成,要么发生异常,都用cancel方法来清理任务 if (future != null) { future.cancel(true); } //只要调度器没有停止,就再指定等待时间之后在执行一次同样的任务 if (!scheduler.isShutdown()) {
//这里就是周期性任务的原因:只要没有停止调度器,就再创建一次性任务,执行时间是dealy值
//假设外部调用时传入的超时时间为30秒(构造方法的入参timeout),最大间隔时间为50秒(构造方法的入参expBackOffBound)
//如果最近一次任务没有超时,那么就在30秒后开始新任务
//如果最近一次任务超时了,那么就在50秒后开始新任务(异常处理中有个*2的操作,*2后的60秒超过了最大间隔50秒) scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS); } } }
回退一步进入点击CacheRefreshThread然后看他的refreshRegistry方法,这段代码主要两个逻辑
- 判断remoteRegions是否发生了变化
- 调用fetchRegistry获取本地服务地址缓存
@VisibleForTesting void refreshRegistry() { try { boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries(); boolean remoteRegionsModified = false; //如果部署在aws环境上,会判断最后一次远程区域更新的信息和当前远程区域信息进行比 较,如果不想等,则更新 String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions(); if (null != latestRemoteRegions) { String currentRemoteRegions = remoteRegionsToFetch.get(); if (!latestRemoteRegions.equals(currentRemoteRegions)) { //判断最后一次 }
//从缓存中获取地址 boolean success = fetchRegistry(remoteRegionsModified); if (success) { registrySize = localRegionApps.get().size(); lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis(); } // 省略 } catch (Throwable e) { logger.error("Cannot fetch registry from server", e); } }
点击上图中的fetchRegistry看做了啥
private boolean fetchRegistry(boolean forceFullRegistryFetch) { Stopwatch tracer = FETCH_REGISTRY_TIMER.start(); try { // If the delta is disabled or if it is the first time, get all // applications // 取出本地缓存的服务列表信息 Applications applications = getApplications(); //判断多个条件,确定是否触发全量更新,如下任一个满足都会全量更新: //1. 是否禁用增量更新; //2. 是否对某个region特别关注; //3. 外部调用时是否通过入参指定全量更新; //4. 本地还未缓存有效的服务列表信息; if (clientConfig.shouldDisableDelta() || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress())) || forceFullRegistryFetch || (applications == null) || (applications.getRegisteredApplications().size() == 0) || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta { //调用全量更新 getAndStoreFullRegistry(); } else { //调用增量更新 getAndUpdateDelta(applications); } //重新计算和设置一致性hash码 applications.setAppsHashCode(applications.getReconcileHashCode()); logTotalInstances(); //日志打印所有应用的所有实例数之和 } catch (Throwable e) { logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e); return false; } finally { if (tracer != null) { tracer.stop(); } } //将本地缓存更新的事件广播给所有已注册的监听器,注意该方法已被CloudEurekaClient类重写 onCacheRefreshed(); // Update remote status based on refreshed data held in the cache //检查刚刚更新的缓存中,有来自Eureka server的服务列表,其中包含了当前应用的状态, //当前实例的成员变量lastRemoteInstanceStatus,记录的是最后一次更新的当前应用状态, //上述两种状态在updateInstanceRemoteStatus方法中作比较 ,如果不一致,就更新 lastRemoteInstanceStatus,并且广播对应的事件 updateInstanceRemoteStatus(); // registry was fetched successfully, so return true return true; }
DiscoveryClient.getAndStoreFullRegistry
从eureka server端获取服务注册中心的地址信息,然后更新并设置到本地缓存 localRegionApps 。
private void getAndStoreFullRegistry() throws Throwable { long currentUpdateGeneration = fetchRegistryGeneration.get(); logger.info("Getting all instance registry info from the eureka server"); Applications apps = null; EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddre ss(), remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { apps = httpResponse.getEntity(); } logger.info("The response status is {}", httpResponse.getStatusCode()); if (apps == null) { logger.error("The application is null for some reason. Not storing this information"); } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { localRegionApps.set(this.filterAndShuffle(apps)); logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode()); } else { logger.warn("Not updating applications as another thread is updating it already"); } }
十、服务端查询服务地址流程
客户端发起服务地址的查询有两种,一种是全量、另一种是增量。对于全量查询请求,会调用Eureka-server的ApplicationsResource的getContainers方法。而对于增量请求,会调用ApplicationsResource.getContainerDifferential。
ApplicationsResource.getContainers接收客户端发送的获取全量注册信息请求。
@GET public Response getContainers(@PathParam("version") String version, @HeaderParam(HEADER_ACCEPT) String acceptHeader, @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding, @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept, @Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) { boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty(); String[] regions = null; if (!isRemoteRegionRequested) { EurekaMonitors.GET_ALL.increment(); } else { regions = regionsStr.toLowerCase().split(","); Arrays.sort(regions); // So we don't have different caches for same regions queried in different order. EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment(); } // EurekaServer无法提供服务,返回403 if (!registry.shouldAllowAccess(isRemoteRegionRequested)) { return Response.status(Status.FORBIDDEN).build(); } CurrentRequestVersion.set(Version.toEnum(version)); KeyType keyType = Key.KeyType.JSON;// 设置返回数据格式,默认JSON String returnMediaType = MediaType.APPLICATION_JSON; if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) { // 如果接收到的请求头部没有具体格式信息,则返回格式为XML keyType = Key.KeyType.XML; returnMediaType = MediaType.APPLICATION_XML; } // 构建缓存键 Key cacheKey = new Key(Key.EntityType.Application, ResponseCacheImpl.ALL_APPS, keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions ); // 返回不同的编码类型的数据,去缓存中取数据的方法基本一致 Response response; if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) { response = Response.ok(responseCache.getGZIP(cacheKey)) .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE) .header(HEADER_CONTENT_TYPE, returnMediaType) .build(); } else { response = Response.ok(responseCache.get(cacheKey)) .build(); } CurrentRequestVersion.remove(); return response; }
responseCache.getGZIP从缓存中读取数据。
public byte[] getGZIP(Key key) { Value payload = getValue(key, shouldUseReadOnlyResponseCache); if (payload == null) { return null; } return payload.getGzipped(); } Value getValue(final Key key, boolean useReadOnlyCache) { Value payload = null; try { if (useReadOnlyCache) { final Value currentPayload = readOnlyCacheMap.get(key); if (currentPayload != null) { payload = currentPayload; } else { payload = readWriteCacheMap.get(key); readOnlyCacheMap.put(key, payload); } } else { payload = readWriteCacheMap.get(key); } } catch (Throwable t) { logger.error("Cannot get value for key : {}", key, t); } return payload; }