zoukankan      html  css  js  c++  java
  • eureka的注册

    eureka client的注册

    Eureka-Client 向 Eureka-Server 发起注册应用实例需要符合如下条件:

    • 配置 eureka.registration.enabled = true,Eureka-Client 向 Eureka-Server 发起注册应用实例的开关
    • InstanceInfo 在 Eureka-Client 和 Eureka-Server 数据不一致。

    每次 InstanceInfo 发生属性变化时,标记 isInstanceInfoDirty 属性为 true,表示 InstanceInfo 在 Eureka-Client 和 Eureka-Server 数据不一致,需要注册。另外,InstanceInfo 刚被创建时,在 Eureka-Server 不存在,也会被注册。

    当符合条件时,InstanceInfo 不会立即向 Eureka-Server 注册,而是后台线程定时注册。

    当 InstanceInfo 的状态( status ) 属性发生变化时,并且配置 eureka.shouldOnDemandUpdateStatusChange = true 时,立即向 Eureka-Server 注册。因为状态属性非常重要,一般情况下建议开启,当然默认情况也是开启的

    eureka-client的代码非常怪,因为注册逻辑在InstanceInfoReplicator组件里面,服务实例信息复制组件,就是这么一个复制组件,来负责服务的注册,replicate一般理解为复制,但是它却来做注册的功能,所以第一次看代码会找不到入口。

    DiscoveryClient#initScheduledTasks

    image-20210928134456162

    传进去的参数为40s,会把isInstanceInfoDirty设置为true,然后把当前时间戳设置成lastDirtyTimestamp

        public void start(int initialDelayMs) {
            if (started.compareAndSet(false, true)) {
                instanceInfo.setIsDirty();
                //把自己当成一个线程,默认是40秒去执行这个线程
                Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
                scheduledPeriodicRef.set(next);
            }
        }
    
        /**
         * Sets the dirty flag so that the instance information can be carried to
         * the discovery server on the next heartbeat.
         */
        public synchronized void setIsDirty() {
            isInstanceInfoDirty = true;
            lastDirtyTimestamp = System.currentTimeMillis();
        }
    

    定时检查 InstanceInfo 的状态( status ) 属性是否发生变化。若是,发起注册。实现代码如下:

        public void run() {
            try {
                //刷新了服务实例的信息
                discoveryClient.refreshInstanceInfo();
    
                //由于前面setIsDirty中把isInstanceInfoDirty设置了true,此处就会返回时间戳,因此会发生注册
                Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
                if (dirtyTimestamp != null) {
                    discoveryClient.register();
                    instanceInfo.unsetIsDirty(dirtyTimestamp);
                }
            } catch (Throwable t) {
                logger.warn("There was a problem with the instance info replicator", t);
            } finally {
                Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
                scheduledPeriodicRef.set(next);
            }
        }
    

    服务刷新暂时先不关注,直接去看注册的代码,服务注册的时候,是基于EurekaClient的reigster()方法去注册的,调用的是底层的TransportClient的RegistrationClient,执行了register()方法,将InstanceInfo服务实例的信息,通过http请求,调用eureka server对外暴露的一个restful接口,将InstanceInfo给发送了过去。这里找的是EurekaTransport,在构造的时候,调用了scheduleServerEndpointTask()方法,这个方法里就初始化了专门用于注册的RegistrationClient。

        /**
         * Register with the eureka service by making the appropriate REST call.
         */
        boolean register() throws Throwable {
            logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
            EurekaHttpResponse<Void> httpResponse;
            try {
                httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
            } catch (Exception e) {
                logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
                throw e;
            }
            if (logger.isInfoEnabled()) {
                logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
            }
            return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
        }
    

    真正执行注册请求的,就是eureka-client-jersey2工程里的AbstractJersey2EurekaHttpClient,请求http://localhost:8080/v2/apps/ServiceA,将服务实例的信息发送过去

        @Override
        public EurekaHttpResponse<Void> register(InstanceInfo info) {
            String urlPath = "apps/" + info.getAppName();
            Response response = null;
            try {
                Builder resourceBuilder = jerseyClient.target(serviceUrl).path(urlPath).request();
                addExtraProperties(resourceBuilder);
                addExtraHeaders(resourceBuilder);
                response = resourceBuilder
                        .accept(MediaType.APPLICATION_JSON)
                        .acceptEncoding("gzip")
                        .post(Entity.json(info));
                return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
            } finally {
                if (logger.isDebugEnabled()) {
                    logger.debug("Jersey2 HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                            response == null ? "N/A" : response.getStatus());
                }
                if (response != null) {
                    response.close();
                }
            }
        }
    

    com.netflix.eureka.resources.ApplicationResource,处理单个应用的请求操作的 Resource ( Controller )。注册应用实例信息的请求,映射 ApplicationResource#addInstance() 方法,实现代码如下:

        @POST
        @Consumes({"application/json", "application/xml"})
        public Response addInstance(InstanceInfo info,
                                    @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
            logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
            // validate that the instanceinfo contains all the necessary required fields
            if (isBlank(info.getId())) {
                return Response.status(400).entity("Missing instanceId").build();
            } else if (isBlank(info.getHostName())) {
                return Response.status(400).entity("Missing hostname").build();
            } else if (isBlank(info.getIPAddr())) {
                return Response.status(400).entity("Missing ip address").build();
            } else if (isBlank(info.getAppName())) {
                return Response.status(400).entity("Missing appName").build();
            } else if (!appName.equals(info.getAppName())) {
                return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
            } else if (info.getDataCenterInfo() == null) {
                return Response.status(400).entity("Missing dataCenterInfo").build();
            } else if (info.getDataCenterInfo().getName() == null) {
                return Response.status(400).entity("Missing dataCenterInfo Name").build();
            }
    
            // handle cases where clients may be registering with bad DataCenterInfo with missing data
            DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
            if (dataCenterInfo instanceof UniqueIdentifier) {
                String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
                if (isBlank(dataCenterInfoId)) {
                    boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
                    if (experimental) {
                        String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                        return Response.status(400).entity(entity).build();
                    } else if (dataCenterInfo instanceof AmazonInfo) {
                        AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                        String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                        if (effectiveId == null) {
                            amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                        }
                    } else {
                        logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
                    }
                }
            }
    
            registry.register(info, "true".equals(isReplication));
            return Response.status(204).build();  // 204 to be backwards compatible
        }
    

    调用 PeerAwareInstanceRegistryImpl#register(...) 方法,注册应用实例信息。实现代码如下:

        public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
            read.lock();
            try {
                Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
                // 增加 注册次数 到 监控
                REGISTER.increment(isReplication);
                //如果某个服务第一次来注册,那gMap是获取不到的
                //这个时候就会注册一个新的gNewMap,赋值给gMap
                //gmap就是注册一个注册表,包含了所有实例的信息
                if (gMap == null) {
                    final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                    gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                    if (gMap == null) {
                        gMap = gNewMap;
                    }
                }
                //感觉InstanceID,获取到实例
                Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
                // Retain the last dirty timestamp without overwriting it, if there is already a lease
                if (existingLease != null && (existingLease.getHolder() != null)) {
    
                    Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                    Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                    logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
    
                    // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
                    // InstanceInfo instead of the server local copy.
                    if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                        logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                                " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                        logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                        registrant = existingLease.getHolder();
                    }
                } else {
                    // 如果这个实例第一次进来的话,就走这个判断
                    synchronized (lock) {
                        if (this.expectedNumberOfClientsSendingRenews > 0) {
                            // Since the client wants to register it, increase the number of clients sending renews
                            //看注释,当客户端进行注册的时候,会新增一共个,期望客户端数量的值
                            this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                            updateRenewsPerMinThreshold();
                        }
                    }
                    logger.debug("No previous lease information found; it is new registration");
                }
                //会将InstanceInfo对象封装成一个Lease对象
                Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
                if (existingLease != null) {
                    lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
                }
                //讲上面封装的Lease对象放到gMap中,key就是服务实例Id
                gMap.put(registrant.getId(), lease);
                recentRegisteredQueue.add(new Pair<Long, String>(
                        System.currentTimeMillis(),
                        registrant.getAppName() + "(" + registrant.getId() + ")"));
                // This is where the initial state transfer of overridden status happens
                if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
                    logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                                    + "overrides", registrant.getOverriddenStatus(), registrant.getId());
                    if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                        logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                        overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                    }
                }
                InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
                if (overriddenStatusFromMap != null) {
                    logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                    registrant.setOverriddenStatus(overriddenStatusFromMap);
                }
    
                // Set the status based on the overridden status rules
                InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
                registrant.setStatusWithoutDirty(overriddenInstanceStatus);
    
                // If the lease is registered with UP status, set lease service up timestamp
                if (InstanceStatus.UP.equals(registrant.getStatus())) {
                    lease.serviceUp();
                }
                registrant.setActionType(ActionType.ADDED);
                // 添加到 最近租约变更记录队列
                recentlyChangedQueue.add(new RecentlyChangedItem(lease));
                // 设置 租约的最后更新时间戳
                registrant.setLastUpdatedTimestamp();
                //清空缓存
                invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
                logger.info("Registered instance {}/{} with status {} (replication={})",
                        registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
            } finally {
                read.unlock();
            }
        }
    
    public class Lease<T> {
    
        /**
         * 实体
         */
        private T holder;
        /**
         * 注册时间戳
         */
        private long registrationTimestamp;
        /**
         * 开始服务时间戳
         */
        private long serviceUpTimestamp;
        /**
         * 取消注册时间戳
         */
        private long evictionTimestamp;
        /**
         * 最后更新时间戳
         */
        // Make it volatile so that the expiration task would see this quicker
        private volatile long lastUpdateTimestamp;
        /**
         * 租约持续时长,单位:毫秒
         */
        private long duration;
    
        public Lease(T r, int durationInSecs) {
            holder = r;
            registrationTimestamp = System.currentTimeMillis();
            lastUpdateTimestamp = registrationTimestamp;
            duration = (durationInSecs * 1000);
        }
        
    }
    
    
    • holder 属性,租约的持有者。在 Eureka-Server 里,暂时只有 InstanceInfo 使用。
    • registrationTimestamp 属性,注册( 创建 )租约时间戳。在构造方法里可以看租约对象的创建时间戳即为注册租约时间戳。
    • serviceUpTimestamp 属性,开始服务时间戳。
    • lastUpdatedTimestamp 属性,最后更新租约时间戳。每次续租时,更新该时间戳。注册应用实例信息会使用到它如下方法,实现代码如下:
    • duration 属性,租约持续时间,单位:毫秒。当租约过久未续租,即当前时间 - lastUpdatedTimestamp > duration 时,租约过期。
    • evictionTimestamp 属性,租约过期时间戳。

    下面对InstanceInfo的具体信息

    image-20211011195313814

    下图为注册表的信息

    image-20211011200837313

  • 相关阅读:
    项目打包发布到tomcat中,中文出现乱码
    打war包时无法把src/main/java里的xml文件打包上去
    Activemq和Rabbitmq端口冲突
    博客园皮肤炫酷效果
    centos7 ffmpeg安装
    centos7 nginx开启启动
    磁盘满了,找不到占磁盘的文件或者日志
    turn服务部署
    kvm虚拟机配置被克隆rhel6客户机的网卡
    jenkins自动构建
  • 原文地址:https://www.cnblogs.com/dalianpai/p/15348230.html
Copyright © 2011-2022 走看看