  • Interaction between the Eureka client and server

    Eureka server and client configuration

    1. Set up a Eureka server

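    Adding the @EnableEurekaServer annotation is all it takes to turn an application into a Eureka server. This works because Spring Cloud Netflix wraps Eureka Server so that it can be embedded directly in a Spring Boot application. The Eureka Server itself is an open-source Netflix project and can also be downloaded and run on its own.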

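    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;

    // Bootstraps a Spring Boot application with an embedded Eureka server.
    @SpringBootApplication
    @EnableEurekaServer
    public class EurekaServer {

        public static void main(String[] args) {
            SpringApplication.run(EurekaServer.class, args);
        }
    }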
    
    Use the following settings in the application.properties configuration file:
    
      server.port=8761
      eureka.instance.hostname=localhost
      eureka.client.registerWithEureka=false
      eureka.client.fetchRegistry=false
      eureka.client.serviceUrl.defaultZone=http://${eureka.instance.hostname}:${server.port}/eureka/

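    Here server.port sets the port of the Eureka server. The Eureka configuration properties are defined in the open-source project spring-cloud-netflix. Its classes EurekaInstanceConfigBean and EurekaClientConfigBean define and document the eureka.instance and eureka.client properties respectively. As those classes show, registerWithEureka controls whether the application registers itself with a Eureka server. This application is the Eureka server, so it has no need to register itself and the value is false. fetchRegistry controls whether the application fetches registry information from a Eureka server, and for the same reason it is not needed here. defaultZone matters most: it sets the address of the Eureka server, which both service lookup and service registration depend on.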
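
    To make the placeholder syntax in defaultZone concrete, here is a minimal sketch in plain Java of how ${...} references expand against the other properties. It only illustrates the idea: Spring's real property resolution is more elaborate, and the names PlaceholderSketch and resolve are made up for this example.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class PlaceholderSketch {
    // Matches ${some.key} and captures the key (hypothetical, simplified syntax).
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    /** Replaces each ${key} with props.get(key); unknown keys are left untouched. */
    static String resolve(String value, Map<String, String> props) {
        Matcher m = PLACEHOLDER.matcher(value);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String replacement = props.getOrDefault(m.group(1), m.group(0));
            m.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("server.port", "8761");
        props.put("eureka.instance.hostname", "localhost");
        String defaultZone = "http://${eureka.instance.hostname}:${server.port}/eureka/";
        System.out.println(resolve(defaultZone, props)); // http://localhost:8761/eureka/
    }
}
```

    With the values from the configuration above, the address that clients must use is therefore http://localhost:8761/eureka/.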

    2. Make a service use the Eureka server

    To make a service use the Eureka server, add the @EnableDiscoveryClient annotation to the Application class that contains the main method. Then add the following to the configuration file:

      eureka.client.serviceUrl.defaultZone=http://localhost:8761/eureka/
      spring.application.name=simple-service

    The pom file of the Eureka server application from step 1 needs the server starter:

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-eureka-server</artifactId>
        </dependency>

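    Here defaultZone specifies the address of the Eureka server, which both registration and discovery need. spring.application.name specifies the name under which the service registers, and that name is the service identifier used later when calling the service. The client service's pom file needs:

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-eureka</artifactId>
        </dependency>
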

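      With this in place, the service registers itself with the Eureka server automatically on startup. If the service also needs to call another service, it can address that service with a URL built from the target's service name plus the request path, provided the call goes through a load-balanced client (for example a @LoadBalanced RestTemplate or Feign) that resolves the name against the registry.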
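
    What "calling by service name" amounts to can be sketched in plain Java: a logical URL whose host is the service name is rewritten to a concrete host and port taken from the locally known instances. This is a simplified illustration, not how Ribbon or Spring Cloud implement it. The class name, the round-robin choice, and the addresses are assumptions made for the example.

```java
import java.net.URI;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class ServiceNameResolverSketch {
    // service name -> known "host:port" instances, standing in for the cached registry
    private final Map<String, List<String>> registry = new ConcurrentHashMap<>();
    private final Map<String, AtomicInteger> counters = new ConcurrentHashMap<>();

    void register(String serviceName, List<String> instances) {
        registry.put(serviceName, instances);
    }

    /** Rewrites http://service-name/path to http://host:port/path, rotating over instances. */
    URI resolve(URI logical) {
        String serviceName = logical.getHost();
        List<String> instances = registry.get(serviceName);
        if (instances == null || instances.isEmpty()) {
            throw new IllegalStateException("No instances for " + serviceName);
        }
        int n = counters.computeIfAbsent(serviceName, k -> new AtomicInteger()).getAndIncrement();
        String target = instances.get(Math.floorMod(n, instances.size()));
        String query = logical.getRawQuery() == null ? "" : "?" + logical.getRawQuery();
        return URI.create(logical.getScheme() + "://" + target + logical.getRawPath() + query);
    }

    public static void main(String[] args) {
        ServiceNameResolverSketch resolver = new ServiceNameResolverSketch();
        resolver.register("simple-service", Arrays.asList("10.0.0.1:8080", "10.0.0.2:8080"));
        URI logical = URI.create("http://simple-service/hello");
        System.out.println(resolver.resolve(logical)); // first instance
        System.out.println(resolver.resolve(logical)); // second instance
    }
}
```

    The caller only ever writes the logical URL, so instances can come and go without any change to the calling code.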
      

    Eureka Client: the service registration and discovery interface

    Netflix defines the main client operations in the interface com.netflix.discovery.EurekaClient.

    Its implementation class is com.netflix.discovery.DiscoveryClient.

    Spring Cloud wraps it in its own abstraction, defined in org.springframework.cloud.client.discovery.DiscoveryClient.

    The core service discovery class: DiscoveryClient (the Netflix class, not the Spring one)

    Its most important constructor:

    	 @Inject // Google Guice injection, following the JSR-330 specification
        DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, DiscoveryClientOptionalArgs args,
                        Provider<BackupRegistry> backupRegistryProvider) {
            if (args != null) {
                this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
                this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
                this.eventBus = args.eventBus;
                this.discoveryJerseyClient = args.eurekaJerseyClient;
            } else {
                this.healthCheckCallbackProvider = null;
                this.healthCheckHandlerProvider = null;
                this.eventBus = null;
                this.discoveryJerseyClient = null;
            }
    
            this.applicationInfoManager = applicationInfoManager;
            InstanceInfo myInfo = applicationInfoManager.getInfo();
    
            this.backupRegistryProvider = backupRegistryProvider;
    
            try {
                scheduler = Executors.newScheduledThreadPool(3,
                        new ThreadFactoryBuilder()
                                .setNameFormat("DiscoveryClient-%d")
                                .setDaemon(true)
                                .build());
                clientConfig = config;
                staticClientConfig = clientConfig;
                transportConfig = config.getTransportConfig();
                instanceInfo = myInfo;
                if (myInfo != null) {
                    appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
                } else {
                    logger.warn("Setting instanceInfo to a passed in null value");
                }
    
                this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
                String[] availZones = clientConfig.getAvailabilityZones(clientConfig.getRegion());
                final String zone = InstanceInfo.getZone(availZones, myInfo);
                localRegionApps.set(new Applications());
    
                heartbeatExecutor = new ThreadPoolExecutor(
                        1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                        new SynchronousQueue<Runnable>());  // use direct handoff
    
                cacheRefreshExecutor = new ThreadPoolExecutor(
                        1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                        new SynchronousQueue<Runnable>());  // use direct handoff
    
                fetchRegistryGeneration = new AtomicLong(0);
    
                clientAccept = EurekaAccept.fromString(clientConfig.getClientDataAccept());
    
                eurekaTransport = new EurekaTransport();
                scheduleServerEndpointTask(eurekaTransport, zone);
    
                if (discoveryJerseyClient == null) {  // if not injected, create one
    
                    EurekaJerseyClientBuilder clientBuilder = new EurekaJerseyClientBuilder()
                            .withUserAgent("Java-EurekaClient")
                            .withConnectionTimeout(clientConfig.getEurekaServerConnectTimeoutSeconds() * 1000)
                            .withReadTimeout(clientConfig.getEurekaServerReadTimeoutSeconds() * 1000)
                            .withMaxConnectionsPerHost(clientConfig.getEurekaServerTotalConnectionsPerHost())
                            .withMaxTotalConnections(clientConfig.getEurekaServerTotalConnections())
                            .withConnectionIdleTimeout(clientConfig.getEurekaConnectionIdleTimeoutSeconds())
                            .withEncoder(clientConfig.getEncoderName())
                            .withDecoder(clientConfig.getDecoderName(), clientConfig.getClientDataAccept());
    
                    if (eurekaServiceUrls.get().get(0).startsWith("https://") &&
                            "true".equals(System.getProperty("com.netflix.eureka.shouldSSLConnectionsUseSystemSocketFactory"))) {
                        clientBuilder.withClientName("DiscoveryClient-HTTPClient-System")
                                .withSystemSSLConfiguration();
                    } else if (clientConfig.getProxyHost() != null && clientConfig.getProxyPort() != null) {
                        clientBuilder.withClientName("Proxy-DiscoveryClient-HTTPClient")
                                .withProxy(
                                        clientConfig.getProxyHost(), clientConfig.getProxyPort(),
                                        clientConfig.getProxyUserName(), clientConfig.getProxyPassword()
                                );
                    } else {
                        clientBuilder.withClientName("DiscoveryClient-HTTPClient");
                    }
                    discoveryJerseyClient = clientBuilder.build();
                }
    
                discoveryApacheClient = discoveryJerseyClient.getClient();
    
                remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
                remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));
                AzToRegionMapper azToRegionMapper;
                if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
                    azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
                } else {
                    azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
                }
                if (null != remoteRegionsToFetch.get()) {
                    azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
                }
                instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
                boolean enableGZIPContentEncodingFilter = config.shouldGZipContent();
                // should we enable GZip decoding of responses based on Response
                // Headers?
                if (enableGZIPContentEncodingFilter) {
                    // compressed only if there exists a 'Content-Encoding' header
                    // whose value is "gzip"
                    discoveryApacheClient.addFilter(new GZIPContentEncodingFilter(false));
                }
    
                // always enable client identity headers
                String ip = instanceInfo == null ? null : instanceInfo.getIPAddr();
                EurekaClientIdentity identity = new EurekaClientIdentity(ip);
                discoveryApacheClient.addFilter(new EurekaIdentityHeaderFilter(identity));
    
                // add additional ClientFilters if specified
                if (args != null && args.additionalFilters != null) {
                    for (ClientFilter filter : args.additionalFilters) {
                        discoveryApacheClient.addFilter(filter);
                    }
                }
    
            } catch (Throwable e) {
                throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
            }
    
            if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
                fetchRegistryFromBackup();
            }
    
            initScheduledTasks();
            try {
                Monitors.registerObject(this);
            } catch (Throwable e) {
                logger.warn("Cannot register timers", e);
            }
            this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
            this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
    
            // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
            // to work with DI'd DiscoveryClient
            DiscoveryManager.getInstance().setDiscoveryClient(this);
            DiscoveryManager.getInstance().setEurekaClientConfig(config);
    
            initTimestampMs = System.currentTimeMillis();
        }
    

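    The constructor also initializes the scheduled tasks (the heartbeat task and the registry-fetch task) by calling initScheduledTasks().
    The registry-fetch interval is set by client.refresh.interval and defaults to 30 seconds.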

    	/**
         * Initializes all scheduled tasks.
         */
        private void initScheduledTasks() {
            if (clientConfig.shouldFetchRegistry()) {
                // registry cache refresh timer
                int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
                int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
                scheduler.schedule(
                        new TimedSupervisorTask(
                                "cacheRefresh",
                                scheduler,
                                cacheRefreshExecutor,
                                registryFetchIntervalSeconds,
                                TimeUnit.SECONDS,
                                expBackOffBound,
                                new CacheRefreshThread()
                        ),
                        registryFetchIntervalSeconds, TimeUnit.SECONDS);
            }
    
            if (shouldRegister(instanceInfo)) {
                int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
                int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
                logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);
    
                // Heartbeat timer
                scheduler.schedule(
                        new TimedSupervisorTask(
                                "heartbeat",
                                scheduler,
                                heartbeatExecutor,
                                renewalIntervalInSecs,
                                TimeUnit.SECONDS,
                                expBackOffBound,
                                new HeartbeatThread()
                        ),
                        renewalIntervalInSecs, TimeUnit.SECONDS);
    
                // InstanceInfo replicator
                instanceInfoReplicator = new InstanceInfoReplicator(
                        this,
                        instanceInfo,
                        clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                        2); // burstSize
    
                statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                    @Override
                    public String getId() {
                        return "statusChangeListener";
                    }
    
                    @Override
                    public void notify(StatusChangeEvent statusChangeEvent) {
                        if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                                InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                            // log at warn level if DOWN was involved
                            logger.warn("Saw local status change event {}", statusChangeEvent);
                        } else {
                            logger.info("Saw local status change event {}", statusChangeEvent);
                        }
                        instanceInfoReplicator.onDemandUpdate();
                    }
                };
    
                if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                    applicationInfoManager.registerStatusChangeListener(statusChangeListener);
                }
    
                instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
            } else {
                logger.info("Not registering with Eureka server per configuration");
            }
        }
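
    Both tasks are wrapped in a TimedSupervisorTask and receive an expBackOffBound. The sketch below models the rescheduling delay under the assumption that the supervisor doubles the delay after a timed-out run, caps it at the base interval multiplied by expBackOffBound, and resets it after a successful run. The class BackoffDelaySketch is hypothetical and is not the library code.

```java
class BackoffDelaySketch {
    private final long baseDelayMs;
    private final long maxDelayMs;
    private long currentDelayMs;

    BackoffDelaySketch(long baseDelayMs, int expBackOffBound) {
        this.baseDelayMs = baseDelayMs;
        this.maxDelayMs = baseDelayMs * expBackOffBound; // assumed cap
        this.currentDelayMs = baseDelayMs;
    }

    /** Returns the delay before the next run, given whether the last run timed out. */
    long nextDelay(boolean timedOut) {
        if (timedOut) {
            currentDelayMs = Math.min(maxDelayMs, currentDelayMs * 2);
        } else {
            currentDelayMs = baseDelayMs;
        }
        return currentDelayMs;
    }

    public static void main(String[] args) {
        // 30 s base interval, back-off bound of 10 (illustrative values)
        BackoffDelaySketch heartbeat = new BackoffDelaySketch(30_000, 10);
        boolean[] outcomes = {true, true, true, true, false};
        for (boolean timedOut : outcomes) {
            System.out.println((timedOut ? "timeout" : "success")
                    + " -> next run in " + heartbeat.nextDelay(timedOut) + " ms");
        }
    }
}
```

    Under these assumptions a struggling server is contacted less and less often, while a single successful run restores the normal interval.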
    

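    a. HeartbeatThread, the heartbeat task (it runs every 30 seconds by default; the first heartbeat effectively performs the first registration, because renew() calls register() when the server answers 404)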

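    The core method is renew(). The heartbeat tells the Eureka Server that the instance is healthy. If the server receives no heartbeat from an instance for more than 90 seconds, it removes that instance from the service registry. Changing the default 30-second interval is not recommended.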
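
    The relationship between the 30-second heartbeat and the 90-second limit can be sketched as a simple lease. This is a simplified model with made-up names (LeaseSketch and its methods); the server's real Lease class carries more state and its eviction runs on its own timer.

```java
class LeaseSketch {
    static final long DEFAULT_DURATION_MS = 90_000L; // the 90 s mentioned above

    private final long durationMs;
    private long lastRenewalMs;

    LeaseSketch(long registeredAtMs, long durationMs) {
        this.lastRenewalMs = registeredAtMs;
        this.durationMs = durationMs;
    }

    /** Called for every heartbeat the server receives. */
    void renew(long nowMs) {
        lastRenewalMs = nowMs;
    }

    /** The lease is expired once no heartbeat has arrived for longer than durationMs. */
    boolean isExpired(long nowMs) {
        return nowMs > lastRenewalMs + durationMs;
    }

    public static void main(String[] args) {
        LeaseSketch lease = new LeaseSketch(0L, DEFAULT_DURATION_MS);
        lease.renew(30_000L);                          // one heartbeat after 30 s
        System.out.println(lease.isExpired(119_000L)); // false: 89 s since last heartbeat
        System.out.println(lease.isExpired(120_001L)); // true: more than 90 s of silence
    }
}
```

    With a 30-second heartbeat, an instance has to miss roughly three heartbeats in a row before its lease runs out.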

     /**
         * Renew with the eureka service by making the appropriate REST call.
         * Author's note: whether the transport.query.enabled property is true
         * decides whether the call is made through EurekaHttpClient or through
         * makeRemoteCall.
         */
        boolean renew() {
            if (shouldUseExperimentalTransportForRegistration()) {
                EurekaHttpResponse<InstanceInfo> httpResponse;
                try {
                    httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
                    logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
                    if (httpResponse.getStatusCode() == 404) {
                        REREGISTER_COUNTER.increment();
                        logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
                        return register();
                    }
                    return httpResponse.getStatusCode() == 200;
                } catch (Throwable e) {
                    logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
                    return false;
                }
            } else {
                ClientResponse response = null;
                try {
                    response = makeRemoteCall(Action.Renew);
                    logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, (response != null ? response.getStatus() : "not sent"));
                    if (response == null) {
                        return false;
                    }
                    if (response.getStatus() == 404) {
                        REREGISTER_COUNTER.increment();
                        logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
                        return register();
                    }
                } catch (Throwable e) {
                    logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
                    return false;
                } finally {
                    if (response != null) {
                        response.close();
                    }
                }
                return true;
            }
        }
        
        /**
         * Register with the eureka service by making the appropriate REST call.
         */
        boolean register() throws Throwable {
            logger.info(PREFIX + appPathIdentifier + ": registering service...");
            if (shouldUseExperimentalTransportForRegistration()) {
                EurekaHttpResponse<Void> httpResponse;
                try {
                    httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
                } catch (Exception e) {
                    logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
                    throw e;
                }
                isRegisteredWithDiscovery = true;
                if (logger.isInfoEnabled()) {
                    logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
                }
                return httpResponse.getStatusCode() == 204;
            } else {
                ClientResponse response = null;
                try {
                    response = makeRemoteCall(Action.Register);
                    isRegisteredWithDiscovery = true;
                    logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, (response != null ? response.getStatus() : "not sent"));
                    return response != null && response.getStatus() == 204;
                } catch (Throwable e) {
                    logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
                    throw e;
                } finally {
                    closeResponse(response);
                }
            }
        }   
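
    The control flow shared by renew() and register() (heartbeat, a 404 for an unknown instance, then registration answered with 204) can be reproduced with an in-memory stand-in for the server. FakeServer and RenewFlowSketch are hypothetical names used only for this sketch, and the status codes are the ones visible in the code above.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class RenewFlowSketch {
    /** Hypothetical in-memory stand-in for the server's REST endpoints. */
    static class FakeServer {
        final Set<String> registered = ConcurrentHashMap.newKeySet();

        int heartbeat(String instanceId) {
            return registered.contains(instanceId) ? 200 : 404;
        }

        int register(String instanceId) {
            registered.add(instanceId);
            return 204;
        }
    }

    private final FakeServer server;
    private final String instanceId;
    int registrations = 0;

    RenewFlowSketch(FakeServer server, String instanceId) {
        this.server = server;
        this.instanceId = instanceId;
    }

    /** Sends a heartbeat; an unknown instance (404) triggers a registration instead. */
    boolean renew() {
        int status = server.heartbeat(instanceId);
        if (status == 404) {
            return register();
        }
        return status == 200;
    }

    boolean register() {
        registrations++;
        return server.register(instanceId) == 204;
    }

    public static void main(String[] args) {
        RenewFlowSketch client = new RenewFlowSketch(new FakeServer(), "simple-service:8080");
        System.out.println(client.renew() + ", registrations=" + client.registrations);
        System.out.println(client.renew() + ", registrations=" + client.registrations);
    }
}
```

    The first call registers the instance and later calls are plain heartbeats. The same path re-registers an instance that the server has evicted in the meantime.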
        
    

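    What registration sends to the server is the instance's InstanceInfo object, as the register(instanceInfo) call above shows.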

    b. CacheRefreshThread, the scheduled task that fetches the registration information of all services

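    It fetches the remote registry. Depending on its argument and the client's state, fetchRegistry() calls either getAndStoreFullRegistry() or getAndUpdateDelta(applications) to pull registry information from the server and refresh the client's cache.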

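    After the first fetch, getAndStoreFullRegistry() caches the registration information of all services locally.
    The container is an AtomicReference (an object reference that can be updated atomically) held by DiscoveryClient: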

    private final AtomicReference<Applications> localRegionApps = 
    new AtomicReference<Applications>();
    
    /**
         * The task that fetches the registry information at specified intervals.
         *
         */
        class CacheRefreshThread implements Runnable {
            public void run() {
                try {
                    boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();
    
                    boolean remoteRegionsModified = false;
                    // This makes sure that a dynamic change to remote regions to fetch is honored.
                    String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
                    if (null != latestRemoteRegions) {
                        String currentRemoteRegions = remoteRegionsToFetch.get();
                        if (!latestRemoteRegions.equals(currentRemoteRegions)) {
                            // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
                            synchronized (instanceRegionChecker.getAzToRegionMapper()) {
                                if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
                                    String[] remoteRegions = latestRemoteRegions.split(",");
                                    remoteRegionsRef.set(remoteRegions);
                                    instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
                                    remoteRegionsModified = true;
                                } else {
                                    logger.info("Remote regions to fetch modified concurrently," +
                                            " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
                                }
                            }
                        } else {
                            // Just refresh mapping to reflect any DNS/Property change
                            instanceRegionChecker.getAzToRegionMapper().refreshMapping();
                        }
                    }
    
                    boolean success = fetchRegistry(remoteRegionsModified);
                    if (success) {
                        registrySize = localRegionApps.get().size();
                        lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
                    }
    
                    if (logger.isDebugEnabled()) {
                        StringBuilder allAppsHashCodes = new StringBuilder();
                        allAppsHashCodes.append("Local region apps hashcode: ");
                        allAppsHashCodes.append(localRegionApps.get().getAppsHashCode());
                        allAppsHashCodes.append(", is fetching remote regions? ");
                        allAppsHashCodes.append(isFetchingRemoteRegionRegistries);
                        for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) {
                            allAppsHashCodes.append(", Remote region: ");
                            allAppsHashCodes.append(entry.getKey());
                            allAppsHashCodes.append(" , apps hashcode: ");
                            allAppsHashCodes.append(entry.getValue().getAppsHashCode());
                        }
                        logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",
                                allAppsHashCodes.toString());
                    }
                } catch (Throwable th) {
                    logger.error("Cannot fetch registry from server", th);
                }
            }
        }
        
        
       /**
         * Fetches the registry information.
         *
         * <p>
         * This method tries to get only deltas after the first fetch unless there
         * is an issue in reconciling eureka server and client registry information.
         * </p>
         *
         * @param forceFullRegistryFetch Forces a full registry fetch.
         *
         * @return true if the registry was fetched
         */
        private boolean fetchRegistry(boolean forceFullRegistryFetch) {
            Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
    
            try {
                // If the delta is disabled or if it is the first time, get all
                // applications
                Applications applications = getApplications();
    
                if (clientConfig.shouldDisableDelta()
                        || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                        || forceFullRegistryFetch
                        || (applications == null)
                        || (applications.getRegisteredApplications().size() == 0)
                        || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
                {
                    logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
                    logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
                    logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
                    logger.info("Application is null : {}", (applications == null));
                    logger.info("Registered Applications size is zero : {}",
                            (applications.getRegisteredApplications().size() == 0));
                    logger.info("Application version is -1: {}", (applications.getVersion() == -1));
                    getAndStoreFullRegistry();
                } else {
                    getAndUpdateDelta(applications);
                }
                applications.setAppsHashCode(applications.getReconcileHashCode());
                logTotalInstances();
            } catch (Throwable e) {
                logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e);
                return false;
            } finally {
                if (tracer != null) {
                    tracer.stop();
                }
            }
    
            // Notify about cache refresh before updating the instance remote status
            onCacheRefreshed();
    
            // Update remote status based on refreshed data held in the cache
            updateInstanceRemoteStatus();
    
            // registry was fetched successfully, so return true
            return true;
        }
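
    The choice between a full fetch and a delta, together with the AtomicReference cache, can be sketched as follows. This is a simplified model: a Map from instance id to status stands in for Applications, a null status in the delta means "deleted", and the names (RegistryCacheSketch and its methods) are assumptions. The real client tracks ADDED, MODIFIED, and DELETED actions and reconciles hash codes.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

class RegistryCacheSketch {
    // instance id -> status; replaced wholesale so readers never see a half-updated map
    private final AtomicReference<Map<String, String>> localApps =
            new AtomicReference<>(new HashMap<>());

    /** Mirrors the spirit of the condition in fetchRegistry(): when is a delta not enough? */
    static boolean needsFullFetch(boolean deltaDisabled, boolean forceFull, Map<String, String> cached) {
        return deltaDisabled || forceFull || cached == null || cached.isEmpty();
    }

    void storeFull(Map<String, String> full) {
        localApps.set(new HashMap<>(full));
    }

    /** Applies a delta on a copy and swaps it in atomically; a null status removes the entry. */
    void applyDelta(Map<String, String> delta) {
        localApps.updateAndGet(current -> {
            Map<String, String> next = new HashMap<>(current);
            delta.forEach((id, status) -> {
                if (status == null) {
                    next.remove(id);
                } else {
                    next.put(id, status);
                }
            });
            return next;
        });
    }

    Map<String, String> snapshot() {
        return Collections.unmodifiableMap(localApps.get());
    }

    public static void main(String[] args) {
        RegistryCacheSketch cache = new RegistryCacheSketch();
        System.out.println(needsFullFetch(false, false, cache.snapshot())); // true: cache is empty
        Map<String, String> full = new HashMap<>();
        full.put("a:8080", "UP");
        full.put("b:8080", "UP");
        cache.storeFull(full);
        Map<String, String> delta = new HashMap<>();
        delta.put("b:8080", null);   // removed on the server
        delta.put("c:8080", "UP");   // newly registered
        cache.applyDelta(delta);
        System.out.println(cache.snapshot().keySet());
    }
}
```

    Swapping in a whole new map keeps lookups lock-free, which suits a cache that is read on every service call and written only once per refresh interval.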
    

    getAndStoreFullRegistry() above uses an HTTP client internally.

    com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient.getApplications simply issues a REST request:

    @Override
    public EurekaHttpResponse<Applications> getApplications(String... regions) {
        return getApplicationsInternal("apps/", regions);
    }

    // Abridged excerpt: variable declarations and response handling are omitted.
    private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) {
        WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath);
        regionsParamValue = StringUtil.join(regions);
        webResource = webResource.queryParam("regions", regionsParamValue);
        Builder requestBuilder = webResource.getRequestBuilder();
        addExtraHeaders(requestBuilder);
        response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class);
        // ...
    }
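
    The request this builds can be approximated with the JDK alone. The sketch assumes that the region names are joined with commas and uses made-up region names. AppsUrlSketch and appsUri are hypothetical, and the code needs Java 10 or later for URLEncoder.encode(String, Charset).

```java
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

class AppsUrlSketch {
    /** Builds serviceUrl + "apps/" and appends ?regions=... when regions are given. */
    static URI appsUri(String serviceUrl, String... regions) {
        String base = serviceUrl.endsWith("/") ? serviceUrl : serviceUrl + "/";
        StringBuilder url = new StringBuilder(base).append("apps/");
        if (regions != null && regions.length > 0) {
            String joined = String.join(",", regions); // assumed separator
            url.append("?regions=").append(URLEncoder.encode(joined, StandardCharsets.UTF_8));
        }
        return URI.create(url.toString());
    }

    public static void main(String[] args) {
        System.out.println(appsUri("http://localhost:8761/eureka/"));
        System.out.println(appsUri("http://localhost:8761/eureka", "us-east-1", "us-west-2"));
    }
}
```

    A GET on the first URL with an Accept: application/json header is essentially what a full registry fetch sends to the server configured in defaultZone.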
    

    Eureka Server: the server side

    1. The server accepts the request

    As shown here, the server side likewise exposes a REST endpoint:

    // Abridged excerpt of the registration endpoint.
    @POST
    @Consumes({"application/json", "application/xml"})
    public Response addInstance(InstanceInfo info,
                                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
        registry.register(info, "true".equals(isReplication));
        return Response.status(204).build();  // the client's register() checks for 204
    }
    

    2. Perform the registration: com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl.register(InstanceInfo info, boolean isReplication)

    /**
         * Registers the information about the {@link InstanceInfo} and replicates
         * this information to all peer eureka nodes. If this is replication event
         * from other replica nodes then it is not replicated.
         *
         * @param info
         *            the {@link InstanceInfo} to be registered and replicated.
         * @param isReplication
         *            true if this is a replication event from other replica nodes,
         *            false otherwise.
         */
        @Override
        public void register(final InstanceInfo info, final boolean isReplication) {
            int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
            if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
                leaseDuration = info.getLeaseInfo().getDurationInSecs();
            }
            super.register(info, leaseDuration, isReplication);
            replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
        }
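
    The role of the isReplication flag is easiest to see in a toy model of two peers: a registration that arrives from a client is forwarded to the peers, while one that arrives as a replication event is stored but not forwarded again, so the nodes never bounce it back and forth. Node and its fields are hypothetical names for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class PeerReplicationSketch {
    static class Node {
        final String name;
        final List<Node> peers = new ArrayList<>();
        final Set<String> instances = ConcurrentHashMap.newKeySet();
        int registerCalls = 0;

        Node(String name) {
            this.name = name;
        }

        void register(String instanceId, boolean isReplication) {
            registerCalls++;
            instances.add(instanceId);
            if (isReplication) {
                return; // replication events are stored but never forwarded again
            }
            for (Node peer : peers) {
                peer.register(instanceId, true);
            }
        }
    }

    public static void main(String[] args) {
        Node a = new Node("a");
        Node b = new Node("b");
        a.peers.add(b);
        b.peers.add(a);
        a.register("simple-service:8080", false); // request from a client
        System.out.println(a.instances + " calls=" + a.registerCalls);
        System.out.println(b.instances + " calls=" + b.registerCalls);
    }
}
```

    Each node handles the registration exactly once, even though the two nodes list each other as peers.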
    

    3. Store the registration information

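    The parent class AbstractInstanceRegistry implements the general storage of registration information. The data simply lives in a ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> named registry, keyed first by application name and then by instance id.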

     private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
     = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
    
     /**
         * Registers a new instance with a given duration.
         *
         * @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)
         */
        public void register(InstanceInfo r, int leaseDuration, boolean isReplication) {
            try {
                read.lock();
                Map<String, Lease<InstanceInfo>> gMap = registry.get(r.getAppName());
                REGISTER.increment(isReplication);
                if (gMap == null) {
                    final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap =
                            new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                    gMap = registry.putIfAbsent(r.getAppName(), gNewMap);
                    if (gMap == null) {
                        gMap = gNewMap;
                    }
                }
                Lease<InstanceInfo> existingLease = gMap.get(r.getId());
                // Retain the last dirty timestamp without overwriting it, if there is already a lease
                if (existingLease != null && (existingLease.getHolder() != null)) {
                    Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                    Long registrationLastDirtyTimestamp = r.getLastDirtyTimestamp();
                    logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                    if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                        logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is " +
                                        "greater than the one that is being registered {}",
                                existingLastDirtyTimestamp,
                                registrationLastDirtyTimestamp);
                        r.setLastDirtyTimestamp(existingLastDirtyTimestamp);
                    }
                } else {
                    // The lease does not exist and hence it is a new registration
                    synchronized (lock) {
                        if (this.expectedNumberOfRenewsPerMin > 0) {
                            // Since the client wants to cancel it, reduce the threshold
                            // (1
                            // for 30 seconds, 2 for a minute)
                            this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                            this.numberOfRenewsPerMinThreshold =
                                    (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                        }
                    }
                    logger.debug("No previous lease information found; it is new registration");
                }
                Lease<InstanceInfo> lease = new Lease<InstanceInfo>(r, leaseDuration);
                if (existingLease != null) {
                    lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
                }
                gMap.put(r.getId(), lease);
                synchronized (recentRegisteredQueue) {
                    recentRegisteredQueue.add(new Pair<Long, String>(
                            System.currentTimeMillis(),
                            r.getAppName() + "(" + r.getId() + ")"));
                }
                // This is where the initial state transfer of overridden status happens
                if (!InstanceStatus.UNKNOWN.equals(r.getOverriddenStatus())) {
                    logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                                    + "overrides", r.getOverriddenStatus(), r.getId());
                    if (!overriddenInstanceStatusMap.containsKey(r.getId())) {
                        logger.info("Not found overridden id {} and hence adding it", r.getId());
                        overriddenInstanceStatusMap.put(r.getId(), r.getOverriddenStatus());
                    }
                }
                InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(r.getId());
                if (overriddenStatusFromMap != null) {
                    logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                    r.setOverriddenStatus(overriddenStatusFromMap);
                }
    
                // Set the status based on the overridden status rules
                InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(r, existingLease, isReplication);
                r.setStatusWithoutDirty(overriddenInstanceStatus);
    
                // If the lease is registered with UP status, set lease service up timestamp
                if (InstanceStatus.UP.equals(r.getStatus())) {
                    lease.serviceUp();
                }
                r.setActionType(ActionType.ADDED);
                recentlyChangedQueue.add(new RecentlyChangedItem(lease));
                r.setLastUpdatedTimestamp();
                invalidateCache(r.getAppName(), r.getVIPAddress(), r.getSecureVipAddress());
                logger.info("Registered instance {}/{} with status {} (replication={})",
                        r.getAppName(), r.getId(), r.getStatus(), isReplication);
            } finally {
                read.unlock();
            }
        }
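
    Stripped of leases, status overrides, and metrics, the storage step comes down to a two-level map filled with the get / putIfAbsent pattern seen above. In this sketch a String stands in for Lease<InstanceInfo>, and the class and method names are made up.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class TwoLevelRegistrySketch {
    // application name -> (instance id -> instance data)
    private final ConcurrentHashMap<String, Map<String, String>> registry =
            new ConcurrentHashMap<>();

    void register(String appName, String instanceId, String info) {
        Map<String, String> instances = registry.get(appName);
        if (instances == null) {
            ConcurrentHashMap<String, String> fresh = new ConcurrentHashMap<>();
            // putIfAbsent returns the existing map if another thread won the race
            instances = registry.putIfAbsent(appName, fresh);
            if (instances == null) {
                instances = fresh;
            }
        }
        instances.put(instanceId, info); // a re-registration overwrites the old entry
    }

    int instanceCount(String appName) {
        Map<String, String> instances = registry.get(appName);
        return instances == null ? 0 : instances.size();
    }

    public static void main(String[] args) {
        TwoLevelRegistrySketch registry = new TwoLevelRegistrySketch();
        registry.register("SIMPLE-SERVICE", "host1:8080", "UP");
        registry.register("SIMPLE-SERVICE", "host2:8080", "UP");
        registry.register("SIMPLE-SERVICE", "host1:8080", "UP"); // same instance again
        System.out.println(registry.instanceCount("SIMPLE-SERVICE")); // 2
    }
}
```

    On Java 8 and later the same effect can be had with registry.computeIfAbsent(appName, k -> new ConcurrentHashMap<>()); the explicit putIfAbsent form mirrors the code above.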
    

    Appendix:

    Eureka REST operations

  • Original article: https://www.cnblogs.com/huangpeng1990/p/5614476.html