Eureka的客户端是怎么启动的?
由于eureka的每一个模块下面都写了很多的单元测试,我们可以拿它当学习的入口。下面找到eureka-client的测试类或者样例类,然后断点进去。
它的样例类为ExampleEurekaClient
public static void main(String[] args) throws UnknownHostException {
//先创建一个sampleClient
ExampleEurekaClient sampleClient = new ExampleEurekaClient();
//把eureka的客户端配置放到系统变量中
injectEurekaConfiguration();
// 构造服务实例,创建是服务实例管理器
ApplicationInfoManager applicationInfoManager = initializeApplicationInfoManager(new MyDataCenterInstanceConfig());
EurekaClient client = initializeEurekaClient(applicationInfoManager, new DefaultEurekaClientConfig());
// use the client
sampleClient.sendRequestToServiceUsingEureka(client);
// shutdown the client
eurekaClient.shutdown();
}
injectEurekaConfiguration()方法中放的都是客户端的配置,它的效果和读取eureka-client.properties效果是一样的。
private static void injectEurekaConfiguration() throws UnknownHostException {
String myHostName = InetAddress.getLocalHost().getHostName();
String myServiceUrl = "http://" + myHostName + ":8080/v2/";
System.setProperty("eureka.region", "default");
System.setProperty("eureka.name", "eureka");
System.setProperty("eureka.vipAddress", "eureka.mydomain.net");
System.setProperty("eureka.port", "8080");
System.setProperty("eureka.preferSameZone", "false");
System.setProperty("eureka.shouldUseDns", "false");
System.setProperty("eureka.shouldFetchRegistry", "true");
System.setProperty("eureka.serviceUrl.defaultZone", myServiceUrl);
System.setProperty("eureka.serviceUrl.default.defaultZone", myServiceUrl);
System.setProperty("eureka.awsAccessId", "fake_aws_access_id");
System.setProperty("eureka.awsSecretKey", "fake_aws_secret_key");
System.setProperty("eureka.numberRegistrySyncRetries", "0");
}
构造服务实例的代码逻辑
private static synchronized ApplicationInfoManager initializeApplicationInfoManager(EurekaInstanceConfig instanceConfig) {
if (applicationInfoManager == null) {
InstanceInfo instanceInfo = new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get();
applicationInfoManager = new ApplicationInfoManager(instanceConfig, instanceInfo);
}
return applicationInfoManager;
}
根据上面的代码会看到有2个新的类,EurekaConfigBasedInstanceInfoProvider
,基于 EurekaInstanceConfig 创建 InstanceInfo 的工厂,实现代码如下:
@Singleton
public class EurekaConfigBasedInstanceInfoProvider implements Provider<InstanceInfo> {
private static final Logger LOG = LoggerFactory.getLogger(EurekaConfigBasedInstanceInfoProvider.class);
private final EurekaInstanceConfig config;
private InstanceInfo instanceInfo;
@Inject(optional = true)
private VipAddressResolver vipAddressResolver = null;
@Inject
public EurekaConfigBasedInstanceInfoProvider(EurekaInstanceConfig config) {
this.config = config;
}
@Override
public synchronized InstanceInfo get() {
if (instanceInfo == null) {
//创建 租约信息构建器,并设置属性
LeaseInfo.Builder leaseInfoBuilder = LeaseInfo.Builder.newBuilder()
.setRenewalIntervalInSecs(config.getLeaseRenewalIntervalInSeconds())
.setDurationInSecs(config.getLeaseExpirationDurationInSeconds());
// 创建 VIP地址解析器
if (vipAddressResolver == null) {
vipAddressResolver = new Archaius1VipAddressResolver();
}
// 这里使用了构造器模式
InstanceInfo.Builder builder = InstanceInfo.Builder.newBuilder(vipAddressResolver);
// set the appropriate id for the InstanceInfo, falling back to datacenter Id if applicable, else hostname
String instanceId = config.getInstanceId();
if (instanceId == null || instanceId.isEmpty()) {
DataCenterInfo dataCenterInfo = config.getDataCenterInfo();
if (dataCenterInfo instanceof UniqueIdentifier) {
instanceId = ((UniqueIdentifier) dataCenterInfo).getId();
} else {
instanceId = config.getHostName(false);
}
}
String defaultAddress;
if (config instanceof RefreshableInstanceConfig) {
// Refresh AWS data center info, and return up to date address
defaultAddress = ((RefreshableInstanceConfig) config).resolveDefaultAddress(false);
} else {
defaultAddress = config.getHostName(false);
}
// fail safe
if (defaultAddress == null || defaultAddress.isEmpty()) {
defaultAddress = config.getIpAddress();
}
// 设置 应用实例信息构建器 的 属性
builder.setNamespace(config.getNamespace())
.setInstanceId(instanceId)
.setAppName(config.getAppname())
.setAppGroupName(config.getAppGroupName())
.setDataCenterInfo(config.getDataCenterInfo())
.setIPAddr(config.getIpAddress())
.setHostName(defaultAddress)
.setPort(config.getNonSecurePort())
.enablePort(PortType.UNSECURE, config.isNonSecurePortEnabled())
.setSecurePort(config.getSecurePort())
.enablePort(PortType.SECURE, config.getSecurePortEnabled())
.setVIPAddress(config.getVirtualHostName())
.setSecureVIPAddress(config.getSecureVirtualHostName())
.setHomePageUrl(config.getHomePageUrlPath(), config.getHomePageUrl())
.setStatusPageUrl(config.getStatusPageUrlPath(), config.getStatusPageUrl())
.setASGName(config.getASGName())
.setHealthCheckUrls(config.getHealthCheckUrlPath(),
config.getHealthCheckUrl(), config.getSecureHealthCheckUrl());
// Start off with the STARTING state to avoid traffic
if (!config.isInstanceEnabledOnit()) {
InstanceStatus initialStatus = InstanceStatus.STARTING;
LOG.info("Setting initial instance status as: {}", initialStatus);
builder.setStatus(initialStatus);
} else {
LOG.info("Setting initial instance status as: {}. This may be too early for the instance to advertise "
+ "itself as available. You would instead want to control this via a healthcheck handler.",
InstanceStatus.UP);
}
// Add any user-specific metadata information
for (Map.Entry<String, String> mapEntry : config.getMetadataMap().entrySet()) {
String key = mapEntry.getKey();
String value = mapEntry.getValue();
// only add the metadata if the value is present
if (value != null && !value.isEmpty()) {
builder.add(key, value);
}
}
//创建应用信息
instanceInfo = builder.build();
// 设置 应用实例信息 的 租约信息
instanceInfo.setLeaseInfo(leaseInfoBuilder.build());
}
return instanceInfo;
}
}
com.netflix.appinfo.ApplicationInfoManager
,应用信息管理器。实现代码如下:
public class ApplicationInfoManager {
/**
* 单例
*/
private static ApplicationInfoManager instance = new ApplicationInfoManager(null, null, null);
/**
* 状态变更监听器
*/
protected final Map<String, StatusChangeListener> listeners;
/**
* 应用实例状态匹配
*/
private final InstanceStatusMapper instanceStatusMapper;
/**
* 应用实例信息
*/
private InstanceInfo instanceInfo;
/**
* 应用实例配置
*/
private EurekaInstanceConfig config;
// ... 省略其它构造方法
public ApplicationInfoManager(EurekaInstanceConfig config, InstanceInfo instanceInfo, OptionalArgs optionalArgs) {
this.config = config;
this.instanceInfo = instanceInfo;
this.listeners = new ConcurrentHashMap<String, StatusChangeListener>();
if (optionalArgs != null) {
this.instanceStatusMapper = optionalArgs.getInstanceStatusMapper();
} else {
this.instanceStatusMapper = NO_OP_MAPPER;
}
// Hack to allow for getInstance() to use the DI'd ApplicationInfoManager
instance = this;
}
// ... 省略其它方法
}
构建的实例信息如下图:
在initializeEurekaClient,在这个方法中,顾名思义,会初始化一个eureka的客户端,在new DefaultEurekaClientConfig()中读取一个客户端的配置信息
EurekaClient client = initializeEurekaClient(applicationInfoManager, new DefaultEurekaClientConfig());
private static synchronized EurekaClient initializeEurekaClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig clientConfig) {
if (eurekaClient == null) {
eurekaClient = new DiscoveryClient(applicationInfoManager, clientConfig);
}
return eurekaClient;
}
读取客户端配置信息
public DefaultEurekaClientConfig() {
this(CommonConstants.DEFAULT_CONFIG_NAMESPACE);
}
public DefaultEurekaClientConfig(String namespace) {
this.namespace = namespace.endsWith(".")
? namespace
: namespace + ".";
this.configInstance = Archaius1Utils.initConfig(CommonConstants.CONFIG_FILE_NAME);
this.transportConfig = new DefaultEurekaTransportConfig(namespace, configInstance);
}
接着就开始创建DiscoveryClient,
// DiscoveryClient.java 构造方法
if (args != null) {
this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
this.eventListeners.addAll(args.getEventListeners());
this.preRegistrationHandler = args.preRegistrationHandler;
} else {
this.healthCheckCallbackProvider = null;
this.healthCheckHandlerProvider = null;
this.preRegistrationHandler = null;
}
// DiscoveryClient.java 构造方法
this.applicationInfoManager = applicationInfoManager;
InstanceInfo myInfo = applicationInfoManager.getInfo();
clientConfig = config;
staticClientConfig = clientConfig;
transportConfig = config.getTransportConfig();
instanceInfo = myInfo;
if (myInfo != null) {
appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId(); // 无实际业务用途,用于打 logger
} else {
logger.warn("Setting instanceInfo to a passed in null value");
}
初始化本地缓存
// DiscoveryClient.java 变量
/**
* Applications 在本地的缓存
*/
private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>();
/**
* 拉取注册信息次数
* monotonically increasing generation counter to ensure stale threads do not reset registry to an older version
*/
private final AtomicLong fetchRegistryGeneration;
// DiscoveryClient.java 构造方法
localRegionApps.set(new Applications());
fetchRegistryGeneration = new AtomicLong(0);
后面初始化定时调度的时候,会拉取注册表,会缓存在本地
初始化拉取、心跳的监控
// DiscoveryClient.java 变量
/**
* 最后成功从 Eureka-Server 拉取注册信息时间戳
*/
private volatile long lastSuccessfulRegistryFetchTimestamp = -1;
/**
* 最后成功向 Eureka-Server 心跳时间戳
*/
private volatile long lastSuccessfulHeartbeatTimestamp = -1;
/**
* 心跳监控
*/
private final ThresholdLevelsMetric heartbeatStalenessMonitor;
/**
* 拉取监控
*/
private final ThresholdLevelsMetric registryStalenessMonitor;
// DiscoveryClient.java 构造方法
if (config.shouldFetchRegistry()) {
this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
if (config.shouldRegisterWithEureka()) {
this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
- 每次成功向 Eureka-Serve 心跳或者从从 Eureka-Server 拉取注册信息后,都会更新相应时间戳。
判断是否要抓取注册表信息,刚初始化的时候,是手动抓取,抓取注册表专门在写一篇笔记
if (clientConfig.shouldFetchRegistry()) {
try {
//全量抓取註冊表
boolean primaryFetchRegistryResult = fetchRegistry(false);
if (!primaryFetchRegistryResult) {
logger.info("Initial registry fetch from primary servers failed");
}
boolean backupFetchRegistryResult = true;
if (!primaryFetchRegistryResult && !fetchRegistryFromBackup()) {
backupFetchRegistryResult = false;
logger.info("Initial registry fetch from backup servers failed");
}
if (!primaryFetchRegistryResult && !backupFetchRegistryResult && clientConfig.shouldEnforceFetchRegistryAtInit()) {
throw new IllegalStateException("Fetch registry error at startup. Initial fetch failed.");
}
} catch (Throwable th) {
logger.error("Fetch registry error at startup: {}", th.getMessage());
throw new IllegalStateException(th);
}
}
接着会专门起几个调度,进行抓取增量注册表和心跳检查,后面也专门的写一篇笔记记录一下
// 这里又一个初始化调度
initScheduledTasks();
/**
* Initializes all scheduled tasks.
*/
private void initScheduledTasks() {
if (clientConfig.shouldFetchRegistry()) {
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); //30
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
cacheRefreshTask = new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
);
scheduler.schedule(
cacheRefreshTask,
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
// Heartbeat timer
heartbeatTask = new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs, //public static final int DEFAULT_LEASE_RENEWAL_INTERVAL = 30;
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
);
scheduler.schedule(
heartbeatTask,
renewalIntervalInSecs, TimeUnit.SECONDS);
// InstanceInfo replicator
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (statusChangeEvent.getStatus() == InstanceStatus.DOWN) {
logger.error("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
- 增量抓取的时间是默认30s,
clientConfig.getRegistryFetchIntervalSeconds();
- 每隔30秒去发送一次心跳,
DEFAULT_LEASE_RENEWAL_INTERVAL = 30;
初始化 Eureka 网络通信相关
// DiscoveryClient.java 构造方法
eurekaTransport = new EurekaTransport();
scheduleServerEndpointTask(eurekaTransport, args);
初始化完成
initTimestampMs = System.currentTimeMillis();
initRegistrySize = this.getApplications().size();
registrySize = initRegistrySize;
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
initTimestampMs, initRegistrySize);