zoukankan      html  css  js  c++  java
  • 【一起学源码-微服务】Nexflix Eureka 源码三:EurekaServer启动之EurekaServer上下文EurekaClient创建

    前言

    上篇文章已经介绍了 Eureka Server 环境和上下文初始化的一些代码,其中重点讲解了environment初始化使用的单例模式,以及EurekaServerConfigure基于接口对外暴露配置方法的设计方式。这一讲就是讲解Eureka Server上下文初始化剩下的内容:Eureka Client初始化。

    如若转载 请标明来源:一枝花算不算浪漫

    EurekaServer上下文构建之Client

    EurekaClientConfigure创建过程

    因为eurekaSever是集群部署的,所以每个eurekaServer都需要注册到其他注册中心节点。这里自己既是一个eurekaServer,也是一个eurekaClient。

    截取EurekaServer中初始化上下文代码:

    // 3、初始化eureka-server内部的一个eureka-client(用来跟其他的eureka-server节点做注册和通信)
    // 类的开头已经说明了:EurekaInstanceConfig其实就是eureka client相关的配置类
    if (eurekaClient == null) {
        EurekaInstanceConfig instanceConfig = isCloud(ConfigurationManager.getDeploymentContext())
                ? new CloudInstanceConfig()
                : new MyDataCenterInstanceConfig();
        
        applicationInfoManager = new ApplicationInfoManager(
                instanceConfig, new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get());
    
        // DefaultEurekaClientConfig类似于上面的DefaultEurekaServerConfig类实现
        EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig();
        eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);
    } else {
        applicationInfoManager = eurekaClient.getApplicationInfoManager();
    }
    

    再看下eurekaClientConfig创建的代码:

    public DefaultEurekaClientConfig(String namespace) {
        this.namespace = namespace.endsWith(".")
                ? namespace
                : namespace + ".";
    
        this.configInstance = Archaius1Utils.initConfig(CommonConstants.CONFIG_FILE_NAME);
        this.transportConfig = new DefaultEurekaTransportConfig(namespace, configInstance);
    }
    
    public static DynamicPropertyFactory initConfig(String configName) {
    
        DynamicPropertyFactory configInstance = DynamicPropertyFactory.getInstance();
    	/**
    	 * 获取eureka client配置文件,类似于 {@link DefaultEurekaServerConfig}中的:
    	 * String eurekaPropsFile = EUREKA_PROPS_FILE.get();
    	 * private static final DynamicStringProperty EUREKA_PROPS_FILE = DynamicPropertyFactory
    	 *             .getInstance().getStringProperty("eureka.server.props","eureka-server");
    	 */
    	DynamicStringProperty EUREKA_PROPS_FILE = configInstance.getStringProperty("eureka.client.props", configName);
    
        String env = ConfigurationManager.getConfigInstance().getString(EUREKA_ENVIRONMENT, "test");
        ConfigurationManager.getConfigInstance().setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, env);
    
        String eurekaPropsFile = EUREKA_PROPS_FILE.get();
        try {
            ConfigurationManager.loadCascadedPropertiesFromResources(eurekaPropsFile);
        } catch (IOException e) {
            logger.warn(
                    "Cannot find the properties specified : {}. This may be okay if there are other environment "
                            + "specific properties or the configuration is installed with a different mechanism.",
                    eurekaPropsFile);
    
        }
    
        return configInstance;
    }
    

    看到上面代码想到了什么?这完全跟EurekaServerConfig创建的逻辑一样的呀,代码和DefaultEurekaServerConfig一致的逻辑。最后都是交给ConfigurationManager来管理。

    EurekaClient创建过程

    接着再来看eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);代码:

    这段代码确实很长,我们一段段来解读,解读完后再看代码:

    1. 基于ApplicationInfoManager(包含了服务实例的信息、配置,作为服务实例管理的一个组件),eureka client相关的配置,一起构建了一个EurekaClient。

    2. 这里有两个配置:config.shouldFetchRegistry()config.shouldRegisterWithEureka()

      config.shouldFetchRegistry()
      是否需要注册到别的注册中心。eurekaServer有个配置:eureka.client.fetchRegistry,单机情况下为false。false表示自己就是注册中心。我的职责就是维护服务实例,并不需要去检索服务

      config.shouldRegisterWithEureka()
      是否要向别的注册中心注册自己。eurekaServer有个配置:eureka.client.registerWithEureka,单机情况下为false。false表示自己不需要向注册中心注册自己

    3. 创建线程池调度任务

    4. 创建一个心跳线程池

    5. 创建一个缓存刷新线程池

    6. 初始化线程调度任务

    具体代码如下,添加了一些代码备注:

    @Inject
    DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
    				Provider<BackupRegistry> backupRegistryProvider) {
    	if (args != null) {
    		this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
    		this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
    		this.eventListeners.addAll(args.getEventListeners());
    		this.preRegistrationHandler = args.preRegistrationHandler;
    	} else {
    		this.healthCheckCallbackProvider = null;
    		this.healthCheckHandlerProvider = null;
    		this.preRegistrationHandler = null;
    	}
    
    	this.applicationInfoManager = applicationInfoManager;
    	InstanceInfo myInfo = applicationInfoManager.getInfo();
    
    	clientConfig = config;
    	staticClientConfig = clientConfig;
    	transportConfig = config.getTransportConfig();
    	instanceInfo = myInfo;
    	if (myInfo != null) {
    		// AppName是服务名称,instanceInfo.getId就是服务实例id,类似于:ServiceA/0001
    		appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
    	} else {
    		logger.warn("Setting instanceInfo to a passed in null value");
    	}
    
    	this.backupRegistryProvider = backupRegistryProvider;
    
    	this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
    	localRegionApps.set(new Applications());
    
    	fetchRegistryGeneration = new AtomicLong(0);
    
    	remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
    	remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));
    
    	// 是否需要注册到别的注册中心。eurekaServer有个配置:eureka.client.fetchRegistry,单机情况下为false。false表示自己就是注册中心。我的职责就是维护服务实例,并不需要去检索服务
    	if (config.shouldFetchRegistry()) {
    		this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
    	} else {
    		this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
    	}
    	// eurekaServer有个配置:eureka.client.registerWithEureka,单机情况下为false。false表示自己不需要向注册中心注册自己
    	if (config.shouldRegisterWithEureka()) {
    		this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
    	} else {
    		this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
    	}
    
    	logger.info("Initializing Eureka in region {}", clientConfig.getRegion());
    
    	// 不需要注册也不需要抓取 释放不必要的资源
    	if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
    		logger.info("Client configured to neither register nor query for data.");
    		scheduler = null;
    		heartbeatExecutor = null;
    		cacheRefreshExecutor = null;
    		eurekaTransport = null;
    		instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());
    
    		// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
    		// to work with DI'd DiscoveryClient
    		DiscoveryManager.getInstance().setDiscoveryClient(this);
    		DiscoveryManager.getInstance().setEurekaClientConfig(config);
    
    		initTimestampMs = System.currentTimeMillis();
    		logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
    				initTimestampMs, this.getApplications().size());
    
    		return;  // no need to setup up an network tasks and we are done
    	}
    
    	try {
    		// default size of 2 - 1 each for heartbeat and cacheRefresh
    		// 创建一个支持调度的线程池
    		scheduler = Executors.newScheduledThreadPool(2,
    				new ThreadFactoryBuilder()
    						.setNameFormat("DiscoveryClient-%d")
    						.setDaemon(true)
    						.build());
    
    		// 创建一个心跳检查的线程池,最大线程数为5
    		heartbeatExecutor = new ThreadPoolExecutor(
    				1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
    				new SynchronousQueue<Runnable>(),
    				new ThreadFactoryBuilder()
    						.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
    						.setDaemon(true)
    						.build()
    		);  // use direct handoff
    
    		// 支持缓存刷新的线程池,最大线程数为5
    		cacheRefreshExecutor = new ThreadPoolExecutor(
    				1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
    				new SynchronousQueue<Runnable>(),
    				new ThreadFactoryBuilder()
    						.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
    						.setDaemon(true)
    						.build()
    		);  // use direct handoff
    
    		// 支持底层的eureka client跟eureka server进行网络通信的组件
    		eurekaTransport = new EurekaTransport();
    		// 发送http请求,调用restful接口
    		scheduleServerEndpointTask(eurekaTransport, args);
    
    		AzToRegionMapper azToRegionMapper;
    		if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
    			azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
    		} else {
    			azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
    		}
    		if (null != remoteRegionsToFetch.get()) {
    			azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
    		}
    		instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
    	} catch (Throwable e) {
    		throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
    	}
    
    	// 如果要抓取注册表,但是抓取失败后,需要从备份中读取
    	if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
    		fetchRegistryFromBackup();
    	}
    
    	// call and execute the pre registration handler before all background tasks (inc registration) is started
    	if (this.preRegistrationHandler != null) {
    		this.preRegistrationHandler.beforeRegistration();
    	}
    	// 初始化调度任务
    	initScheduledTasks();
    
    	try {
    		Monitors.registerObject(this);
    	} catch (Throwable e) {
    		logger.warn("Cannot register timers", e);
    	}
    
    	// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
    	// to work with DI'd DiscoveryClient
    	DiscoveryManager.getInstance().setDiscoveryClient(this);
    	DiscoveryManager.getInstance().setEurekaClientConfig(config);
    
    	initTimestampMs = System.currentTimeMillis();
    	logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
    			initTimestampMs, this.getApplications().size());
    }
    
    /**
     * Initializes all scheduled tasks.
     */
    private void initScheduledTasks() {
    	// 抓取注册表的定时任务,
        if (clientConfig.shouldFetchRegistry()) {
            // registry cache refresh timer
    		// registryFetchIntervalSeconds默认为30s
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            // 执行cacheRefreshExecutor调度任务,默认是30s
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "cacheRefresh",
                            scheduler,
                            cacheRefreshExecutor,
                            registryFetchIntervalSeconds,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new CacheRefreshThread()
                    ),
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }
    
        // 如果要将自己注册到其他注册中心
        if (clientConfig.shouldRegisterWithEureka()) {
        	//  默认也是30s
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);
    
            // Heartbeat timer
    		// 执行heartbeatExecutor心跳检查,默认是30s
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "heartbeat",
                            scheduler,
                            heartbeatExecutor,
                            renewalIntervalInSecs,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new HeartbeatThread()
                    ),
                    renewalIntervalInSecs, TimeUnit.SECONDS);
    
            // InstanceInfo replicator
    		// 创建服务副本传播器
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize
    
    		// 创建服务实例状态变更的监听器
            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }
    
                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                        // log at warn level if DOWN was involved
                        logger.warn("Saw local status change event {}", statusChangeEvent);
                    } else {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                    }
                    instanceInfoReplicator.onDemandUpdate();
                }
            };
    
            if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                applicationInfoManager.registerStatusChangeListener(statusChangeListener);
            }
    
            // 执行线程
            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }
    

    总结

    如果是eureka server的话,我们在玩儿spring cloud的时候,会将这个fetchRegistry给手动设置为false,如果是eureka server集群的话,就还是要保持为true。registerWithEureka也要设置为true。

    (1)读取EurekaClientConfig,包括TransportConfig
    (2)保存EurekaInstanceConfig和InstanceInfo
    (3)处理了是否要注册以及抓取注册表,如果不要的话,释放一些资源
    (4)支持调度的线程池
    (5)支持心跳的线程池
    (6)支持缓存刷新的线程池
    (7)EurekaTransport,支持底层的eureka client跟eureka server进行网络通信的组件,对网络通信组件进行了一些初始化的操作
    (8)如果要抓取注册表的话,在这里就会去抓取注册表了,但是如果说你配置了不抓取,那么这里就不抓取了
    (9)初始化调度任务:如果要抓取注册表的话,就会注册一个定时任务,按照你设定的那个抓取的间隔,每隔一定时间(默认是30s),去执行一个CacheRefreshThread,给放那个调度线程池里去了;如果要向eureka server进行注册的话,会搞一个定时任务,每隔一定时间发送心跳,执行一个HeartbeatThread;创建了服务实例副本传播器,将自己作为一个定时任务进行调度;创建了服务实例的状态变更的监听器,如果你配置了监听,那么就会注册监听器

    申明

    本文章首发自本人博客:https://www.cnblogs.com/wang-meng 和公众号:壹枝花算不算浪漫,如若转载请标明来源!

    感兴趣的小伙伴可关注个人公众号:壹枝花算不算浪漫

    22.jpg

  • 相关阅读:
    第四周作业
    第三周作业
    第二周作业
    互联网公司采用增量模型做开发的优势
    面向过程分析方法与面向对象分析方法到底区别
    项目测试中的黑盒测试和白盒测试
    立项说明书里面的项目概述。
    项目开发中的一些问题
    面向对象之我见
    [自翻]fasthttp中文文档(持续更新)
  • 原文地址:https://www.cnblogs.com/wang-meng/p/12095165.html
Copyright © 2011-2022 走看看