zoukankan      html  css  js  c++  java
  • Eureka 服务端端源码分析

    Eureka服务端使用的使用,会引入spring-cloud-starter-rereka-server, 在Application类中引入注解@EnableEurekaServer。

    1、@EnableEurekaServer。

    @Target({ElementType.TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Import({EurekaServerMarkerConfiguration.class})
    public @interface EnableEurekaServer {
    }
    
    
    @Configuration( proxyBeanMethods = false)
    public class EurekaServerMarkerConfiguration {
        public EurekaServerMarkerConfiguration() {
        }
    
        @Bean
        public EurekaServerMarkerConfiguration.Marker eurekaServerMarkerBean() {
            return new EurekaServerMarkerConfiguration.Marker();
        }
    
        class Marker {
            Marker() {
            }
        }
    }
    

     看来找不到什么 。

    然后进入spring-cloud-netflix-eureka-server-2.2.2.RELEASE的spring.factories查找

    org.springframework.boot.autoconfigure.EnableAutoConfiguration=
    org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration

    EnableAutoConfiguration对应的值列表中的类会在SpringBoot项目启动的时候注册到Spring容器中,那么EurekaServerAutoConfiguration会被默认加载到Spring中

    2、EurekaServerAutoConfiguration

    @ConditionalOnBean({Marker.class})
    @EnableConfigurationProperties({EurekaDashboardProperties.class, InstanceRegistryProperties.class})
    @PropertySource({"classpath:/eureka/server.properties"})
    public class EurekaServerAutoConfiguration implements WebMvcConfigurer {
        
    
     
    
        @Configuration( proxyBeanMethods = false )
        protected static class EurekaServerConfigBeanConfiguration {
    
            @Bean
            @ConditionalOnMissingBean
    	//1、创建并加载EurekaServerConfig的实现类。主要是Eureka-server的配置信息
            public EurekaServerConfig eurekaServerConfig(EurekaClientConfig clientConfig) {
                EurekaServerConfigBean server = new EurekaServerConfigBean();
                if (clientConfig.shouldRegisterWithEureka()) {
                    server.setRegistrySyncRetries(5);
                }
    
                return server;
            }
        }
    
        @Bean
        @ConditionalOnProperty(prefix = "eureka.dashboard",name = {"enabled"}, matchIfMissing = true)
        // 2.Eureka-server的可视化界面就是通过EurekaController提供的
        public EurekaController eurekaController() {
            return new EurekaController(this.applicationInfoManager);
        }
    
    	
        @Bean
        // 3.接收客户端的注册等请求就是通过InstanceRegistry来处理的
        public PeerAwareInstanceRegistry peerAwareInstanceRegistry(ServerCodecs serverCodecs) {
            this.eurekaClient.getApplications();
            return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig, serverCodecs, this.eurekaClient, this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(), this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
        }
    
        // 4.初始化Eureka-server,会同步其他注册中心的数据到当前注册中心
        @Bean
        public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry, EurekaServerContext serverContext) {
            return new EurekaServerBootstrap(this.applicationInfoManager, this.eurekaClientConfig, this.eurekaServerConfig, registry, serverContext);
        }
    
    }
    

      总结:通过以上分析可知,EurekaServer在启动的时候,会加载很多bean到Spring容器中,每个bean都实现了各自的功能,我们分析最重要的一个bean,也就是真正处理客户端请求的类InstanceRegistry.java,

    3、org.springframework.cloud.netflix.eureka.server.InstanceRegistry 

    // 1.接收客户端注册请求
    public void register(InstanceInfo info, int leaseDuration, boolean isReplication) {
            this.handleRegistration(info, leaseDuration, isReplication);
            super.register(info, leaseDuration, isReplication); //// 在1)中详细分
        }
    
     // 2.接收客户端下线请求
        public boolean cancel(String appName, String serverId, boolean isReplication) {
            this.handleCancelation(appName, serverId, isReplication);
            return super.cancel(appName, serverId, isReplication); // 在2)中详细分析
      // 3.接收客户端续约请求  
      public boolean renew(final String appName, final String serverId, boolean isReplication) {
            this.log("renew " + appName + " serverId " + serverId + ", isReplication {}" + isReplication);
            List<Application> applications = this.getSortedApplications();
            Iterator var5 = applications.iterator();
    
            while(var5.hasNext()) {
                Application input = (Application)var5.next();
                if (input.getName().equals(appName)) {
                    InstanceInfo instance = null;
                    Iterator var8 = input.getInstances().iterator();
    
                    while(var8.hasNext()) {
                        InstanceInfo info = (InstanceInfo)var8.next();
                        if (info.getId().equals(serverId)) {
                            instance = info;
                            break;
                        }
                    }
    
                    this.publishEvent(new EurekaInstanceRenewedEvent(this, appName, serverId, instance, isReplication));
                    break;
                }
            }
    
            return super.renew(appName, serverId, isReplication); // 在3)中详细分析
    
        }
    

      Eureka Client发送的是HTTP客户端,那Eureka Server应该有一个类接收客户端请求,并将具体业务处理委托给InstanceRegistry,这个类就是com.netflix.eureka.resources包下的ApplicationResource、InstanceResource类

      注意:这种接收请求的方式是采用jax-rs的方式,

    1)AbstractInstanceRegistry.register(InstanceInfo registrant, int leaseDuration, boolean isReplication)注册

        public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
            try {
                this.read.lock();
    	    //  // 1.所有的服务信息都添加到registry这个map中,
                // registry 格式为:ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>()
                Map<String, Lease<InstanceInfo>> gMap = (Map)this.registry.get(registrant.getAppName());
                EurekaMonitors.REGISTER.increment(isReplication);
        	    // // 1.如果没有该服务的信息,则新建,并添加到registry中
                if (gMap == null) {
                    ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap();
                    gMap = (Map)this.registry.putIfAbsent(registrant.getAppName(), gNewMap);
                    if (gMap == null) {
                        gMap = gNewMap;
                    }
                }
    	    // 2.existingLease信息即服务的一些注册时间等信息,主要是为了校验该服务是否过期,如果已过期,则剔除
    
                Lease<InstanceInfo> existingLease = (Lease)((Map)gMap).get(registrant.getId());
                if (existingLease != null && existingLease.getHolder() != null) {
                    Long existingLastDirtyTimestamp = ((InstanceInfo)existingLease.getHolder()).getLastDirtyTimestamp();
                    Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                    logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                    if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                        logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                        logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                        registrant = (InstanceInfo)existingLease.getHolder();
                    }
                } else {
                    synchronized(this.lock) {
                        if (this.expectedNumberOfClientsSendingRenews > 0) {
                            ++this.expectedNumberOfClientsSendingRenews;
                            this.updateRenewsPerMinThreshold();
                        }
                    }
    
                    logger.debug("No previous lease information found; it is new registration");
                }
    
                Lease<InstanceInfo> lease = new Lease(registrant, leaseDuration);
                if (existingLease != null) {
                    lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
                }
    
                ((Map)gMap).put(registrant.getId(), lease);
                ...
            }
    
        }
    

      

    总结1):通过上述分析可知,服务注册信息最终存放到

    // 外层map的key即为应用的服务名,内层map的key为我们设置的eureka.instance.instance-id
    // 设置成这种格式,当多个应用提供相同服务时,那么外层map的key都相同,内层map的key不同

    private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
     
    所有的操作都是针对这个map进行操作

    2)   AbstractInstanceRegistry.renew(String appName, String id, boolean isReplication)续约

        public boolean renew(String appName, String id, boolean isReplication) {
            EurekaMonitors.RENEW.increment(isReplication);
    	// // 1.获取对应map
            Map<String, Lease<InstanceInfo>> gMap = (Map)this.registry.get(appName);
            Lease<InstanceInfo> leaseToRenew = null;
            if (gMap != null) {
    	    // 2.主要是为了获取当前服务的一些过期信息
                leaseToRenew = (Lease)gMap.get(id);
            }
    
            if (leaseToRenew == null) {
               ...
            } else {
               ...
    
                this.renewsLastMin.increment();
    	    // // 主要操作在这里,将最新更新时间重置,剔除任务检查的也就是这个最新更新时间
                // lastUpdateTimestamp = System.currentTimeMillis() + duration;
    
                leaseToRenew.renew();
                return true;
            }
        }
    

      

    3)AbstractInstanceRegistry.cancel(String appName, String id, boolean isReplication)下线

    public boolean cancel(String appName, String id, boolean isReplication) {
            return this.internalCancel(appName, id, isReplication);
        }
    
        protected boolean internalCancel(String appName, String id, boolean isReplication) {
            try {
                this.read.lock();
                EurekaMonitors.CANCEL.increment(isReplication);
    	     // 1.获取gmap
                Map<String, Lease<InstanceInfo>> gMap = (Map)this.registry.get(appName);
                Lease<InstanceInfo> leaseToCancel = null;
                if (gMap != null) {
    	   	  // 2.删除gmap中该服务i
                    leaseToCancel = (Lease)gMap.remove(id);
                }
    
    	    // 3.将当前服务的剔除时间置为当前时间
                //evictionTimestamp = System.currentTimeMillis();
                leaseToCancel.cancel();
    	    //// 4.获取服务信息
                InstanceInfo instanceInfo = (InstanceInfo)leaseToCancel.getHolder();
                String vip = null;
                String svip = null;
                if (instanceInfo != null) {
    		 // 5.将服务信息置为已删除
                    instanceInfo.setActionType(ActionType.DELETED);
                    this.recentlyChangedQueue.add(new AbstractInstanceRegistry.RecentlyChangedItem(leaseToCancel));
                    instanceInfo.setLastUpdatedTimestamp();
                    vip = instanceInfo.getVIPAddress();
                    svip = instanceInfo.getSecureVipAddress();
                }
    
                
            } finally {
                this.read.unlock();
            }
    
           
        } 

    总结: 

    * 服务的注册实际上是将服务信息添加到一个map中,map的key是服务名称,value也是一个map,是提供该服务的所有客户端信息;

    * 服务的续约实际上是获取map中该服务的客户端信息,然后修改其最新更新时间

    * 服务的下线实际上是删除该map中该服务信息,然后修改服务状态

    参考: Eureka源码深度解析(下)

  • 相关阅读:
    Zuul的核心源码解析
    基于Sentinel的服务保护
    Sentinel
    windows进行配置转发
    Hystrix断路器
    服务熔断Hystrix高级
    微服务架构的高并发问题
    Feign的高级配置
    倒排序原理和实例
    云计算技术的产生、概念、原理、应用和前景
  • 原文地址:https://www.cnblogs.com/linlf03/p/12601621.html
Copyright © 2011-2022 走看看