https://www.jianshu.com/p/84d0b7eea882
eureka注册原理:
注册地址:POST 对象com.netflix.appinfo.InstanceInfo
http://localhost:8761/eureka/apps/HELLO-SERVICE
{Accept-Encoding=[gzip], Content-Type=[application/json], Accept=[application/json], DiscoveryIdentity-Name=[DefaultClient], DiscoveryIdentity-Version=[1.4], DiscoveryIdentity-Id=[192.168.1.106]}
AbstractJerseyEurekaHttpClient.register
ApacheHttpClient4Handler.handle
HttpRequestExecutor.doSendRequest
org.springframework.cloud.netflix.eureka.serviceregistry.EurekaAutoServiceRegistration.start-->this.serviceRegistry.register(this.registration);
org.springframework.cloud.netflix.eureka.serviceregistry.EurekaServiceRegistry.register-->// 通过应用信息管理器去设置实例的状态 reg.getApplicationInfoManager().setInstanceStatus(reg.getInstanceConfig().getInitialStatus()); 取消注册deregister
com.netflix.appinfo.ApplicationInfoManager.setInstanceStatus(InstanceStatus status)-->
com.netflix.discovery.DiscoveryClient构造器里:
// 这里是重点,开始定义定时任务调度器 scheduler try { // default size of 2 - 1 each for heartbeat and cacheRefresh scheduler = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-%d") .setDaemon(true) .build()); // 定义心跳检测执行器,线程池核心线程数1个,最大线程数为配置参数HeartbeatExecutorThreadPoolSize heartbeatExecutor = new ThreadPoolExecutor( 1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d") .setDaemon(true) .build() ); // use direct handoff // 定义缓存刷新执行器,线程池核心线程数1个,最大线程数为配置参数CacheRefreshExecutorThreadPoolSize的值 cacheRefreshExecutor = new ThreadPoolExecutor( 1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d") .setDaemon(true) .build() ); // use direct handoff // 初始化一个eureka请求传输器 eurekaTransport = new EurekaTransport(); scheduleServerEndpointTask(eurekaTransport, args); AzToRegionMapper azToRegionMapper; if (clientConfig.shouldUseDnsForFetchingServiceUrls()) { azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig); } else { azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig); } if (null != remoteRegionsToFetch.get()) { azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(",")); } instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion()); } catch (Throwable e) { throw new RuntimeException("Failed to initialize DiscoveryClient!", e); } if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) { fetchRegistryFromBackup(); } // call and execute the pre registration handler before all background tasks (inc registration) is started if (this.preRegistrationHandler != null) { this.preRegistrationHandler.beforeRegistration(); } // 注意这里如果开启初始化强制注册开关的话,会直接注册到 // eureka server, if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) { try { if (!register() ) { throw new IllegalStateException("Registration error at startup. Invalid server response."); } } catch (Throwable th) { logger.error("Registration error at startup: {}", th.getMessage()); throw new IllegalStateException(th); } } // 重点是这里,初始化定时任务调度器 initScheduledTasks();
com.netflix.discovery.DiscoveryClient.initScheduledTasks()
// 初始化定时任务调度器 private void initScheduledTasks() { // 配置开启获取注册表信息开关,则调度器去调度CacheRefreshThread 任务,调度频率为registryFetchIntervalSeconds,首次延迟时间也是registryFetchIntervalSeconds ,即默认30秒 if (clientConfig.shouldFetchRegistry()) { // registry cache refresh timer int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); scheduler.schedule( new TimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread() ), registryFetchIntervalSeconds, TimeUnit.SECONDS); } // 配置开启是否注册到server的开关,则调度器去调度HeartbeatThread 任务,调度频率为renewalIntervalInSecs,首次延迟时间也是renewalIntervalInSecs ,即默认30秒 if (clientConfig.shouldRegisterWithEureka()) { int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs); // Heartbeat timer scheduler.schedule( new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() ), renewalIntervalInSecs, TimeUnit.SECONDS); // 实例信息复制定时器 instanceInfoReplicator = new InstanceInfoReplicator( this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize // 创建实例状态变化监听器 statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId() { return "statusChangeListener"; } @Override public void notify(StatusChangeEvent statusChangeEvent) { if (InstanceStatus.DOWN == statusChangeEvent.getStatus() || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) { // log at warn level if DOWN was involved logger.warn("Saw local status change event {}", statusChangeEvent); } else { logger.info("Saw local status change event {}", statusChangeEvent); } instanceInfoReplicator.onDemandUpdate(); } }; // 注意这里是开启了在需要的时候更新状态变化的开关才会添加监听器,此处当开关开启时,当状态发生变化,会立即收到通知,即调用onDemandUpdate方法 if (clientConfig.shouldOnDemandUpdateStatusChange()) { applicationInfoManager.registerStatusChangeListener(statusChangeListener); } // 启动周期性实例信息复制到远程定时器,默认延迟40秒执行 instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } else { logger.info("Not registering with Eureka server per configuration"); }
com.netflix.discovery.InstanceInfoReplicator.run()-->discoveryClient.register();
com.netflix.discovery.DiscoveryClient.register()-->httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
AbstractJerseyEurekaHttpClient.register--> response = resourceBuilder.header("Accept-Encoding", "gzip").type(MediaType.APPLICATION_JSON_TYPE).accept(MediaType.APPLICATION_JSON).post(ClientResponse.class, info);
com.sun.jersey.api.client.WebResource.Builder.post(Class<T> c, Object requestEntity)
Daemon Thread [DiscoveryClient-HeartbeatExecutor-1] (Suspended (breakpoint at line 48 in AbstractJerseyEurekaHttpClient))
JerseyApplicationClient(AbstractJerseyEurekaHttpClient).register(InstanceInfo) line: 48
EurekaHttpClientDecorator$1.execute(EurekaHttpClient) line: 59
MetricsCollectingEurekaHttpClient.execute(RequestExecutor<R>) line: 73
MetricsCollectingEurekaHttpClient(EurekaHttpClientDecorator).register(InstanceInfo) line: 56
EurekaHttpClientDecorator$1.execute(EurekaHttpClient) line: 59
RedirectingEurekaHttpClient.executeOnNewServer(RequestExecutor<R>, AtomicReference<EurekaHttpClient>) line: 118
RedirectingEurekaHttpClient.execute(RequestExecutor<R>) line: 79
RedirectingEurekaHttpClient(EurekaHttpClientDecorator).register(InstanceInfo) line: 56
EurekaHttpClientDecorator$1.execute(EurekaHttpClient) line: 59
RetryableEurekaHttpClient.execute(RequestExecutor<R>) line: 119
RetryableEurekaHttpClient(EurekaHttpClientDecorator).register(InstanceInfo) line: 56
EurekaHttpClientDecorator$1.execute(EurekaHttpClient) line: 59
SessionedEurekaHttpClient.execute(RequestExecutor<R>) line: 77
SessionedEurekaHttpClient(EurekaHttpClientDecorator).register(InstanceInfo) line: 56
CloudEurekaClient(DiscoveryClient).register() line: 815
CloudEurekaClient(DiscoveryClient).renew() line: 837
DiscoveryClient$HeartbeatThread.run() line: 1396
Executors$RunnableAdapter<T>.call() line: 511
FutureTask<V>.run() line: 266
ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) line: 1149
ThreadPoolExecutor$Worker.run() line: 624
Thread.run() line: 748
Daemon Thread [DiscoveryClient-InstanceInfoReplicator-0] (Suspended)
HttpRequestExecutor.doSendRequest(HttpRequest, HttpClientConnection, HttpContext) line: 238
HttpRequestExecutor.execute(HttpRequest, HttpClientConnection, HttpContext) line: 123
DefaultRequestDirector.tryExecute(RoutedRequest, HttpContext) line: 686
DefaultRequestDirector.execute(HttpHost, HttpRequest, HttpContext) line: 488
DefaultHttpClient(AbstractHttpClient).doExecute(HttpHost, HttpRequest, HttpContext) line: 884
DefaultHttpClient(CloseableHttpClient).execute(HttpHost, HttpRequest) line: 117
DefaultHttpClient(CloseableHttpClient).execute(HttpHost, HttpRequest) line: 55
ApacheHttpClient4Handler.handle(ClientRequest) line: 173
GZIPContentEncodingFilter.handle(ClientRequest) line: 123
EurekaIdentityHeaderFilter.handle(ClientRequest) line: 27
ApacheHttpClient4(Client).handle(ClientRequest) line: 652
WebResource.handle(Class<T>, ClientRequest) line: 682
WebResource.access$200(WebResource, Class, ClientRequest) line: 74
WebResource$Builder.post(Class<T>, Object) line: 570
JerseyApplicationClient(AbstractJerseyEurekaHttpClient).register(InstanceInfo) line: 56
EurekaHttpClientDecorator$1.execute(EurekaHttpClient) line: 59
MetricsCollectingEurekaHttpClient.execute(RequestExecutor<R>) line: 73
MetricsCollectingEurekaHttpClient(EurekaHttpClientDecorator).register(InstanceInfo) line: 56
EurekaHttpClientDecorator$1.execute(EurekaHttpClient) line: 59
RedirectingEurekaHttpClient.executeOnNewServer(RequestExecutor<R>, AtomicReference<EurekaHttpClient>) line: 118
RedirectingEurekaHttpClient.execute(RequestExecutor<R>) line: 79
RedirectingEurekaHttpClient(EurekaHttpClientDecorator).register(InstanceInfo) line: 56
EurekaHttpClientDecorator$1.execute(EurekaHttpClient) line: 59
RetryableEurekaHttpClient.execute(RequestExecutor<R>) line: 119
RetryableEurekaHttpClient(EurekaHttpClientDecorator).register(InstanceInfo) line: 56
EurekaHttpClientDecorator$1.execute(EurekaHttpClient) line: 59
SessionedEurekaHttpClient.execute(RequestExecutor<R>) line: 77
SessionedEurekaHttpClient(EurekaHttpClientDecorator).register(InstanceInfo) line: 56
CloudEurekaClient(DiscoveryClient).register() line: 815
InstanceInfoReplicator.run() line: 104
InstanceInfoReplicator$1.run() line: 88
Executors$RunnableAdapter<T>.call() line: 511
ScheduledThreadPoolExecutor$ScheduledFutureTask<V>(FutureTask<V>).run() line: 266
ScheduledThreadPoolExecutor$ScheduledFutureTask<V>.access$201(ScheduledThreadPoolExecutor$ScheduledFutureTask) line: 180
ScheduledThreadPoolExecutor$ScheduledFutureTask<V>.run() line: 293
ScheduledThreadPoolExecutor(ThreadPoolExecutor).runWorker(ThreadPoolExecutor$Worker) line: 1149
ThreadPoolExecutor$Worker.run() line: 624
Thread.run() line: 748
Daemon Thread [DiscoveryClient-InstanceInfoReplicator-0] (Suspended)
EurekaJacksonCodec.writeTo(T, OutputStream) line: 191
CodecWrappers$LegacyJacksonJson.encode(T, OutputStream) line: 304
DiscoveryJerseyProvider.writeTo(Object, Class, Type, Annotation[], MediaType, MultivaluedMap, OutputStream) line: 135
RequestWriter$RequestEntityWriterImpl.writeRequestEntity(OutputStream) line: 231
ApacheHttpClient4Handler$2.writeTo(OutputStream) line: 289
EntityEnclosingRequestWrapper$EntityWrapper(HttpEntityWrapper).writeTo(OutputStream) line: 94
EntityEnclosingRequestWrapper$EntityWrapper.writeTo(OutputStream) line: 112
EntitySerializer.serialize(SessionOutputBuffer, HttpMessage, HttpEntity) line: 118
DefaultClientConnection(AbstractHttpClientConnection).sendRequestEntity(HttpEntityEnclosingRequest) line: 263
BasicPooledConnAdapter(AbstractClientConnAdapter).sendRequestEntity(HttpEntityEnclosingRequest) line: 241
HttpRequestExecutor.doSendRequest(HttpRequest, HttpClientConnection, HttpContext) line: 238
HttpRequestExecutor.execute(HttpRequest, HttpClientConnection, HttpContext) line: 123
DefaultRequestDirector.tryExecute(RoutedRequest, HttpContext) line: 686
DefaultRequestDirector.execute(HttpHost, HttpRequest, HttpContext) line: 488
DefaultHttpClient(AbstractHttpClient).doExecute(HttpHost, HttpRequest, HttpContext) line: 884
DefaultHttpClient(CloseableHttpClient).execute(HttpHost, HttpRequest) line: 117
DefaultHttpClient(CloseableHttpClient).execute(HttpHost, HttpRequest) line: 55
ApacheHttpClient4Handler.handle(ClientRequest) line: 173
GZIPContentEncodingFilter.handle(ClientRequest) line: 123
EurekaIdentityHeaderFilter.handle(ClientRequest) line: 27
ApacheHttpClient4(Client).handle(ClientRequest) line: 652
WebResource.handle(Class<T>, ClientRequest) line: 682
WebResource.access$200(WebResource, Class, ClientRequest) line: 74
WebResource$Builder.post(Class<T>, Object) line: 570
JerseyApplicationClient(AbstractJerseyEurekaHttpClient).register(InstanceInfo) line: 56
EurekaHttpClientDecorator$1.execute(EurekaHttpClient) line: 59
MetricsCollectingEurekaHttpClient.execute(RequestExecutor<R>) line: 73
MetricsCollectingEurekaHttpClient(EurekaHttpClientDecorator).register(InstanceInfo) line: 56
EurekaHttpClientDecorator$1.execute(EurekaHttpClient) line: 59
RedirectingEurekaHttpClient.executeOnNewServer(RequestExecutor<R>, AtomicReference<EurekaHttpClient>) line: 118
RedirectingEurekaHttpClient.execute(RequestExecutor<R>) line: 79
RedirectingEurekaHttpClient(EurekaHttpClientDecorator).register(InstanceInfo) line: 56
EurekaHttpClientDecorator$1.execute(EurekaHttpClient) line: 59
RetryableEurekaHttpClient.execute(RequestExecutor<R>) line: 119
RetryableEurekaHttpClient(EurekaHttpClientDecorator).register(InstanceInfo) line: 56
EurekaHttpClientDecorator$1.execute(EurekaHttpClient) line: 59
SessionedEurekaHttpClient.execute(RequestExecutor<R>) line: 77
SessionedEurekaHttpClient(EurekaHttpClientDecorator).register(InstanceInfo) line: 56
CloudEurekaClient(DiscoveryClient).register() line: 815
InstanceInfoReplicator.run() line: 104
InstanceInfoReplicator$1.run() line: 88
Executors$RunnableAdapter<T>.call() line: 511
ScheduledThreadPoolExecutor$ScheduledFutureTask<V>(FutureTask<V>).run() line: 266
ScheduledThreadPoolExecutor$ScheduledFutureTask<V>.access$201(ScheduledThreadPoolExecutor$ScheduledFutureTask) line: 180
ScheduledThreadPoolExecutor$ScheduledFutureTask<V>.run() line: 293
ScheduledThreadPoolExecutor(ThreadPoolExecutor).runWorker(ThreadPoolExecutor$Worker) line: 1149
ThreadPoolExecutor$Worker.run() line: 624
Thread.run() line: 748