zoukankan      html  css  js  c++  java
  • Eureka 客户端源码分析(上)

    Eureka作为服务注册中心,主要的功能是服务注册和服务发现,是微服务框架的基础功能和核心功能。

    Eureka的使用可参考:

    Eureka服务端:Spring Cloud Eureka Server使用(注册中心)

    Eureka客户端:Eureka Client的使用

    Eureka服务端:Eureka的高可用

    Eureka分为客户端和服务端,这里主要介绍客户端源码

    1、Eureka客户端主要使用EnableDiscoveryClient注解

    1)@EnableDiscoveryClient源码如下

    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Inherited
    @Import(EnableDiscoveryClientImportSelector.class) //主要是这个注解
    public @interface EnableDiscoveryClient {
    
    	/**
    	 * If true, the ServiceRegistry will automatically register the local server.
    	 * @return - {@code true} if you want to automatically register.
    	 */
    	boolean autoRegister() default true;
    
    }
    

      

    2)、EnableDiscoveryClientImportSelector

    @Order(Ordered.LOWEST_PRECEDENCE - 100)
    
    public class EnableDiscoveryClientImportSelector
     extends SpringFactoryImportSelector<EnableDiscoveryClient> {
     
        
    @Override
        
    public String[] selectImports(AnnotationMetadata metadata) {
     
            
    	// 1.获取需要注册到Spring的类
            
    	String[] imports = super.selectImports(metadata);    
    	AnnotationAttributes attributes = AnnotationAttributes.fromMap(
     metadata.getAnnotationAttributes(getAnnotationClass().getName(), true));
     
            
    	boolean autoRegister = attributes.getBoolean("autoRegister");
     
            
    	// 2.autoRegister默认为true,同时则注册AutoServiceRegistrationConfiguration类到Spring中
            
    	if (autoRegister) {
                
    	List<String> importsList = new ArrayList<>(Arrays.asList(imports));
                
    	importsList.add("org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationConfiguration");
                
    	imports = importsList.toArray(new String[0]);
           
     	}
     
            
    	return imports;
       
     }
    	
    ...
    
    }
    

      

    3)、 super.selectImports(metadata);

    SpringFactoryImportSelector的selectImports方法
    public abstract class SpringFactoryImportSelector<T>
    		implements DeferredImportSelector, BeanClassLoaderAware, EnvironmentAware {
    	@Override
    	public String[] selectImports(AnnotationMetadata metadata) {
    		//1、默认isEnabled()为true
    		if (!isEnabled()) {
    			return new String[0];
    		}
    		AnnotationAttributes attributes = AnnotationAttributes.fromMap(
    				metadata.getAnnotationAttributes(this.annotationClass.getName(), true));
    
    		Assert.notNull(attributes, "No " + getSimpleName() + " attributes found. Is "
    				+ metadata.getClassName() + " annotated with @" + getSimpleName() + "?");
    
    		//主要是这里 Find all possible auto configuration classes, filtering duplicates
    		List<String> factories = new ArrayList<>(new LinkedHashSet<>(SpringFactoriesLoader
    				.loadFactoryNames(this.annotationClass, this.beanClassLoader)));
    
    		if (factories.isEmpty() && !hasDefaultFactory()) {
    			throw new IllegalStateException("Annotation @" + getSimpleName()
    					+ " found, but there are no implementations. Did you forget to include a starter?");
    		}
    
    		if (factories.size() > 1) {
    			// there should only ever be one DiscoveryClient, but there might be more than
    			// one factory
    			this.log.warn("More than one implementation " + "of @" + getSimpleName()
    					+ " (now relying on @Conditionals to pick one): " + factories);
    		}
    
    		return factories.toArray(new String[factories.size()]);
    	}
    }
    
    //SpringFactoriesLoader.loadFactoryNames
     public static List<String> loadFactoryNames(Class<?> factoryType, @Nullable ClassLoader classLoader) {
    	//factoryTypeName 值为org.springframework.cloud.client.discovery.EnableDiscoveryClient
            String factoryTypeName = factoryType.getName();
            return (List)loadSpringFactories(classLoader).getOrDefault(factoryTypeName, Collections.emptyList());
        }
    
    //SpringFactoriesLoader.loadSpringFactories
        private static Map<String, List<String>> loadSpringFactories(@Nullable ClassLoader classLoader) {
            MultiValueMap<String, String> result = (MultiValueMap)cache.get(classLoader);
            if (result != null) {
                return result;
            } else {
                try {
    		//1、获取所有META-INF/spring.factories文件
                    Enumeration<URL> urls = classLoader != null ? classLoader.getResources("META-INF/spring.factories") : ClassLoader.getSystemResources("META-INF/spring.factories");
                    LinkedMultiValueMap result = new LinkedMultiValueMap();
    		//遍历所有spring.factories 文件
                    while(urls.hasMoreElements()) {
                        URL url = (URL)urls.nextElement();
                        UrlResource resource = new UrlResource(url);
                        Properties properties = PropertiesLoaderUtils.loadProperties(resource);
                        Iterator var6 = properties.entrySet().iterator();
    
                        while(var6.hasNext()) {
                            Entry<?, ?> entry = (Entry)var6.next();
                            String factoryTypeName = ((String)entry.getKey()).trim();
                            String[] var9 = StringUtils.commaDelimitedListToStringArray((String)entry.getValue());
                            int var10 = var9.length;
    
                            for(int var11 = 0; var11 < var10; ++var11) {
                                String factoryImplementationName = var9[var11];
                                //3、获取properties中key为EnableDiscoveryClient对应的value值列表。
    			    result.add(factoryTypeName, factoryImplementationName.trim());
                            }
                        }
                    }
    
                    cache.put(classLoader, result);
                    return result;
                } catch (IOException var13) {
                    throw new IllegalArgumentException("Unable to load factories from location [META-INF/spring.factories]", var13);
                }
            }
        }
    

      获取properties中key为EnableDiscoveryClient对应的value值列表。对应的值可以在spring-cloud-netflix-eureka-client-x.x.x.RELEASE.jar中META-INF的spring.factories。

    值为:org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration

    spring.factories的详情如下:

    org.springframework.boot.autoconfigure.EnableAutoConfiguration=
    org.springframework.cloud.netflix.eureka.config.EurekaClientConfigServerAutoConfiguration,
    org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceAutoConfiguration,
    org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration,
    org.springframework.cloud.netflix.ribbon.eureka.RibbonEurekaAutoConfiguration,
    org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration,
    org.springframework.cloud.netflix.eureka.reactive.EurekaReactiveDiscoveryClientConfiguration,
    org.springframework.cloud.netflix.eureka.loadbalancer.LoadBalancerEurekaAutoConfiguration
    
    org.springframework.cloud.bootstrap.BootstrapConfiguration=
    org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceBootstrapConfiguration  

    其实,spring.factories中EnableAutoConfiguration对应的value值列表的类会在SpringBoot项目启动的时候注册到Spring容器中,EurekaClient的关键功能就在EurekaClientAutoConfiguration

    2、EurekaClientConfigServerAutoConfiguration功能解析

    @Configuration( proxyBeanMethods = false)
    @EnableConfigurationProperties
    @ConditionalOnClass({EurekaInstanceConfigBean.class, EurekaClient.class, ConfigServerProperties.class})
    public class EurekaClientConfigServerAutoConfiguration {
        @Autowired( required = false)
        private EurekaInstanceConfig instance;
        @Autowired( required = false)
        private ConfigServerProperties server;
    
        public EurekaClientConfigServerAutoConfiguration() {
        }
    
        @PostConstruct
        public void init() {
            if (this.instance != null && this.server != null) {
                String prefix = this.server.getPrefix();
                if (StringUtils.hasText(prefix) && !StringUtils.hasText((String)this.instance.getMetadataMap().get("configPath"))) {
                    this.instance.getMetadataMap().put("configPath", prefix);
                }
    
            }
        }
    }
    

     通过注解@ConditionalOnClass可知, EurekaClientConfigServerAutoConfiguration 类的产生需要EurekaInstanceConfigBean.class, EurekaClient.class, ConfigServerProperties.class 这三个类先产生。

    我们重点关注EurekaClient ,源码如下

    @ImplementedBy(DiscoveryClient.class)
    public interface EurekaClient extends LookupService {  

     就是一个接口,并定义了默认实现类DiscoveryClient,该接口定义了Eureka客户端主要功能,包括获取服务URL、注册当前服务等功能。

    3、DiscoveryClient(com.netflix.discovery.DiscoveryClient)

    Eureka主要功能有: 服务注册、服务续约、服务下线、服务调用

    1) 服务注册(发送注册请求到注册中心)

        boolean register() throws Throwable {
            EurekaHttpResponse httpResponse;
            try {
    	    //主要的注册功能在这里,真正的实现在AbstractJerseyEurekaHttpClient.register()
                httpResponse = this.eurekaTransport.registrationClient.register(this.instanceInfo);
            } catch (Exception var3) {
                logger.warn("DiscoveryClient_{} - registration failed {}", new Object[]{this.appPathIdentifier, var3.getMessage(), var3});
                throw var3;
            }
      	...
            return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
        }
    
    
    //AbstractJerseyEurekaHttpClient.register()
        @Override
        public EurekaHttpResponse<Void> register(InstanceInfo info) {
            String urlPath = "apps/" + info.getAppName();
            ClientResponse response = null;
            try {
    	    //1、构造一个HTTP请求
                Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
                addExtraHeaders(resourceBuilder);
    	    //2、发送post请求到serviceUrl,serviceUrl即我们在配置文件中配置的defaultZone
                response = resourceBuilder
                        .header("Accept-Encoding", "gzip")
                        .type(MediaType.APPLICATION_JSON_TYPE)
                        .accept(MediaType.APPLICATION_JSON)
                        .post(ClientResponse.class, info);
               //3、返回响应状态 
    	   return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
            } finally {
                if (logger.isDebugEnabled()) {
                    logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                            response == null ? "N/A" : response.getStatus());
                }
                if (response != null) {
                    response.close();
                }
            }
        }
    

      

     2)服务续约

    本质就是发送当前应用的心跳请求

      boolean renew() {
            EurekaHttpResponse<InstanceInfo> httpResponse;
            try {
    	    //1、本质就是发送心跳请求
    	    //2、真正实现为AbstractJerseyEurekaHttpClient.sendHeartBeat() 
                httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
                logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
    	    //3、如果请求失败,则调用注册服务请求
                if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
                    REREGISTER_COUNTER.increment();
                    logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
                    long timestamp = instanceInfo.setIsDirtyWithTime();
                    boolean success = register();
                    if (success) {
                        instanceInfo.unsetIsDirty(timestamp);
                    }
                    return success;
                }
                return httpResponse.getStatusCode() == Status.OK.getStatusCode();
            } catch (Throwable e) {
                logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
                return false;
            }
        }
    
    
    //AbstractJerseyEurekaHttpClient.sendHeartBeat() 
    @Override
        public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) {
            String urlPath = "apps/" + appName + '/' + id;
            ClientResponse response = null;
            try {
    	    //将当前实例的元信息(InstanceInfo)以及状态(UP)通过HTTP请求发送到serviceUrl
                WebResource webResource = jerseyClient.resource(serviceUrl)
                        .path(urlPath)
                        .queryParam("status", info.getStatus().toString())
                        .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
                if (overriddenStatus != null) {
                    webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name());
                }
                Builder requestBuilder = webResource.getRequestBuilder();
                addExtraHeaders(requestBuilder);
                response = requestBuilder.put(ClientResponse.class);
                EurekaHttpResponseBuilder<InstanceInfo> eurekaResponseBuilder = anEurekaHttpResponse(response.getStatus(), InstanceInfo.class).headers(headersOf(response));
                if (response.hasEntity() &&
                        !HTML.equals(response.getType().getSubtype())) { //don't try and deserialize random html errors from the server
                    eurekaResponseBuilder.entity(response.getEntity(InstanceInfo.class));
                }
                return eurekaResponseBuilder.build();
            } finally {
                if (logger.isDebugEnabled()) {
                    logger.debug("Jersey HTTP PUT {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
                }
                if (response != null) {
                    response.close();
                }
            }
        }
    

      

    3)服务调用

    本质就是获取调用服务名所对应的服务提供者实例信息,包括IP、port等

       @Override
        public List<InstanceInfo> getInstancesByVipAddress(String vipAddress, boolean secure) {
            return getInstancesByVipAddress(vipAddress, secure, instanceRegionChecker.getLocalRegion());
        }
    
        @Override
        public List<InstanceInfo> getInstancesByVipAddress(String vipAddress, boolean secure,
                                                           @Nullable String region) {
            if (vipAddress == null) {
                throw new IllegalArgumentException(
                        "Supplied VIP Address cannot be null");
            }
            Applications applications;
    	//1、判断服务提供方是否是当前region,若是的话直接从localRegionApp中获取
            if (instanceRegionChecker.isLocalRegion(region)) {
                applications = this.localRegionApps.get();
            } 
    	//2、否则的话从远程region获取
    	else {
                applications = remoteRegionVsApps.get(region);
                if (null == applications) {
                    logger.debug("No applications are defined for region {}, so returning an empty instance list for vip "
                            + "address {}.", region, vipAddress);
                    return Collections.emptyList();
                }
            }
            //从applications中获取服务名称对应的实例名称列表
            if (!secure) {
                return applications.getInstancesByVirtualHostName(vipAddress);
            } else {
                return applications.getInstancesBySecureVirtualHostName(vipAddress);
    
            }
    
        }
    

      

    4)服务下线

        void unregister() {
            // It can be null if shouldRegisterWithEureka == false
            if(eurekaTransport != null && eurekaTransport.registrationClient != null) {
                try {
                    logger.info("Unregistering ...");
                    EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
                    logger.info(PREFIX + "{} - deregister  status: {}", appPathIdentifier, httpResponse.getStatusCode());
                } catch (Exception e) {
                    logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);
                }
            }
        }
    
    
      //AbstractJerseyEurekaHttpClient.cancel()
        @Override
        public EurekaHttpResponse<Void> cancel(String appName, String id) {
            String urlPath = "apps/" + appName + '/' + id;
            ClientResponse response = null;
            try {
                Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
                addExtraHeaders(resourceBuilder);
    	    //本质就是发送delete请求到注册中心
                response = resourceBuilder.delete(ClientResponse.class);
                return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
            } finally {
                if (logger.isDebugEnabled()) {
                    logger.debug("Jersey HTTP DELETE {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
                }
                if (response != null) {
                    response.close();
                }
            }
        }
    

      

     通过上面的分析可知,服务注册、续约、下线、调用等都是通过HTTP请求来实现的。那这些是什么时候被调用呢?

    可参考:Eureka 客户端源码分析(下)

    Eureka 客户端源码分析(下)

  • 相关阅读:
    pysocketserver
    协程
    py模块和包
    py网络编程
    GridView绑定技巧终结者
    iOS 证书/私钥/代码签名/描述文件
    八拜之交
    POJ3230(Travel)
    POJ2553(The Bottom of a Graph)
    动态规划的实质
  • 原文地址:https://www.cnblogs.com/linlf03/p/12596638.html
Copyright © 2011-2022 走看看