zoukankan      html  css  js  c++  java
  • Eureka源码分析

    源码流程图

    先上图,不太清晰,抱歉

    一、Eureka Server源码分析

    1. 从@EnableEurekaServer注解为入口,它是一个标记注解,点进去看
    2. 注解内容如下
    /**
     * 激活Eureka服务器相关配置的注释
     * Annotation to activate Eureka Server related configuration {@link EurekaServerAutoConfiguration}
     *
     * @author Dave Syer
     * @author Biju Kunjummen
     *
     */
    
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Import(EurekaServerMarkerConfiguration.class)
    public @interface EnableEurekaServer {
    
    }
    
    1. 从注解可以看到它@link 了配置启动类EurekaServerAutoConfiguration,这个类会将配置信息读取进来
    @Configuration
    @Import(EurekaServerInitializerConfiguration.class)
    @ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
    @EnableConfigurationProperties({ EurekaDashboardProperties.class,
    		InstanceRegistryProperties.class })
    @PropertySource("classpath:/eureka/server.properties")
    public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {
    
        }
    
    1. EurekaServerAutoConfiguration上有一个注解@Import(EurekaServerInitializerConfiguration.class),@Import的作用会将这个类的实例加入IOC容器。

    2. 所以EurekaServerInitializerConfiguration这个类会被初始化,它实现了SmartLifecycle接口,当ApplicationContext自身启动时会调用他的start()方法。

    @Configuration
    public class EurekaServerInitializerConfiguration
    		implements ServletContextAware, SmartLifecycle, Ordered {
    
    
        //省略部分代码
    
    	@Override
    	public void start() {
    		new Thread(new Runnable() {
    			@Override
    			public void run() {
    				try {
    					//TODO: is this class even needed now?
    					//初始化EurekaServer,同时启动Eureka Server
    					eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
    					log.info("Started Eureka Server");
                        //事件发布
    					publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
    					EurekaServerInitializerConfiguration.this.running = true;
    					publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
    				}
    				catch (Exception ex) {
    					// Help!
    					log.error("Could not initialize Eureka servlet context", ex);
    				}
    			}
    		}).start();
    	}
    
    
    }
    
    1. 从eurekaServerBootstrap.contextInitialized()方法进去
    public void contextInitialized(ServletContext context) {
    		try {
    		    //初始化环境变量
    			initEurekaEnvironment();
    			//初始化上下文
    			initEurekaServerContext();
    
    			context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
    		}
    		catch (Throwable e) {
    			log.error("Cannot bootstrap eureka server :", e);
    			throw new RuntimeException("Cannot bootstrap eureka server :", e);
    		}
    	}
    
    1. initEurekaEnvironment里面都是一些环境变量初始化,不用太关心。主要看initEurekaServerContext()这个初始化上下文的方法
    protected void initEurekaServerContext() throws Exception {
    		// For backward compatibility
    		JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
    				XStream.PRIORITY_VERY_HIGH);
    		XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
    				XStream.PRIORITY_VERY_HIGH);
    
    		if (isAws(this.applicationInfoManager.getInfo())) {
    			this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
    					this.eurekaClientConfig, this.registry, this.applicationInfoManager);
    			this.awsBinder.start();
    		}
    
    		EurekaServerContextHolder.initialize(this.serverContext);
    
    		log.info("Initialized server context");
    
    		// Copy registry from neighboring eureka node
    		int registryCount = this.registry.syncUp();
    		//这句重要,里面会打开定时器
    		this.registry.openForTraffic(this.applicationInfoManager, registryCount);
    
    		// Register all monitoring statistics.
    		//注册所有监视统计信息
    		EurekaMonitors.registerAllStats();
    	}
    
    1. 进入openForTraffic方法内部
     @Override
        public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
            // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
            this.expectedNumberOfRenewsPerMin = count * 2;
            this.numberOfRenewsPerMinThreshold =
                    (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
            logger.info("Got {} instances from neighboring DS node", count);
            logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
            this.startupTime = System.currentTimeMillis();
            if (count > 0) {
                this.peerInstancesTransferEmptyOnStartup = false;
            }
            DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
            boolean isAws = Name.Amazon == selfName;
            if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
                logger.info("Priming AWS connections for all replicas..");
                primeAwsReplicas(applicationInfoManager);
            }
            logger.info("Changing status to UP");
            applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
            //启动定时器
            super.postInit();
        }
    
    1. 进入super.postInit();方法,开启了定时器,清理没有按时续约的client
     protected void postInit() {
            renewsLastMin.start();
            if (evictionTaskRef.get() != null) {
                evictionTaskRef.get().cancel();
            }
            evictionTaskRef.set(new EvictionTask());
            evictionTimer.schedule(evictionTaskRef.get(),
                    serverConfig.getEvictionIntervalTimerInMs(),
                    serverConfig.getEvictionIntervalTimerInMs());
        }
    
    1. 剔除服务的方法可以看看,主要思想就是找到要剔除服务的服务id,然后从map移除
     public void evict(long additionalLeaseMs) {
            logger.debug("Running the evict task");
    
            if (!isLeaseExpirationEnabled()) {
                logger.debug("DS: lease expiration is currently disabled.");
                return;
            }
    
            // We collect first all expired items, to evict them in random order. For large eviction sets,
            // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
            // the impact should be evenly distributed across all applications.
            List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
            for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
                Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
                if (leaseMap != null) {
                    for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                        Lease<InstanceInfo> lease = leaseEntry.getValue();
                        if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                            expiredLeases.add(lease);
                        }
                    }
                }
            }
    
            // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
            // triggering self-preservation. Without that we would wipe out full registry.
            int registrySize = (int) getLocalRegistrySize();
            int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
            int evictionLimit = registrySize - registrySizeThreshold;
    
            int toEvict = Math.min(expiredLeases.size(), evictionLimit);
            if (toEvict > 0) {
                logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
    
                Random random = new Random(System.currentTimeMillis());
                for (int i = 0; i < toEvict; i++) {
                    // Pick a random item (Knuth shuffle algorithm)
                    int next = i + random.nextInt(expiredLeases.size() - i);
                    Collections.swap(expiredLeases, i, next);
                    Lease<InstanceInfo> lease = expiredLeases.get(i);
    
                    String appName = lease.getHolder().getAppName();
                    String id = lease.getHolder().getId();
                    EXPIRED.increment();
                    logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
                    //根据id移除
                    internalCancel(appName, id, false);
                }
            }
        }
    

    二、Eureka客户端源码

    通过看源码想要解决的问题 :

    问题: 如何实现自动注册的?

    1. 通过这个类EurekaClientAutoConfiguration入手
    2. 该类里有个eurekaClient方法
    @Bean(destroyMethod = "shutdown")
    		@ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
    		@org.springframework.cloud.context.config.annotation.RefreshScope
    		@Lazy
    		public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config, EurekaInstanceConfig instance) {
    			manager.getInfo(); // force initialization
    			return new CloudEurekaClient(manager, config, this.optionalArgs,
    					this.context);
    		}
    
    1. 可以看出他new 了一个CloudEurekaClient对象,而他的父类是DiscoveryClient,所以DiscoveryClient的构造方法会被执行。
    2. 构造方法里的重要方法部分
    try {
                // default size of 2 - 1 each for heartbeat and cacheRefresh
                scheduler = Executors.newScheduledThreadPool(2,
                        new ThreadFactoryBuilder()
                                .setNameFormat("DiscoveryClient-%d")
                                .setDaemon(true)
                                .build());
    
                heartbeatExecutor = new ThreadPoolExecutor(
                        1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                        new SynchronousQueue<Runnable>(),
                        new ThreadFactoryBuilder()
                                .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                                .setDaemon(true)
                                .build()
                );  // use direct handoff
    
                cacheRefreshExecutor = new ThreadPoolExecutor(
                        1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                        new SynchronousQueue<Runnable>(),
                        new ThreadFactoryBuilder()
                                .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                                .setDaemon(true)
                                .build()
                );  // use direct handoff
    
                eurekaTransport = new EurekaTransport();
                scheduleServerEndpointTask(eurekaTransport, args);
    
                AzToRegionMapper azToRegionMapper;
                if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
                    azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
                } else {
                    azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
                }
                if (null != remoteRegionsToFetch.get()) {
                    azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
                }
                instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
            } catch (Throwable e) {
                throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
            }
    
            if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
                fetchRegistryFromBackup();
            }
    
            // call and execute the pre registration handler before all background tasks (inc registration) is started
            if (this.preRegistrationHandler != null) {
                this.preRegistrationHandler.beforeRegistration();
            }
    
            if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
                try {
                    if (!register() ) {
                        throw new IllegalStateException("Registration error at startup. Invalid server response.");
                    }
                } catch (Throwable th) {
                    logger.error("Registration error at startup: {}", th.getMessage());
                    throw new IllegalStateException(th);
                }
            }
    
            // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
            //初始化定时器任务
            initScheduledTasks();
    
            try {
                Monitors.registerObject(this);
            } catch (Throwable e) {
                logger.warn("Cannot register timers", e);
            }
    
    1. 在initScheduledTasks方法里有一个状态改变监听器StatusChangeListener,他会触发instanceInfoReplicator.onDemandUpdate();方法,该方法会调用run方法
    2. 在InstanceInfoReplicator的run方法里调用了 discoveryClient.register()方法
    3. 就是该方法完成了服务注册,看下面代码可以看出通过http方式请求服务端完成了注册
     boolean register() throws Throwable {
            logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
            EurekaHttpResponse<Void> httpResponse;
            try {
                httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
            } catch (Exception e) {
                logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
                throw e;
            }
            if (logger.isInfoEnabled()) {
                logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
            }
            return httpResponse.getStatusCode() == 204;
        }
    
    1. 可以看到客户端有日志{registering service...}打印,说明确实执行到了这个方法。
    2. 并且会发现有两个定时任务一直在执行,一个HeartbeatThread,一个CacheRefreshThread

    多级缓存思想

    1. 在拉取注册表的时候
    • 首先从ReadOnlyCacheMap中查询缓存的注册表
    • 如果没有,就找ReadWriteCacheMap里缓存的注册表
    • 如果还没有,就从内存中获取实际的注册表数据
    1. 注册表发生变更的时候
    • 会在内存中更新变更的注册表数据,同时过期掉ReadWriteCacheMap。
    • 此过程中不会影响ReadOnlyCacheMap提供使用。
    • 默认30秒ReadWriteCacheMap的数据更新到ReadOnlyCacheMap
    • 默认180秒秒ReadWriteCacheMap的数据失效
    • 下次有服务拉取注册表,又会从内存中获取最新数据了,同时填充各级缓存
  • 相关阅读:
    .NET 统一用户管理 -- 统一鉴权
    .NET 统一用户管理 -- 单点登录
    基于.net 职责链来实现 插件模式
    电商开放平台设计
    docker搭建一个渗透测试环境 bwapp为例
    dcoker运行msf
    关于构造靶场
    判断网站是不是真实ip
    H3C设备配置ARP攻击防御
    Java代码审计 HTTP头操纵 response.addHeader()
  • 原文地址:https://www.cnblogs.com/javammc/p/12748761.html
Copyright © 2011-2022 走看看