Eureka 系列(04)客户端源码分析
0. Spring Cloud 系列目录 - Eureka 篇
在上一篇 Eureka 系列(01)最简使用姿态 中对 Eureka 的简单用法做了一个讲解,本节分析一下 EurekaClient 的实现 DiscoveryClient。本文的源码是基于 Eureka-1.9.8。
1)服务注册(发送注册请求到注册中心)
2)服务发现(本质就是获取调用服务名所对应的服务提供者实例信息,包括IP、port等)
3)服务续约(本质就是发送当前应用的心跳请求到注册中心)
4)服务下线(本质就是发送取消注册的HTTP请求到注册中心)
1. DiscoveryClient 基本功能简介
总结: DiscoveryClient 构造时会初始化一个 scheduler 定时任务调度器,两个线程池 heartbeatExecutor 和 cacheRefreshExecutor,分别执行 CacheRefreshThread 和 HeartbeatThread 定时任务,前者定时(默认 30s)从 Eureka Server 更新服务列表,后者定时(默认 30s)上报心跳。
1.1 DiscoveryClient 初始化
DiscoveryClient 初始化最核心就是:一是服务发现定时任务,二是心跳发送定时任务。
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider) {
// default size of 2 - 1 each for heartbeat and cacheRefresh
// 1. scheduler 是 CacheRefreshThread 和 HeartbeatThread 任务调度器
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
// 2. 执行 HeartbeatThread 线程池,定时发送心跳
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
// 3. 执行 CacheRefreshThread 线程池,定时刷新服务列表
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
// 4. Eureka Server 服务端,用于 HTTP 通信
eurekaTransport = new EurekaTransport();
scheduleServerEndpointTask(eurekaTransport, args);
...
// 5. 启动定时任务
initScheduledTasks();
}
总结: DiscoveryClient 代码有删减,只保留了最核心的功能,从上面的代码来看还是很简单的。下面再看一下 initScheduledTasks 干了些什么。至于每 4 步装配 Http Client 会在每 5 小章具体讲解。
1.2 initScheduledTasks 启动定时任务
initScheduledTasks 启动了以下几个任务:一每 30s 同步一次服务列表;二每 30s 发送一次心跳信息;三是如果当前 InstanceInfo 发生变更,同步到 Eureka Server,默认 40s。
private void initScheduledTasks() {
// 1. 定时刷新服务列表,服务发现,默认 30s
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
// 2. 定时发送心跳,默认 30s
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
// Heartbeat timer
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
// InstanceInfo replicator
instanceInfoReplicator = new InstanceInfoReplicator(
this, instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
// 3. 监听 instance 状态
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
}
instanceInfoReplicator.onDemandUpdate();
}
};
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
// 4. 定时同步当前 Eureka Client 信息(变更时)给 Eureka Server,默认 40s
instanceInfoReplicator.start(
clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
}
}
总结: Eureka DiscoveryClient 通过 CacheRefreshThread 和 HeartbeatThread 这两个定时任务保证的服务的有效性。
2. 服务注册与下线
总结: 服务的注册与下线 OPEN API:
- 服务注册(POST):
http://{ip}:{port}/eureka/apps/{appName}
- 服务下线(DELETE):
http://{ip}:{port}/eureka/apps/{appName}/{id}
boolean register() throws Throwable {
EurekaHttpResponse<Void> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
throw e;
}
return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}
3. 服务发现
总结: 服务发现默认每 30s 同步一次数据,更新到本地缓存 localRegionApps
中。数据同步分两种情况:
-
全量同步(GET):
http://{ip}:{port}/eureka/apps/
,参数是 regions。这个 API 会获取该 regions 下的全部服务实例 InstanceInfo,如果实例数很多会对网络造成压力,最好是按需要拉取,即 Client 需要订阅那个服务就返回那个服务的实例。全量同步很简单,getAndStoreFullRegistry 方法调用上述 API,获取全量的 Applications 数据,直接设置给本地缓存 localRegionApps 即可。
-
增量同步(GET):
http://{ip}:{port}/eureka/apps/delta
,参数是 regions。返回发生变化的服务实例,eg: ADDED、MODIFIED、DELETED。增量同步比全量同步要麻烦一些,getAndUpdateDelta 调用上述 API 返回发生变化的服务实例信息,与本地缓存 localRegionApps 进行对比,更新本地缓存。
思考: 增量同步失败,返回数据为空,或者由于网络等原因导致本地缓存和 Eureka Server 无法通过增量同步保持数据一致性时怎么办?
DiscoveryClient 在进行增量同步时,有对应的补偿机制,当增量同步失败时回滚到全量同步。那如何判断本地缓存和服务端数据不一致呢?Eureka DiscoveryClient 通过计算本地缓存和服务端的 hashcode,如果出现不一致的情况,则同样回滚到全量同步。
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
Applications applications = getApplications();
// 1. 全量同步,基本上除了配置选项,第一次同步时全量同步,之后增量同步
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) {
getAndStoreFullRegistry();
// 2. 增量同步
} else {
getAndUpdateDelta(applications);
}
applications.setAppsHashCode(applications.getReconcileHashCode());
logTotalInstances();
} catch (Throwable e) {
return false;
} finally {
}
// 发布事件CacheRefreshedEvent,同时更新状态
onCacheRefreshed();
updateInstanceRemoteStatus();
return true;
}
总结: 参数 forceFullRegistryFetch 表示强制全量同步。除了配置选项,基本第一次同步是全量同步,之后都增量同步。全量同步很简单就不看了,看一下增量同步是怎么做的?
private void getAndUpdateDelta(Applications applications) throws Throwable {
// 1. 通过增量同步,获取改变的服务实例列表
long currentUpdateGeneration = fetchRegistryGeneration.get();
Applications delta = null;
EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
delta = httpResponse.getEntity();
}
// 2. 增量同步失败,回滚到全量同步
if (delta == null) {
getAndStoreFullRegistry();
// 3. 增量同步,对比本地缓存和delta信息,更新本地缓存
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
String reconcileHashCode = "";
if (fetchRegistryUpdateLock.tryLock()) {
try {
// 3. 增量同步,对比本地缓存和delta信息,更新本地缓存
updateDelta(delta);
reconcileHashCode = getReconcileHashCode(applications);
} finally {
fetchRegistryUpdateLock.unlock();
}
}
// 4. 由于未知原因导致实例数不一致(此时hashcode会不一致)
// 无法通过增量同步,回滚到全量同步
if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall
}
}
}
总结: 增量同步考虑到了增量同步失败或数据出现不一致的情况,进行了补偿。其实这种补偿机制也很简单,以后做设计时可以考虑这种补偿机制,提高代码的健壮性。
4. 心跳检测
总结: 健康检测,一般都是 TTL(Time To Live) 机制。eg: 客户端每 5s 发送心跳,服务端 15s 没收到心跳包,更新实例状态为不健康, 30s 未收到心跳包,从服务列表中删除。 Eureka Server 是每 30s 发送心跳包,90s 未收心跳则删除。
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
// 1. 发送心跳包
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
// 2. 如果服务端实例不存在,则重新注册实例
if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
REREGISTER_COUNTER.increment();
long timestamp = instanceInfo.setIsDirtyWithTime();
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
return httpResponse.getStatusCode() == Status.OK.getStatusCode();
} catch (Throwable e) {
return false;
}
}
5. 高可用客户端(HA Client)
高可用客户端(HA Client)多用于生产环境,客户端应用关联或配置注册中心服务器集群,避免注册中心单点故障。
常见配置手段:①多注册中心主机;②注册中心 DNS;③广播
如果 Eureka 客户端应用配置多个 Eureka 注册服务器,那么默认情况只有第一台可用的服务器,存在注册信息。如果第一台可用的 Eureka 服务器 Down 掉了,那么 Eureka 客户端应用将会选择下台可用的 Eureka 服务器。
客户端配置如下:
eureka.client.service-url.defaultZone=
http://peer1:10001/eureka,http://peer2:10001/eureka
思考: 那 Eureka Client 到底是访问那台 Eureka Server 呢?如果其中一台 Eureka Server 宕机后怎么处理呢?
5.1 EurekaHttpClient 初始化
DiscoveryClient(ApplicationInfoManager applicationInfoManager,
EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider) {
// 4. Eureka Server 服务端,用于 HTTP 通信
eurekaTransport = new EurekaTransport();
scheduleServerEndpointTask(eurekaTransport, args);
...
}
scheduleServerEndpointTask 最终初始化 EurekaTransport,EurekaTransport 最重要的属性有两个:一是 ClosableResolver,用于 Eureka Server 发现;二是 EurekaHttpClient 用于与 Eureka Server 通信。
private static final class EurekaTransport {
// Eureka Server 地址发现
private ClosableResolver bootstrapResolver;
private TransportClientFactory transportClientFactory;
// Eureka 注册
private EurekaHttpClient registrationClient;
private EurekaHttpClientFactory registrationClientFactory;
// Eureka 查询
private EurekaHttpClient queryClient;
private EurekaHttpClientFactory queryClientFactory;
}
总结: scheduleServerEndpointTask 的方法很长,我们只看最核心的代码,即 bootstrapResolver 和 queryClient 的初始化。
private void scheduleServerEndpointTask(EurekaTransport eurekaTransport,
AbstractDiscoveryClientOptionalArgs args) {
// 1. ClusterResolver#getClusterEndpoints 可以获取所的 endpoints
eurekaTransport.bootstrapResolver = EurekaHttpClients.newBootstrapResolver(
clientConfig,
transportConfig,
eurekaTransport.transportClientFactory,
applicationInfoManager.getInfo(),
applicationsSource
);
// 2. 初始化 queryClient,默认实现是 RetryableEurekaHttpClient
// registrationClient 初始化类似,就省略了
if (clientConfig.shouldFetchRegistry()) {
EurekaHttpClientFactory newQueryClientFactory = null;
EurekaHttpClient newQueryClient = null;
try {
newQueryClientFactory = EurekaHttpClients.queryClientFactory(
eurekaTransport.bootstrapResolver,
eurekaTransport.transportClientFactory,
clientConfig,
transportConfig,
applicationInfoManager.getInfo(),
applicationsSource
);
newQueryClient = newQueryClientFactory.newClient();
} catch (Exception e) {
}
eurekaTransport.queryClientFactory = newQueryClientFactory;
eurekaTransport.queryClient = newQueryClient;
}
...
}
总结: scheduleServerEndpointTask 方法是重要的工作:
-
一是初始化 ClusterResolver,用于获取所有的 Eureka Server。默认实现是 ConfigClusterResolver,调用 EurekaClientConfig#getEurekaServerServiceUrls() 方法获取配置的 Eureka 地址。
-
二是初始化 EurekaHttpClient,用于发送请求。RetryableEurekaHttpClient 会通过轮询的方式 Eureka Server。需要注意的是:只有第一台宕机时,才会轮询,否则正常情况下永远只访问第一台。
-
EurekaHttpClients.queryClientFactory 创建 EurekaHttpClient 的责任链。默认情况下:
graph LR SessionedEurekaHttpClient --> RetryableEurekaHttpClient RetryableEurekaHttpClient --> RedirectingEurekaHttpClient RedirectingEurekaHttpClient --> JerseyApplicationClient其中最终用于发送请求的 JerseyApplicationClient,由
eurekaTransport.transportClientFactory.newClient()
构建。默认实现如下:graph LR Jersey1TransportClientFactories -- newTransportClientFactory --> JerseyEurekaHttpClientFactory JerseyEurekaHttpClientFactory --newClient --> JerseyApplicationClient
5.2 EurekaHttpClient 执行流程
RetryableEurekaHttpClient 通过 ConfigClusterResolver 解析获取所有配置的 Eureka ServerUrls,默认只会调用每一台 Eureka Server,只有当第一台宕机时才会调用下一台。 也就是通过 EurekaClientConfig#getEurekaServerServiceUrls 获取 eureka.client.service-url.defaultZone=http://peer1:10001/eureka,http://peer2:10001/eureka
配置的集群地址。
总结: RetryableEurekaHttpClient 通过轮询的方式保证客户端的高可用,主要的执行流程分三步:
- 获取所有的 Eureka Server。ConfigClusterResolver 获取所有的地址后,通过轮询算法选择一台 Server。
- 根据这个 Server 构建一个真实发送请求的 EurekaHttpClient。
- EurekaHttpClient 发送请求,如果失败则重试。
protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
List<EurekaEndpoint> candidateHosts = null;
int endpointIdx = 0;
for (int retry = 0; retry < numberOfRetries; retry++) {
EurekaHttpClient currentHttpClient = delegate.get();
EurekaEndpoint currentEndpoint = null;
if (currentHttpClient == null) {
// 获取所有的 Endpoint
if (candidateHosts == null) {
candidateHosts = getHostCandidates();
if (candidateHosts.isEmpty()) {
throw new TransportException("There is no known eureka server; cluster server list is empty");
}
}
if (endpointIdx >= candidateHosts.size()) {
throw new TransportException("Cannot execute request on any known server");
}
// 2. 轮询获取 currentEndpoint,注意只有每一台无法访问时才会访问下一台
// currentHttpClient 才是真实发送请求的 EurekaHttpClient
// 在 spring cloud(sc) 中默认的实现是 RestTemplateEurekaHttpClient
currentEndpoint = candidateHosts.get(endpointIdx++);
currentHttpClient = clientFactory.newClient(currentEndpoint);
}
// 3. 发送请求,成功则返回,失败则是重试
try {
EurekaHttpResponse<R> response = requestExecutor.execute(currentHttpClient);
if (serverStatusEvaluator.accept(response.getStatusCode(), requestExecutor.getRequestType())) {
delegate.set(currentHttpClient);
return response;
}
} catch (Exception e) {
}
delegate.compareAndSet(currentHttpClient, null);
if (currentEndpoint != null) {
quarantineSet.add(currentEndpoint);
}
}
throw new TransportException("Retry limit reached; giving up on completing the request");
}
总结: RetryableEurekaHttpClient 通过轮询的方式保证高可用客户端(HA Client)
- RetryableEurekaHttpClient 继承关系:RetryableEurekaHttpClient -> EurekaHttpClientDecorator -> EurekaHttpClient。EurekaHttpClientDecorator 只是一个包装类,具体的发送请求过程都委托给子类 RetryableEurekaHttpClient#execute(EurekaHttpClient delegate) 完成。
- RetryableEurekaHttpClient 通过轮询的方式获取 currentEndpoint,再通过
clientFactory.newClient(currentEndpoint)
构建一个真正用于发送请求的 EurekaHttpClient,在 Spring Cloud(SC) 中的默认实现是 RestTemplateEurekaHttpClient。
6. 总结
6.1 Eureka OPEN API
操作 | 请求方式 | 路径 | 参数 |
---|---|---|---|
注册(register) | POST | apps/{appName} |
instanceInfo |
下线(unregister) | DELETE | apps/{appName}/{id} |
-- |
全量同步(unregister) | GET | apps/ |
regions |
增量同步(unregister) | GET | apps/delta |
regions |
心跳(sendHeartBeat) | PUT | apps/{appName}/{id} |
-- |
6.2 实例注册
Eureka DiscoveryClient 默认延迟 40s 注册实例信息,之后如果实例信息发生变化,则每 30s 同步一次数据。
参数 | 功能 | 默认值 |
---|---|---|
registerWithEureka | 是否将本机实例注册到 Eureka Server 上 | true |
initialInstanceInfoReplicationIntervalSeconds | 初始化注册的延迟时间 | 40s |
instanceInfoReplicationIntervalSeconds | 定时更新本机实例信息到 Eureka Server 的时间间隔 | 30s |
6.3 数据同步 - 服务发现
Eureka DiscoveryClient 默认每 30s 同步一次数据,更新本地缓存 localRegionApps
。数据同步分为全量同步和增量同步。
-
全量同步:获取该 regions 下的全部服务实例 InstanceInfo,如果实例数很多会对网络造成压力,最好是按需要拉取,即 Client 需要订阅那个服务就返回那个服务的实例。
-
增量同步:返回发生变化的服务实例,eg: ADDED、MODIFIED、DELETED。
增量同步比全量同步要麻烦一些,getAndUpdateDelta 调用上述 API 返回发生变化的服务实例信息,与本地缓存 localRegionApps 进行对比,更新本地缓存。如果增量同步失败回滚到全量同步。
表3:Eureka 服务发现配置参数
参数 | 功能 | 默认值 |
---|---|---|
fetchRegistry | 是否从 Eureka Server 获取注册信息 | true |
registryFetchIntervalSeconds | 定时同步本地的服务实例信息缓存的时间间隔 | 30s |
6.4 健康检查 - 心跳机制
Eureka DiscoveryClient 默认每 30s 发送心跳包,Server 如果 90s 未收心跳则删除。
参数 | 功能 | 默认值 | 来源 |
---|---|---|---|
renewalIntervalInSecs | 心跳的时间间隔 | 30s | LeaseInfo |
durationInSecs | 定时同步本地的服务实例信息缓存的时间间隔 | 90s | LeaseInfo |
6.5 思考
- 当注册应用之间存在相互关联时,那么上层应用如何感知下层服务的存在?
- 如果上层应用感知到下层服务,那么它是怎么同步下层服务信息?
- 如果应用需要实时地同步信息,那么确保一致性?
每天用心记录一点点。内容也许不重要,但习惯很重要!