zoukankan      html  css  js  c++  java
  • 【源码学习-eureka】说一说EurekaClient是在什么地方将自己注册到注册中心的?

    相信很多看过eureka源码的同学,在研读ExampleEurekaClient这个eureka-client启动类的时候,跟着看了一遍代码,发现一个问题。这个不是client嘛?他不是得注册到注册中心去么?怎么没看到注册的动作在什么地方啊?如果你也存在这样的问题或者疑惑,相信我,你不是一个人。

    ExampleEurekaClient这个类的启动过程流程大致上如图所示:

    这个是大致的一个启动过程,通篇看下来,似乎没有和服务注册相关的东西啊。那怎么会对呢?数据是怎么部署到eureka注册中心上去的呢?其实在这个流程里面我们已经将eureka-client注册到注册中心了。只是因为eureka他的代码层次太深,而且部分地方我感觉命名是不太合适的。所以导致,我们按照以往看源码得习惯去看这个代码的时候很容易将其遗漏掉。

    那我先说结论,eureka-client是在初始化调度任务。也是在执行DiscoveryClient类的intiScheduledTask方法的时候将自己注册到注册中心的。

    源码刨析

    都说源码之下,了无秘密,那让我们带着结论和问题,一起去看看eureka-client是怎么将自己注册到注册中心的。先看一下初始化调度任务这部分代码。

    initScheduledTasks方法

    private void initScheduledTasks() {
            // 定时抓取注册表的调度任务
            ……
            // 给eureka-server定时发送心跳的调度任务
            ……
    
            // InstanceInfo replicator
            // 服务实例注册
            instanceInfoReplicator = new InstanceInfoReplicator(
                this,
                instanceInfo,
                clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                2); // burstSize
    		// 一堆其他代码 
        	……
            // 进行注册
        	instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    }
    

    我们看到在代码中出现了一个很有意思的类就是instanceInfoReplicator第一次看源码的时候,我一看这个名字,觉得是这不就是一个服务实例的复制嘛,就没仔细看里面的东西。结果梳理完client启动过程之后发现找不到client注册到server的操作。然后不得不反过头来重新看了一下eureka的代码。最终发现就是在这个地方进行的服务注册。

    这里不得不吐个槽,搞一个服务注册动作,叫instanceRegister之类的名字他不香吗?搞的这么花里胡哨,谁能猜出来。我以为这个方法就这么简单的过了,后来才知道,是我自己太天真了。

    InstancInfoReplicator构造方法

    进入InstanceInfoReplicator构造方法,看他们都做了什么事儿。

    InstanceInfoReplicator(DiscoveryClient discoveryClient, InstanceInfo instanceInfo, int replicationIntervalSeconds, int burstSize) {
            this.discoveryClient = discoveryClient;
            this.instanceInfo = instanceInfo;
            this.scheduler = Executors.newScheduledThreadPool(1,
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-InstanceInfoReplicator-%d")
                            .setDaemon(true)
                            .build());
    
            this.scheduledPeriodicRef = new AtomicReference<Future>();
    
            this.started = new AtomicBoolean(false);
            this.rateLimiter = new RateLimiter(TimeUnit.MINUTES);
            this.replicationIntervalSeconds = replicationIntervalSeconds;
            this.burstSize = burstSize;
    
            this.allowedRatePerMinute = 60 * this.burstSize / this.replicationIntervalSeconds;
            logger.info("InstanceInfoReplicator onDemand update allowed rate per min is {}", allowedRatePerMinute);
        }
    

    大致看看,其实没什么东西,只是一些简单的赋值以及初始化操作。其中和我们后面关系比较深的几个参数分别是scheduler started

    之前我们在分析initScheduledTasks方法的时候,发现instanceInfoReplicator对象执行了一个start方法。nstanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());

    刚才我们看完了InstancInfoReplicator的构造方法之后,我们再来看一下这个start方法

    start方法

    我们先说他的这个参数,clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()。这个方法的注释是这样的

    	 /**
         * Indicates how long initially (in seconds) to replicate instance info
         * to the eureka server
         */
        int getInitialInstanceInfoReplicationIntervalSeconds();
    

    大致意思就是说这个代码注册到eureka server的时间,单位是秒。那个这个时间是多久呢?如果你没有单独设置的话,默认是40秒。下面代码中的意思是,如果有就获取,如果没有的话,就返回40

        @Override
        public int getInitialInstanceInfoReplicationIntervalSeconds() {
            return configInstance.getIntProperty(
                    namespace + INITIAL_REGISTRATION_REPLICATION_DELAY_KEY, 40).get();
        }
    

    说完参数之后,我们接着看一下start的代码吧。

        public void start(int initialDelayMs) {
            // 第一次进来的时候一定是false,因为在初始化这个类的时候,手动设置成了false。this.started = new AtomicBoolean(false);
            if (started.compareAndSet(false, true)) {
                instanceInfo.setIsDirty();  // for initial register
                // 40s之后执行线程  initialDelayMs 默认是 40s
                Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
                scheduledPeriodicRef.set(next);
            }
        }
    

    我们看到了两个熟悉的参数scheduler started,我们可以看到从代码顺序上来说

    1. 他先判断了started是否是false,如果是修改为true。
    2. 然后执行了一个setIsDirty方法,这个方法我们稍后再看
    3. 然后将自己放到了一个线程池了,这个线程池间隔40秒后执行
    4. 然后将future放到了private final AtomicReference scheduledPeriodicRef 里面存储

    我当时看到这里的时候真的是一头雾水,脑海中冒出好几个问题来。

    1. setIsDirty方法干啥用的呀?
    2. 为啥this能放到线程池里面运行啊?
    3. 这个线程池执行的之后,都干了什么啊?

    我们带着问题去看继续看一下代码

    setIsDirty方法

        public synchronized void setIsDirty() {
            isInstanceInfoDirty = true;
            lastDirtyTimestamp = System.currentTimeMillis();
        }
    

    这个方法里只有两个赋值操作,搞不明白他想干什么。我们带着这个问题继续往下走

    run方法

    刚才说到了为什么this可以放到线程池里面,我们将鼠标滑动到方法开头的位置

    class InstanceInfoReplicator implements Runnable
    

    这个类是继承自Runnable接口的,那么他必然会实现run方法,我们将它放在线程池里面,最终执行的也是这个类实现的run方法而已。那我们接着去看run方法吧

        @Override
        public void run() {
            try {
                // 刷新服务实例的信息
                discoveryClient.refreshInstanceInfo();
    			
                Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
                // 这个参数在第一次执行的时候,是绝对不可能为null的,因为在创建这个线程的时候,刚刚为其赋值
                if (dirtyTimestamp != null) {
                    // 注册
                    discoveryClient.register();
                    instanceInfo.unsetIsDirty(dirtyTimestamp);
                }
            } catch (Throwable t) {
                logger.warn("There was a problem with the instance info replicator", t);
            } finally {
                Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
                scheduledPeriodicRef.set(next);
            }
        }
    

    我的天,终于看到了register这个单词。太不容易了。看到Long dirtyTimestamp = instanceInfo.isDirtyWithTime();这个代码了吗?后面对其是否为null进行了判断,按照刚才我们的执行顺序是不可能为false的,因为我们再isSetDirty方法中刚刚 为其进行了赋值操作。然后我们继续来看register方法。

    register方法

     /**
         * Register with the eureka service by making the appropriate REST call.
         */
        boolean register() throws Throwable {
            logger.info(PREFIX + appPathIdentifier + ": registering service...");
            EurekaHttpResponse<Void> httpResponse;
            try {
                httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
            } catch (Exception e) {
                logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
                throw e;
            }
            if (logger.isInfoEnabled()) {
                logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
            }
            return httpResponse.getStatusCode() == 204;
        }
    

    通过代码,我们很明确的看到请求是通过eurekaTransport.registrationClient.register方法发出去的,大胆猜测一下发出去的内容应该就是服务实例。

    我们跟进去看一下

        // 真正执行注册的方法
        @Override
        public EurekaHttpResponse<Void> register(InstanceInfo info) {
            String urlPath = "apps/" + info.getAppName();
            Response response = null;
            try {
                // http://localhost:8080/v2/apps/ServiceA
                // 发送的是post请求,将服务实例的对象做成了json发送过去 包含了自己的主机、ip、端口号等等
                Builder resourceBuilder = jerseyClient.target(serviceUrl).path(urlPath).request();
                addExtraProperties(resourceBuilder);
                addExtraHeaders(resourceBuilder);
                response = resourceBuilder
                        .accept(MediaType.APPLICATION_JSON)
                        .acceptEncoding("gzip")
                        .post(Entity.json(info));
                return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
            } finally {
                if (logger.isDebugEnabled()) {
                    logger.debug("Jersey2 HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                            response == null ? "N/A" : response.getStatus());
                }
                if (response != null) {
                    response.close();
                }
            }
        }
    

    至此位置终于搞定了这个这个问题,client就是执行的这个方法将其服务实例转换为json,通过post请求的方式,将自己注册到注册中心去的。相信仔细看到这里的朋友还有一个问题eurekaTransport.registrationClient这个类是什么时候初始化的呢?

    其实是在初始化网络组件EurekaTransport的时候。

    代码如下

    // 初始化网络通信组件EurekaTransport
    eurekaTransport = new EurekaTransport();
    // 在这个方法中对eurekaTransport的成员变量registrationClient进行了赋值操作
    scheduleServerEndpointTask(eurekaTransport, args);
    

    初始化registrationClient的代码

    if (clientConfig.shouldRegisterWithEureka()) {
        EurekaHttpClientFactory newRegistrationClientFactory = null;
        EurekaHttpClient newRegistrationClient = null;
        try {
            newRegistrationClientFactory = EurekaHttpClients.registrationClientFactory(
                eurekaTransport.bootstrapResolver,
                eurekaTransport.transportClientFactory,
                transportConfig
            );
            newRegistrationClient = newRegistrationClientFactory.newClient();
        } catch (Exception e) {
            logger.warn("Transport initialization failure", e);
        }
        eurekaTransport.registrationClientFactory = newRegistrationClientFactory;
        eurekaTransport.registrationClient = newRegistrationClient;
    }
    
  • 相关阅读:
    获取Finacial dimension value的description 值
    创建一个List获取数据的lookup
    定位form光标行
    Business Unit Lookup in Form
    Linu各种版本
    redis的具体使用
    php中date()函数使用的方法
    Spring整合Hibernate中自动建表
    Android之手机电池电量应用
    SSH整合时,关于访问数据库的load的错误
  • 原文地址:https://www.cnblogs.com/joimages/p/13258939.html
Copyright © 2011-2022 走看看