为了方便说明,就把上篇博客的图再贴一遍了。
上篇说道Application Service向Eureka Server注册服务的过程,在完成注册之后,由于Eureka Server是对等集群,其他Server也需要同步这一注册信息。与zookeeper相比,Eureka并不追求很强的一致性,而是认为A(可用性)和P(分区容错性)更重要。
基于此,Eureka采用复制的方式进行注册信息的同步。
1、在完成注册方法之后,进行复制。
PeerAwareInstanceRegistryImpl类
public void register(final InstanceInfo info, final boolean isReplication) { int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS; if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) { leaseDuration = info.getLeaseInfo().getDurationInSecs(); } super.register(info, leaseDuration, isReplication); replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication); }
2、具体的复制步骤
复制方法,复制eureka的所有action操作除了流量。 private void replicateToPeers(Action action, String appName, String id, InstanceInfo info /* optional */, InstanceStatus newStatus /* optional */, boolean isReplication) { Stopwatch tracer = action.getTimer().start(); try { if (isReplication) { numberOfReplicationsLastMin.increment(); } // 如果已经复制过,就不再复制 if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) { return; } //遍历eureka集群中的所有节点,进行复制操作 for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) { // If the url represents this host, do not replicate to yourself. if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) { continue; } replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node); } } finally { tracer.stop(); } }
Eureka Server的复制不会进行第二次,可以参考上面的代码。
判断isReplication的值,如果复制过,则不再复制。如果没有复制过,遍历集群中的node节点个数(即Eureka Server集群),依次进行复制操作。
具体的复制action,包括取消、注册、心跳、状态更新等。
private void replicateInstanceActionsToPeers(Action action, String appName, String id, InstanceInfo info, InstanceStatus newStatus, PeerEurekaNode node) { try { InstanceInfo infoFromRegistry = null; CurrentRequestVersion.set(Version.V2); switch (action) { case Cancel: node.cancel(appName, id); break; case Heartbeat: InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id); infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false); break; case Register: node.register(info); break; case StatusUpdate: infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.statusUpdate(appName, id, newStatus, infoFromRegistry); break; case DeleteStatusOverride: infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.deleteStatusOverride(appName, id, infoFromRegistry); break; } } catch (Throwable t) { logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t); } }
peerEurekaNodes如何获取集群中的所有node节点?
首先,在PeerAwareInstanceRegistryImpl类中的init方法中找到了peerEurekaNodes的赋值。
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception { this.numberOfReplicationsLastMin.start(); this.peerEurekaNodes = peerEurekaNodes; initializedResponseCache(); scheduleRenewalThresholdUpdateTask(); initRemoteRegionRegistry(); try { Monitors.registerObject(this); } catch (Throwable e) { logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e); } }
在其父类中AwsInstanceRegistry也有其实现。
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception { super.init(peerEurekaNodes); this.awsAsgUtil = new AwsAsgUtil(serverConfig, clientConfig, this); // We first check if the instance is STARTING or DOWN, then we check explicit overrides, // then we see if our ASG is UP, then we check the status of a potentially existing lease. this.instanceStatusOverrideRule = new FirstMatchWinsCompositeRule(new DownOrStartingRule(), new OverrideExistsRule(overriddenInstanceStatusMap), new AsgEnabledRule(this.awsAsgUtil), new LeaseExistsRule()); }
继续往上找,DefaultEurekaServerContext类中,
@PostConstruct @Override public void initialize() throws Exception { logger.info("Initializing ..."); peerEurekaNodes.start(); registry.init(peerEurekaNodes); logger.info("Initialized"); }
最后,EurekaBootStrap中进行了调用。
protected void initEurekaServerContext() throws Exception { //省略配置信息 logger.info("Initializing the eureka client..."); //创建node组 PeerEurekaNodes peerEurekaNodes = new PeerEurekaNodes( registry, eurekaServerConfig, eurekaClientConfig, serverCodecs, applicationInfoManager ); serverContext = new DefaultEurekaServerContext( eurekaServerConfig, serverCodecs, registry, peerEurekaNodes, applicationInfoManager ); EurekaServerContextHolder.initialize(serverContext); serverContext.initialize(); logger.info("Initialized server context"); // Copy registry from neighboring eureka node int registryCount = registry.syncUp(); registry.openForTraffic(applicationInfoManager, registryCount); // Register all monitoring statistics. EurekaMonitors.registerAllStats(); }