zoukankan      html  css  js  c++  java
  • SpringCloud学习笔记(3)----Spring Cloud Netflix之深入理解Eureka

      

    1. Eureka服务端的启动过程

      1.1  入口:EurekaServerInitializerConfiguration类的start()方法

     
    /**
     * Starts the Eureka server bootstrap on a newly created thread.
     *
     * <p>The thread initializes the server bootstrap with the servlet context, publishes an
     * {@code EurekaRegistryAvailableEvent}, sets {@code running} to {@code true} and then publishes
     * an {@code EurekaServerStartedEvent}. Any {@link Exception} raised along the way is logged
     * and not rethrown.
     */
    public void start() {
        Runnable bootstrapTask = new Runnable() {
            @Override
            public void run() {
                try {
                    eurekaServerBootstrap.contextInitialized(servletContext);
                    log.info("Started Eureka Server");

                    // The "registry available" event goes out before the running flag flips;
                    // the "server started" event goes out after it.
                    publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
                    running = true;
                    publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
                } catch (Exception startupFailure) {
                    log.error("Could not initialize Eureka servlet context", startupFailure);
                }
            }
        };
        new Thread(bootstrapTask).start();
    }
     

     通过start()方法来加载上下文和相关的Server端的配置信息,该类上面使用了@Configuration注解,会被spring bean容器感知到。

      1.2  Eureka的初始化

      EurekaServerBootstrap类的contextInitialized方法

      

     
     /**
      * Initializes the Eureka environment and server context, then stores the server context
      * in the given servlet context under the {@code EurekaServerContext} class name.
      *
      * @param context servlet context that receives the server-context attribute
      * @throws RuntimeException wrapping any {@link Throwable} raised during initialization
      */
     public void contextInitialized(ServletContext context) {
         try {
             // Environment initialization
             initEurekaEnvironment();
             // Server (service) initialization
             initEurekaServerContext();

             String attributeName = EurekaServerContext.class.getName();
             context.setAttribute(attributeName, serverContext);
         } catch (Throwable bootstrapFailure) {
             log.error("Cannot bootstrap eureka server :", bootstrapFailure);
             throw new RuntimeException("Cannot bootstrap eureka server :", bootstrapFailure);
         }
     }
     

      查找服务初始化中的Eviction

      

     
    /**
     * Prepares the server context: registers the instance-info converters, starts the AWS
     * binder when the local instance is detected as AWS, publishes the server context through
     * {@code EurekaServerContextHolder}, syncs the registry and opens it for traffic, and
     * finally registers the monitoring stats.
     *
     * @throws Exception if any of the initialization steps fails
     */
    protected void initEurekaServerContext() throws Exception {
        // A separate converter instance is registered for each serializer.
        // NOTE(review): 10000 is presumably the XStream converter priority - confirm.
        JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), 10000);
        XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), 10000);

        boolean runningOnAws = isAws(applicationInfoManager.getInfo());
        if (runningOnAws) {
            awsBinder = new AwsBinderDelegate(eurekaServerConfig, eurekaClientConfig, registry, applicationInfoManager);
            awsBinder.start();
        }

        EurekaServerContextHolder.initialize(serverContext);
        log.info("Initialized server context");

        // The value returned by syncUp() is handed to openForTraffic() as the instance count.
        int syncedInstanceCount = registry.syncUp();
        registry.openForTraffic(applicationInfoManager, syncedInstanceCount);

        EurekaMonitors.registerAllStats();
    }
     

      openForTraffic()

     
     /**
      * Computes the renewal expectations from the given instance count, optionally primes
      * AWS replica connections, marks this instance {@code UP} and runs the superclass
      * {@code postInit()}.
      *
      * @param applicationInfoManager manager whose instance info supplies the data-center name
      *                               and whose status is set to {@code UP}
      * @param count                  instance count; each instance is counted as two expected
      *                               renews per minute
      */
     public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
         expectedNumberOfRenewsPerMin = count * 2;
         numberOfRenewsPerMinThreshold =
                 (int) ((double) expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
         logger.info("Got {} instances from neighboring DS node", count);
         logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
         startupTime = System.currentTimeMillis();

         // The flag is only ever cleared here; a zero count leaves it untouched.
         if (count > 0) {
             peerInstancesTransferEmptyOnStartup = false;
         }

         Name dataCenterName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
         if (Name.Amazon == dataCenterName && serverConfig.shouldPrimeAwsReplicaConnections()) {
             logger.info("Priming AWS connections for all replicas..");
             primeAwsReplicas(applicationInfoManager);
         }

         logger.info("Changing status to UP");
         applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
         super.postInit();
     }
     

      

     
     /**
      * Starts the renews-per-minute counter and (re)schedules the eviction task.
      *
      * <p>If an eviction task is already held in {@code evictionTaskRef} it is cancelled first;
      * a new task is then stored and scheduled on {@code evictionTimer}, using the configured
      * eviction interval as both the initial delay and the period.
      */
     protected void postInit() {
         renewsLastMin.start();

         boolean hasPreviousTask = evictionTaskRef.get() != null;
         if (hasPreviousTask) {
             ((AbstractInstanceRegistry.EvictionTask) evictionTaskRef.get()).cancel();
         }

         evictionTaskRef.set(new AbstractInstanceRegistry.EvictionTask());
         TimerTask freshTask = (TimerTask) evictionTaskRef.get();
         evictionTimer.schedule(
                 freshTask,
                 serverConfig.getEvictionIntervalTimerInMs(),
                 serverConfig.getEvictionIntervalTimerInMs());
     }
     

      

    2. 客户端注册过程

      客户端流程

      

      1.入口:DiscoveryClient功能描述

         1)  向Eureka Server注册服务实例

            2)向Eureka Server续约

            3)  当服务关闭的时候向Eureka Server取消租约

            4)  查询注册到Eureka Server中的服务实例

      2 实例化,调用构造方法

      

     
    /**
     * Creates the discovery client and performs its start-up work.
     *
     * <p>The constructor initializes monitoring counters and internal state, copies the optional
     * handlers from {@code args}, and then takes one of two paths. When the configuration asks
     * for neither registration nor registry fetching, no scheduler, executors or transport are
     * created. Otherwise it creates the scheduler and executors, performs the initial registry
     * fetch (falling back to the backup registry), optionally registers with the server, and
     * calls {@code initScheduledTasks()}.
     *
     * @param applicationInfoManager supplies this instance's {@code InstanceInfo} via {@code getInfo()}
     * @param config                 client configuration; also stored in {@code staticClientConfig}
     * @param args                   optional arguments; may be {@code null}
     * @param backupRegistryProvider provider stored in {@code backupRegistryProvider}
     * @throws RuntimeException      if creating the executors, transport or region mapper fails
     * @throws IllegalStateException if registration at init is enforced and does not succeed
     */
    @Inject
        DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider) {
            // Monitoring counters/timer and initial values for internal state.
            this.RECONCILE_HASH_CODES_MISMATCH = Monitors.newCounter("DiscoveryClient_ReconcileHashCodeMismatch");
            this.FETCH_REGISTRY_TIMER = Monitors.newTimer("DiscoveryClient_FetchRegistry");
            this.REREGISTER_COUNTER = Monitors.newCounter("DiscoveryClient_Reregister");
            this.localRegionApps = new AtomicReference();
            this.fetchRegistryUpdateLock = new ReentrantLock();
            this.healthCheckHandlerRef = new AtomicReference();
            this.remoteRegionVsApps = new ConcurrentHashMap();
            this.lastRemoteInstanceStatus = InstanceStatus.UNKNOWN;
            this.eventListeners = new CopyOnWriteArraySet();
            this.registrySize = 0;
            this.lastSuccessfulRegistryFetchTimestamp = -1L;
            this.lastSuccessfulHeartbeatTimestamp = -1L;
            this.isShutdown = new AtomicBoolean(false);
            // Copy the optional providers, listeners and pre-registration handler; args may be null.
            if (args != null) {
                this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
                this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
                this.eventListeners.addAll(args.getEventListeners());
                this.preRegistrationHandler = args.preRegistrationHandler;
            } else {
                this.healthCheckCallbackProvider = null;
                this.healthCheckHandlerProvider = null;
                this.preRegistrationHandler = null;
            }
    
            // Store the collaborators; appPathIdentifier ("appName/id") is only set when instance info is present.
            this.applicationInfoManager = applicationInfoManager;
            InstanceInfo myInfo = applicationInfoManager.getInfo();
            this.clientConfig = config;
            staticClientConfig = this.clientConfig;
            this.transportConfig = config.getTransportConfig();
            this.instanceInfo = myInfo;
            if (myInfo != null) {
                this.appPathIdentifier = this.instanceInfo.getAppName() + "/" + this.instanceInfo.getId();
            } else {
                logger.warn("Setting instanceInfo to a passed in null value");
            }
    
            this.backupRegistryProvider = backupRegistryProvider;
            this.urlRandomizer = new InstanceInfoBasedUrlRandomizer(this.instanceInfo);
            this.localRegionApps.set(new Applications());
            this.fetchRegistryGeneration = new AtomicLong(0L);
            // The configured remote regions value is kept as-is and also split on commas (null stays null).
            this.remoteRegionsToFetch = new AtomicReference(this.clientConfig.fetchRegistryForRemoteRegions());
            this.remoteRegionsRef = new AtomicReference(this.remoteRegionsToFetch.get() == null ? null : ((String)this.remoteRegionsToFetch.get()).split(","));
            // Staleness metrics are created only for enabled features; otherwise the no-op metric is used.
            if (config.shouldFetchRegistry()) {
                this.registryStalenessMonitor = new ThresholdLevelsMetric(this, "eurekaClient.registry.lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
            } else {
                this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
            }
    
            if (config.shouldRegisterWithEureka()) {
                this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, "eurekaClient.registration.lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
            } else {
                this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
            }
    
            logger.info("Initializing Eureka in region {}", this.clientConfig.getRegion());
            if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
                // Neither registering nor fetching: no scheduler, executors or transport are created.
                logger.info("Client configured to neither register nor query for data.");
                this.scheduler = null;
                this.heartbeatExecutor = null;
                this.cacheRefreshExecutor = null;
                this.eurekaTransport = null;
                this.instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), this.clientConfig.getRegion());
                DiscoveryManager.getInstance().setDiscoveryClient(this);
                DiscoveryManager.getInstance().setEurekaClientConfig(config);
                this.initTimestampMs = System.currentTimeMillis();
                logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", this.initTimestampMs, this.getApplications().size());
            } else {
                try {
                    // Scheduler plus heartbeat and cache-refresh executors, all built with daemon threads.
                    this.scheduler = Executors.newScheduledThreadPool(2, (new ThreadFactoryBuilder()).setNameFormat("DiscoveryClient-%d").setDaemon(true).build());
                    this.heartbeatExecutor = new ThreadPoolExecutor(1, this.clientConfig.getHeartbeatExecutorThreadPoolSize(), 0L, TimeUnit.SECONDS, new SynchronousQueue(), (new ThreadFactoryBuilder()).setNameFormat("DiscoveryClient-HeartbeatExecutor-%d").setDaemon(true).build());
                    this.cacheRefreshExecutor = new ThreadPoolExecutor(1, this.clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0L, TimeUnit.SECONDS, new SynchronousQueue(), (new ThreadFactoryBuilder()).setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d").setDaemon(true).build());
                    this.eurekaTransport = new DiscoveryClient.EurekaTransport();
                    this.scheduleServerEndpointTask(this.eurekaTransport, args);
                    // Pick the AZ-to-region mapper implementation based on the DNS setting.
                    Object azToRegionMapper;
                    if (this.clientConfig.shouldUseDnsForFetchingServiceUrls()) {
                        azToRegionMapper = new DNSBasedAzToRegionMapper(this.clientConfig);
                    } else {
                        azToRegionMapper = new PropertyBasedAzToRegionMapper(this.clientConfig);
                    }
    
                    if (null != this.remoteRegionsToFetch.get()) {
                        ((AzToRegionMapper)azToRegionMapper).setRegionsToFetch(((String)this.remoteRegionsToFetch.get()).split(","));
                    }
    
                    this.instanceRegionChecker = new InstanceRegionChecker((AzToRegionMapper)azToRegionMapper, this.clientConfig.getRegion());
                } catch (Throwable var9) {
                    throw new RuntimeException("Failed to initialize DiscoveryClient!", var9);
                }
    
                // Initial registry fetch; fall back to the backup registry when fetchRegistry(false) returns false.
                if (this.clientConfig.shouldFetchRegistry() && !this.fetchRegistry(false)) {
                    this.fetchRegistryFromBackup();
                }
    
                if (this.preRegistrationHandler != null) {
                    this.preRegistrationHandler.beforeRegistration();
                }
    
                // Optional registration during construction; any failure aborts with IllegalStateException.
                if (this.clientConfig.shouldRegisterWithEureka() && this.clientConfig.shouldEnforceRegistrationAtInit()) {
                    try {
                        if (!this.register()) {
                            throw new IllegalStateException("Registration error at startup. Invalid server response.");
                        }
                    } catch (Throwable var8) {
                        logger.error("Registration error at startup: {}", var8.getMessage());
                        throw new IllegalStateException(var8);
                    }
                }
    
                this.initScheduledTasks(); //Registration is mainly carried out by the scheduled tasks set up here
    
                // A failure to register the monitors is logged and does not abort construction.
                try {
                    Monitors.registerObject(this);
                } catch (Throwable var7) {
                    logger.warn("Cannot register timers", var7);
                }
    
                DiscoveryManager.getInstance().setDiscoveryClient(this);
                DiscoveryManager.getInstance().setEurekaClientConfig(config);
                this.initTimestampMs = System.currentTimeMillis();
                logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", this.initTimestampMs, this.getApplications().size());
            }
        }
     

      

     
     /**
      * Requests an immediate, out-of-schedule run of this replicator.
      *
      * <p>The request is dropped when the rate limiter refuses a permit or when the scheduler
      * has been shut down. Otherwise a task is submitted that cancels the latest periodic run
      * if it has not completed yet, and then invokes {@code run()} directly.
      *
      * @return {@code true} if the on-demand task was submitted, {@code false} if it was ignored
      */
     public boolean onDemandUpdate() {
         if (!rateLimiter.acquire(burstSize, (long) allowedRatePerMinute)) {
             logger.warn("Ignoring onDemand update due to rate limiter");
             return false;
         }

         if (scheduler.isShutdown()) {
             logger.warn("Ignoring onDemand update due to stopped scheduler");
             return false;
         }

         scheduler.submit(new Runnable() {
             public void run() {
                 logger.debug("Executing on-demand update of local InstanceInfo");

                 Future pendingPeriodic = (Future) scheduledPeriodicRef.get();
                 boolean stillPending = pendingPeriodic != null && !pendingPeriodic.isDone();
                 if (stillPending) {
                     logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
                     // cancel(false): do not interrupt a run that is already in progress.
                     pendingPeriodic.cancel(false);
                 }

                 // Must be qualified: an unqualified run() would call this anonymous Runnable.
                 InstanceInfoReplicator.this.run();
             }
         });
         return true;
     }
    
        /**
         * Performs one replication pass and always schedules the next one.
         *
         * <p>The instance info is refreshed; if it is marked dirty, the client re-registers and
         * the dirty flag for that timestamp is cleared. Any {@link Throwable} raised during the
         * pass is logged and swallowed so that the periodic cycle keeps going. Whatever happens,
         * the next run is scheduled after {@code replicationIntervalSeconds} seconds and stored
         * in {@code scheduledPeriodicRef}.
         *
         * <p>This replaces decompiler output that tracked completion with a boolean flag and a
         * labeled block and declared {@code next} twice in overlapping scopes, which does not
         * compile. The plain try/catch/finally below has the same behavior on every path.
         */
        public void run() {
            try {
                discoveryClient.refreshInstanceInfo();

                Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
                if (dirtyTimestamp != null) {
                    // Register first, then clear the dirty flag for exactly this timestamp.
                    discoveryClient.register();
                    instanceInfo.unsetIsDirty(dirtyTimestamp);
                }
            } catch (Throwable t) {
                logger.warn("There was a problem with the instance info replicator", t);
            } finally {
                // Reschedule on success, on a logged failure, and even if logging itself throws.
                ScheduledFuture<?> next = scheduler.schedule(this, (long) replicationIntervalSeconds, TimeUnit.SECONDS);
                scheduledPeriodicRef.set(next);
            }
        }
     

      

     
     /**
      * Registers this instance through the transport's registration client.
      *
      * @return {@code true} if the response status code is 204, {@code false} otherwise
      * @throws Throwable any exception raised by the registration call, rethrown after logging
      */
     boolean register() throws Throwable {
         logger.info("DiscoveryClient_{}: registering service...", appPathIdentifier);

         final EurekaHttpResponse response;
         try {
             response = eurekaTransport.registrationClient.register(instanceInfo);
         } catch (Exception registrationFailure) {
             // The exception is passed as the last argument in addition to its message.
             logger.warn("DiscoveryClient_{} - registration failed {}", appPathIdentifier, registrationFailure.getMessage(), registrationFailure);
             throw registrationFailure;
         }

         if (logger.isInfoEnabled()) {
             logger.info("DiscoveryClient_{} - registration status: {}", appPathIdentifier, response.getStatusCode());
         }

         return response.getStatusCode() == 204;
     }
     

     最后调用register方法对服务进行注册

    3. 服务端接收注册的流程

      接收注册的流程

      

     1.入口:ApplicationResource的addInstance()方法

      

     
    /**
     * Handles a registration request for a single instance.
     *
     * <p>The request is rejected with HTTP 400 when the instance id, host name, IP address,
     * app name, data-center info or data-center name is missing, or when the app name does not
     * match this resource's {@code appName}. For data-center info that is a
     * {@code UniqueIdentifier} with a blank id, the request is either rejected (when the
     * experimental validation flag is "true") or, for {@code AmazonInfo}, patched with the
     * instance id when no instance-id metadata exists; other types only produce a warning.
     *
     * @param info          instance being registered
     * @param isReplication value of the {@code x-netflix-discovery-replication} header; only
     *                      the exact string "true" is treated as a replication request
     * @return a 204 response on success, or a 400 response describing the validation failure
     */
    @POST
    @Consumes({"application/json", "application/xml"})
    public Response addInstance(InstanceInfo info, @HeaderParam("x-netflix-discovery-replication") String isReplication) {
        logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);

        // Mandatory-field validation: the first failing check wins.
        if (isBlank(info.getId())) {
            return Response.status(400).entity("Missing instanceId").build();
        }
        if (isBlank(info.getHostName())) {
            return Response.status(400).entity("Missing hostname").build();
        }
        if (isBlank(info.getIPAddr())) {
            return Response.status(400).entity("Missing ip address").build();
        }
        if (isBlank(info.getAppName())) {
            return Response.status(400).entity("Missing appName").build();
        }
        if (!appName.equals(info.getAppName())) {
            return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
        }
        if (info.getDataCenterInfo() == null) {
            return Response.status(400).entity("Missing dataCenterInfo").build();
        }
        if (info.getDataCenterInfo().getName() == null) {
            return Response.status(400).entity("Missing dataCenterInfo Name").build();
        }

        DataCenterInfo dataCenter = info.getDataCenterInfo();
        if (dataCenter instanceof UniqueIdentifier) {
            String dataCenterId = ((UniqueIdentifier) dataCenter).getId();
            if (isBlank(dataCenterId)) {
                String validationFlag = serverConfig.getExperimental("registration.validation.dataCenterInfoId");
                if ("true".equalsIgnoreCase(validationFlag)) {
                    return Response.status(400)
                            .entity("DataCenterInfo of type " + dataCenter.getClass() + " must contain a valid id")
                            .build();
                }

                if (dataCenter instanceof AmazonInfo) {
                    // Fall back to the instance's own id when no instance-id metadata is present.
                    AmazonInfo amazon = (AmazonInfo) dataCenter;
                    if (amazon.get(MetaDataKey.instanceId) == null) {
                        amazon.getMetadata().put(MetaDataKey.instanceId.getName(), info.getId());
                    }
                } else {
                    logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenter.getClass());
                }
            }
        }

        registry.register(info, "true".equals(isReplication));
        return Response.status(204).build();
    }
     

      register()方法

       /**
        * Registers the instance: first calls {@code handleRegistration} with the lease duration
        * resolved for the instance, then delegates to the superclass registration.
        *
        * @param info          instance being registered
        * @param isReplication replication flag passed through to both calls
        */
       public void register(final InstanceInfo info, final boolean isReplication) {
           handleRegistration(
                   info,
                   resolveInstanceLeaseDuration(info),
                   isReplication);
           super.register(info, isReplication);
       }

    ·super.register()方法

      

     
      /**
       * Registers the instance with a lease duration and then replicates the registration to
       * the peers.
       *
       * <p>The lease duration is 90 unless the instance carries lease info with a positive
       * duration in seconds, in which case that value is used.
       *
       * @param info          instance being registered
       * @param isReplication replication flag passed to the superclass and to
       *                      {@code replicateToPeers}
       */
      public void register(InstanceInfo info, boolean isReplication) {
          boolean hasCustomLease = info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0;
          int leaseDuration = hasCustomLease ? info.getLeaseInfo().getDurationInSecs() : 90;

          super.register(info, leaseDuration, isReplication);
          replicateToPeers(
                  PeerAwareInstanceRegistryImpl.Action.Register,
                  info.getAppName(),
                  info.getId(),
                  info,
                  (InstanceStatus) null,
                  isReplication);
      }
     

      说明:

      1、 调用PeerAwareInstanceRegistryImpl的register方法

      2、 完成服务注册后,调用replicateToPeers向其它Eureka Server节点(Peer)做状态同步

    原文 SpringCloud学习笔记(3)----Spring Cloud Netflix之深入理解Eureka

  • 相关阅读:
    python3 提示No module named _sqlite3
    python3 无法使用flask.ext.* 报错的解决方法
    Java中操作时间比较好用的类
    Integer和Integer数据的大小比较
    Django学习(七) 创建第一个Django项目
    Python学习(七) 流程控制if语句
    Django学习(六) 模板
    Python学习(六) Python数据类型:字典(重要)
    Python学习(五) Python数据类型:列表(重要)
    Python学习(四) Python数据类型:序列(重要)
  • 原文地址:https://www.cnblogs.com/xiaoshen666/p/10844136.html
Copyright © 2011-2022 走看看