zoukankan      html  css  js  c++  java
  • SpringCloud--Eureka--原理及源码解析

    一、Eureka的基础架构及服务治理机制

      Eureka服务治理的基础架构包含三个核心:服务注册中心、服务提供者、服务消费者。其中服务注册中心,即Eureka提供的服务端,提供服务注册和发现的功能;服务提供者,即将自己的服务注册到注册中心;服务的消费者,从注册中心获取服务列表,从而使消费者知道到何处调用服务,服务消费可以使用Ribbon、Feign等。

      1、服务提供者:

        (1)服务注册:服务提供者在项目启动时,会通过发送REST请求的方式将自己注册到eureka server上,同时带上一些自己的元数据,Eureka Server收到请求后,将元数据存储在一个双层map中,第一层的key是服务名称,第二层的key是具体的服务实例。

        (2)服务同步:如果A项目将服务注册到了M注册中心,B项目将服务注册到N注册中心,但是如果M项目和N项目开启了可以注册为服务的配置,那么当A项目将服务注册到M注册中心时,M注册中心会将请求转发到N注册中心,以保证两个注册中心副本中服务同步。

        (3)服务续约:在注册完服务后,服务提供者会维护一个心跳来持续告诉注册中心其还活着,以防止注册中心的剔除任务将该服务实例从服务列表中删除。

        关于心跳频率与剔除任务认为服务失效时间的配置参数如下所示(配置值均为默认值):

    eureka:
      instance:
        # 心跳检测频率
        lease-renewal-interval-in-seconds: 30
        # 服务失效时间
        lease-expiration-duration-in-seconds: 90

      2、服务消费者:

        (1)获取服务:当启动服务消费者项目时,会向注册中心发送一个REST请求来获取注册中心上注册的服务清单。为了性能的考虑,注册中心自己维护了一份只读的注册服务清单,每30秒更新一次,要调整注册中心中注册服务清单更新频率,可以使用如下参数进行设置(下面示例为默认值),同时,由于获取服务是服务消费的基础,因此需要保证eureka.client.fetch-registry为true

    eureka:
      client:
        registry-fetch-interval-seconds: 30
        fetch-registry: true

        (2)服务调用:服务消费者在获取到服务提供清单后,会根据服务名获得具体的实例名和该实例的元数据,然后客户端可以根据自己需要,选择调用哪个实例,在上述代码样例中,我们使用的是Ribbon来做负载均衡,而ribbon默认采用轮询的方式进行调用,从而实现客户端的负载。对于访问实例的选择,Eureka中有Region和Zone的概念,一个Region中可以包含多个Zone,一个客户端会被注册到一个Zone中,所以一个客户端只对应一个Zone和一个Region,在服务调用时,优先访问处于同一个Zone中的服务提供者,若访问不到,再访问其他Zone中的服务提供者。

        (3)服务下线:当客户端实例进行正常的关闭操作时,它会触发一个服务下线的REST请求给注册中心,告诉注册中心其要下线,注册中心收到请求后,将该服务状态置为下线,并把该事件传播出去。

      3、服务注册中心

        (1)失效剔除:有时服务实例并不会正常下线,可能是由于内存溢出、网络故障等原因使得服务不能正常运行,所以注册中心并未收到服务下线的请求。为了剔除该类不可用服务提供者实例,Eureka Server在启动时,会创建一个定时任务,每隔一段时间(默认60秒)将当前清单中超时(默认90秒)没有续约的服务剔除出去。

        (2)自我保护:前面提到过,服务提供者启动后,会维护一个心跳,定时向注册中心发送心跳,告诉注册中心自己还活着。注册中心的运行期间,会统计心跳失败的比例在15分钟内是否低于85%,如果低于85%,注册中心会将该服务的实例保护起来,不让其过期,但是由于在本地测试,所以这个情况非常容易满足(而线上则主要是由于网络不稳定等导致),这就导致在保护期间内,如果服务提供者实例出现问题,那么客户端就会拿到有问题的实例,将会出现调用失败的情况,因此客户端必须要有容错机制,比如说请求重试、断路器等机制。如果我们想关闭自我保护机制,可以使用如下参数。

    eureka:
      server:
        enable-self-preservation: false

    在我们没有关闭自我保护之前,当我们在之前访问注册中心时:http://localhost:1112/,会看到红色警告(警告内容如下图所示),这就是触发了Eureka Server的自我保护机制。

         

    二、Eureka源码分析

      1、服务注册中心的加载

      首先从服务提供者开始看,例如eureka-client项目,我们主要是在主函数上添加了@EnableDiscoveryClient注解,以及在配置文件中添加了注册中心地址等配置信息,那么源码入口就可以从@EnableDiscoveryClient注解开始看,从该注解我们可以了解,主要是开启DiscoveryClient,那么全局搜索DiscoveryClient,可以发现有两个,一个是springCloud提供的接口org.springframework.cloud.client.discovery.DiscoveryClient,另外一个是netflix的实现com.netflix.discovery.DiscoveryClient,可以看下类的依赖关系,SpringCloud提供的接口类依赖关系及 netflix的实现类的类依赖关系分别如以下左右两张图:

     而org.springframework.cloud.netflix.eureka.EurekaDiscoveryClient又使用了com.netflix.discovery.EurekaClient,所以,总的依赖关系如下:

    org.springframework.cloud.client.discovery.DiscoveryClient提供了Springcloud服务注册相关的接口,而org.springframework.cloud.netflix.eureka.EurekaDiscoveryClient是netflix公司对于该接口的实现,而该实现,是包装了netflix公司开源项目中的com.netflix.discovery.EurekaClient接口及实现com.netflix.discovery.DiscoveryClient。

      可以看调用链,先调用了com.netflix.discovery.DiscoveryClient#getEurekaServiceUrlsFromConfig方法

        /**
         * @deprecated use {@link #getServiceUrlsFromConfig(String, boolean)} instead.
         */
        @Deprecated
        public static List<String> getEurekaServiceUrlsFromConfig(String instanceZone, boolean preferSameZone) {
            return EndpointUtils.getServiceUrlsFromConfig(staticClientConfig, instanceZone, preferSameZone);
        }

      可以看到该方法已经过期,被使用@link到了替代方法com.netflix.discovery.DiscoveryClient#getServiceUrlsFromConfig

        @Deprecated
        @Override
        public List<String> getServiceUrlsFromConfig(String instanceZone, boolean preferSameZone) {
            return EndpointUtils.getServiceUrlsFromConfig(clientConfig, instanceZone, preferSameZone);
        }

      然后,调用到了com.netflix.discovery.endpoint.EndpointUtils#getServiceUrlsFromConfig方法

        public static List<String> getServiceUrlsFromConfig(EurekaClientConfig clientConfig, String instanceZone, boolean preferSameZone) {
            List<String> orderedUrls = new ArrayList<String>();
            String region = getRegion(clientConfig);
            String[] availZones = clientConfig.getAvailabilityZones(clientConfig.getRegion());
            if (availZones == null || availZones.length == 0) {
                availZones = new String[1];
                availZones[0] = DEFAULT_ZONE;
            }
            logger.debug("The availability zone for the given region {} are {}", region, availZones);
            int myZoneOffset = getZoneOffset(instanceZone, preferSameZone, availZones);
    
            List<String> serviceUrls = clientConfig.getEurekaServerServiceUrls(availZones[myZoneOffset]);
            if (serviceUrls != null) {
                orderedUrls.addAll(serviceUrls);
            }
            int currentOffset = myZoneOffset == (availZones.length - 1) ? 0 : (myZoneOffset + 1);
            while (currentOffset != myZoneOffset) {
                serviceUrls = clientConfig.getEurekaServerServiceUrls(availZones[currentOffset]);
                if (serviceUrls != null) {
                    orderedUrls.addAll(serviceUrls);
                }
                if (currentOffset == (availZones.length - 1)) {
                    currentOffset = 0;
                } else {
                    currentOffset++;
                }
            }
    
            if (orderedUrls.size() < 1) {
                throw new IllegalArgumentException("DiscoveryClient: invalid serviceUrl specified!");
            }
            return orderedUrls;
        }

      该方法依次操作:获取项目对应的Region、获取项目对应的Zones数组(如果没有,则取默认Zone)、计算项目对应Zones数组的偏移量、获取注册中心、计算当前偏移量、根据当前偏移量获取注册中心地址并将地址添加在地址集合中、返回地址集合

      接下来每个步骤单独说明。

      (1)获取项目对应的Region

        public static String getRegion(EurekaClientConfig clientConfig) {
            String region = clientConfig.getRegion();
            if (region == null) {
                region = DEFAULT_REGION;
            }
            region = region.trim().toLowerCase();
            return region;
        }

      这个没什么可说的,就是从配置文件中获取Region的配置,如果没有,则取默认值,最终将Region转换成大写返回,这里可以使用如下参数设置Region

    eureka:
      client:
        region: test-1

      (2)获取项目对应的Zones数组

        public String[] getAvailabilityZones(String region) {
            String value = this.availabilityZones.get(region);
            if (value == null) {
                value = DEFAULT_ZONE;
            }
            return value.split(",");
        }

      根据region获取配置文件中zone数组,如果没有配置,则取默认值(上面使用的eureka.client.service-url.defaultZone即是默认配置),若要指定Zone,可以使用如下配置:

    eureka:
      client:
        #client所在zone为availabilityZones的第一个zone,如果未配置,则为defaultZone
        prefer-same-zone-eureka: true
        region: region1
        availability-zones:
          region1: zone1,zone2,zone3
          region2: zone4,zone5,zone6
        service-url:
          zone1: http://localhost:1111/eureka/
          zone2: http://localhost:1111/eureka/
          zone3: http://localhost:1111/eureka/
          zone4: http://localhost:1112/eureka/
          zone5: http://localhost:1112/eureka/
          zone6: http://localhost:1112/eureka/

      (3)获取注册中心,计算zone数组下标,获得zone,然后根据zone获取注册中心

        public List<String> getEurekaServerServiceUrls(String myZone) {
            String serviceUrls = this.serviceUrl.get(myZone);
            if (serviceUrls == null || serviceUrls.isEmpty()) {
                serviceUrls = this.serviceUrl.get(DEFAULT_ZONE);
            }
            if (!StringUtils.isEmpty(serviceUrls)) {
                final String[] serviceUrlsSplit = StringUtils
                        .commaDelimitedListToStringArray(serviceUrls);
                List<String> eurekaServiceUrls = new ArrayList<>(serviceUrlsSplit.length);
                for (String eurekaServiceUrl : serviceUrlsSplit) {
                    if (!endsWithSlash(eurekaServiceUrl)) {
                        eurekaServiceUrl += "/";
                    }
                    eurekaServiceUrls.add(eurekaServiceUrl.trim());
                }
                return eurekaServiceUrls;
            }
    
            return new ArrayList<>();
        }

      获取zone配置的注册中心,如果没有,则取默认的注册中心,然后使用逗号切割后组装成集合返回。

      当我们在使用Ribbon来实现服务调用时,对于zone的设置可以实现区域亲和性,Ribbon会优先访问属于同一Zone中的服务实例,只有当同一zone中没有可用实例后,才会访问其他zone中的实例。所以通过zone属性的定义,配合实际部署的物理结构,我们就可以有效的设计出针对区域性故障的容错集群。

      2、服务注册

      在com.netflix.discovery.DiscoveryClient构造函数中调用了com.netflix.discovery.DiscoveryClient#initScheduledTasks方法

        private void initScheduledTasks() {
            if (clientConfig.shouldFetchRegistry()) {
                // registry cache refresh timer
                int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
                int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
                cacheRefreshTask = new TimedSupervisorTask(
                        "cacheRefresh",
                        scheduler,
                        cacheRefreshExecutor,
                        registryFetchIntervalSeconds,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new CacheRefreshThread()
                );
                scheduler.schedule(
                        cacheRefreshTask,
                        registryFetchIntervalSeconds, TimeUnit.SECONDS);
            }
    
            if (clientConfig.shouldRegisterWithEureka()) {
                int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
                int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
                logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
    
                // Heartbeat timer
                heartbeatTask = new TimedSupervisorTask(
                        "heartbeat",
                        scheduler,
                        heartbeatExecutor,
                        renewalIntervalInSecs,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new HeartbeatThread()
                );
                scheduler.schedule(
                        heartbeatTask,
                        renewalIntervalInSecs, TimeUnit.SECONDS);
    
                // InstanceInfo replicator
                instanceInfoReplicator = new InstanceInfoReplicator(
                        this,
                        instanceInfo,
                        clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                        2); // burstSize
    
                statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                    @Override
                    public String getId() {
                        return "statusChangeListener";
                    }
    
                    @Override
                    public void notify(StatusChangeEvent statusChangeEvent) {
                        if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                                InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                            // log at warn level if DOWN was involved
                            logger.warn("Saw local status change event {}", statusChangeEvent);
                        } else {
                            logger.info("Saw local status change event {}", statusChangeEvent);
                        }
                        instanceInfoReplicator.onDemandUpdate();
                    }
                };
    
                if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                    applicationInfoManager.registerStatusChangeListener(statusChangeListener);
                }
    
                instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
            } else {
                logger.info("Not registering with Eureka server per configuration");
            }
        }

      对于该方法,依次错了如下操作:

        判断是否开启服务获取,如果开启,创建一个定时任务,定时刷新服务列表

        判断是否开启服务注册,如果开启,添加一个服务续租定时任务;异步注册服务。

      接下来我们一一查看操作:

      (1)判断是否开启服务注册,如果开启,添加一个服务续租定时任务;异步注册服务。

      这个判断使用的参数是上述的eureka.client.register-with-eureka参数,如果配置为true,创建了一个服务续租的定时任务,还创建了一个异步注册的任务。

      a、先看异步注册服务类InstanceInfoReplicator,该类实现了Rnnable接口,因此直接看run方法

        public void run() {
            try {
                discoveryClient.refreshInstanceInfo();
    
                Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
                if (dirtyTimestamp != null) {
                    discoveryClient.register();
                    instanceInfo.unsetIsDirty(dirtyTimestamp);
                }
            } catch (Throwable t) {
                logger.warn("There was a problem with the instance info replicator", t);
            } finally {
                Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
                scheduledPeriodicRef.set(next);
            }
        }

    这里注册的,是discoveryClient.register();这一行代码,查看register方法

        boolean register() throws Throwable {
            logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
            EurekaHttpResponse<Void> httpResponse;
            try {
                httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
            } catch (Exception e) {
                logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
                throw e;
            }
            if (logger.isInfoEnabled()) {
                logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
            }
            return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
        }

    可以看到,这里使用了http请求将元数据InstanceInfo请求到注册中心。

      b、然后看服务续租定时任务

      该任务的执行类是HeartbeatThread,直接看run方法,润方法中调用了com.netflix.discovery.DiscoveryClient#renew方法

        boolean renew() {
            EurekaHttpResponse<InstanceInfo> httpResponse;
            try {
                httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
                logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
                if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
                    REREGISTER_COUNTER.increment();
                    logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
                    long timestamp = instanceInfo.setIsDirtyWithTime();
                    boolean success = register();
                    if (success) {
                        instanceInfo.unsetIsDirty(timestamp);
                    }
                    return success;
                }
                return httpResponse.getStatusCode() == Status.OK.getStatusCode();
            } catch (Throwable e) {
                logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
                return false;
            }
        }

      可以发现该方法,也是使用http的请求,将appname等信息发送给注册中心。

      (2)判断是否开启服务获取,如果开启,创建一个定时任务,定时刷新服务列表

      这个判断对应上述使用的参数eureka.client.fetch-registry,如果开启则创建一个定时任务,该定时任务的执行频率等都是使用参数配置的(参数内容不再单独说明,看属性名称即可判断配置参数名称),然后主要看调用的实现类CacheRefreshThread,该实现类的run方法调用了com.netflix.discovery.DiscoveryClient#refreshRegistry方法:

        @VisibleForTesting
        void refreshRegistry() {
            try {
                boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();
    
                boolean remoteRegionsModified = false;
                // This makes sure that a dynamic change to remote regions to fetch is honored.
                String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
                if (null != latestRemoteRegions) {
                    String currentRemoteRegions = remoteRegionsToFetch.get();
                    if (!latestRemoteRegions.equals(currentRemoteRegions)) {
                        // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
                        synchronized (instanceRegionChecker.getAzToRegionMapper()) {
                            if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
                                String[] remoteRegions = latestRemoteRegions.split(",");
                                remoteRegionsRef.set(remoteRegions);
                                instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
                                remoteRegionsModified = true;
                            } else {
                                logger.info("Remote regions to fetch modified concurrently," +
                                        " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
                            }
                        }
                    } else {
                        // Just refresh mapping to reflect any DNS/Property change
                        instanceRegionChecker.getAzToRegionMapper().refreshMapping();
                    }
                }
    
                boolean success = fetchRegistry(remoteRegionsModified);
                if (success) {
                    registrySize = localRegionApps.get().size();
                    lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
                }
    
                if (logger.isDebugEnabled()) {
                    StringBuilder allAppsHashCodes = new StringBuilder();
                    allAppsHashCodes.append("Local region apps hashcode: ");
                    allAppsHashCodes.append(localRegionApps.get().getAppsHashCode());
                    allAppsHashCodes.append(", is fetching remote regions? ");
                    allAppsHashCodes.append(isFetchingRemoteRegionRegistries);
                    for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) {
                        allAppsHashCodes.append(", Remote region: ");
                        allAppsHashCodes.append(entry.getKey());
                        allAppsHashCodes.append(" , apps hashcode: ");
                        allAppsHashCodes.append(entry.getValue().getAppsHashCode());
                    }
                    logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",
                            allAppsHashCodes);
                }
            } catch (Throwable e) {
                logger.error("Cannot fetch registry from server", e);
            }
        }

      前面的一大堆都是校验,后面的一大堆都是输出,唯一有用的是:boolean success = fetchRegistry(remoteRegionsModified);里面会根据是否是第一发起服务获取请求做不同的请求处理。

      3、注册中心

      Eureka server对于各类rest请求的定义都位于com.netflix.eureka.resources包下,以服务注册方法为例:com.netflix.eureka.resources.ApplicationResource#addInstance

        @POST
        @Consumes({"application/json", "application/xml"})
        public Response addInstance(InstanceInfo info,
                                    @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
            logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
            // validate that the instanceinfo contains all the necessary required fields
            if (isBlank(info.getId())) {
                return Response.status(400).entity("Missing instanceId").build();
            } else if (isBlank(info.getHostName())) {
                return Response.status(400).entity("Missing hostname").build();
            } else if (isBlank(info.getIPAddr())) {
                return Response.status(400).entity("Missing ip address").build();
            } else if (isBlank(info.getAppName())) {
                return Response.status(400).entity("Missing appName").build();
            } else if (!appName.equals(info.getAppName())) {
                return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
            } else if (info.getDataCenterInfo() == null) {
                return Response.status(400).entity("Missing dataCenterInfo").build();
            } else if (info.getDataCenterInfo().getName() == null) {
                return Response.status(400).entity("Missing dataCenterInfo Name").build();
            }
    
            // handle cases where clients may be registering with bad DataCenterInfo with missing data
            DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
            if (dataCenterInfo instanceof UniqueIdentifier) {
                String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
                if (isBlank(dataCenterInfoId)) {
                    boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
                    if (experimental) {
                        String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                        return Response.status(400).entity(entity).build();
                    } else if (dataCenterInfo instanceof AmazonInfo) {
                        AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                        String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                        if (effectiveId == null) {
                            amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                        }
                    } else {
                        logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
                    }
                }
            }
    
            registry.register(info, "true".equals(isReplication));
            return Response.status(204).build();  // 204 to be backwards compatible
        }

      上面一通校验,对业务逻辑有影响的只有registry.register(info, "true".equals(isReplication));最终调用到org.springframework.cloud.netflix.eureka.server.InstanceRegistry#register(com.netflix.appinfo.InstanceInfo, boolean)方法

        public void register(final InstanceInfo info, final boolean isReplication) {
            handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication);
            super.register(info, isReplication);
        }

      可以看到,先调用了handleRegistration方法将注册事件通知出去,然后调用了父类的register方法将服务注册。

  • 相关阅读:
    Java异常处理
    冒泡排序法
    21个项目-MNIST机器学习入门
    Hadoop集群搭建中ssh免密登录
    利用奇异值分解简化数据
    数据集中空值替换成对应特征的平均值
    PCA降维处理
    使用FP-growth算法高效发现频繁项集
    原生js---ajax---post方法传数据
    原生js---ajax---get方法传数据
  • 原文地址:https://www.cnblogs.com/liconglong/p/13223182.html
Copyright © 2011-2022 走看看