zoukankan      html  css  js  c++  java
  • Ribbon是怎么和Eureka整合的?

    Ribbon是怎么和Eureka整合的?

    因为访问请求的时候,拿到的是LoadBalancer实例,通过它去访问被请求的服务,那它里面肯定会有服务请求,这样才能负载均衡。

    ZoneAwareLoadBalancer被创建的时候,就是通过下面的有参构造器,它会调用父类的构造,就继续往下看。

        public ZoneAwareLoadBalancer(IClientConfig clientConfig, IRule rule,
                                     IPing ping, ServerList<T> serverList, ServerListFilter<T> filter,
                                     ServerListUpdater serverListUpdater) {
            super(clientConfig, rule, ping, serverList, filter, serverListUpdater);
        }
    
    
        public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
                                             ServerList<T> serverList, ServerListFilter<T> filter,
                                             ServerListUpdater serverListUpdater) {
            super(clientConfig, rule, ping);
            this.serverListImpl = serverList;
            this.filter = filter;
            this.serverListUpdater = serverListUpdater;
            if (filter instanceof AbstractServerListFilter) {
                ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
            }
            restOfInit(clientConfig);
        }
    

    里面只有一个restOfInit方法

        void restOfInit(IClientConfig clientConfig) {
            boolean primeConnection = this.isEnablePrimingConnections();
            // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
            this.setEnablePrimingConnections(false);
            enableAndInitLearnNewServersFeature();
    
            updateListOfServers();
            if (primeConnection && this.getPrimeConnections() != null) {
                this.getPrimeConnections()
                        .primeConnections(getReachableServers());
            }
            this.setEnablePrimingConnections(primeConnection);
            LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
        }
    

    enableAndInitLearnNewServersFeature();看这个方法名,像是开启并且初始化新的服务实例。

    updateListOfServers();看上去像是对服务的list进行更新。

    但是在ZoneAwareLoadBalancer的构造方法中,会传进ServerList serverList,也就是说一开始的时候就能拿到全量的服务清单,然后进行构造,后续有变化在进行更新操作,那看看传进来的serverList,到底是什么?

    image-20211008232015914

    一个有5个实现类,发现只有DomainExtractingServerList是有关系的,既然它是一个bean,那肯定也是在配置类中注入的,那就继续找找,结果就出现在了EurekaRibbonClientConfiguration中。

    	@Bean
    	@ConditionalOnMissingBean
    	public ServerList<?> ribbonServerList(IClientConfig config,
    			Provider<EurekaClient> eurekaClientProvider) {
    		if (this.propertiesFactory.isSet(ServerList.class, serviceId)) {
    			return this.propertiesFactory.get(ServerList.class, config, serviceId);
    		}
    		DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(
    				config, eurekaClientProvider);
    		DomainExtractingServerList serverList = new DomainExtractingServerList(
    				discoveryServerList, config, this.approximateZoneFromHostname);
    		return serverList;
    	}
    

    实际上都是通过DiscoveryEnabledNIWSServerList来获取到的,来更新的方法也是调用它的getUpdatedListOfServers,那下面就看一下增量是怎么获取的。

    那我们就先进去enableAndInitLearnNewServersFeature中看看,

        public void enableAndInitLearnNewServersFeature() {
            LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
            serverListUpdater.start(updateAction);
        }
    

    会来到PollingServerListUpdater#start,上面一段代码就是创建一个runnable,核心的方法就是做一个update操作,默认的是1秒钟过后,会第一次执行那个Runnable线程,以后是每隔30秒执行一下那个Runnable线程,就去从eureka client刷新注册表到自己的ribbon的LoadBalancer中来。

        @Override
        public synchronized void start(final UpdateAction updateAction) {
            if (isActive.compareAndSet(false, true)) {
                final Runnable wrapperRunnable = new Runnable() {
                    @Override
                    public void run() {
                        if (!isActive.get()) {
                            if (scheduledFuture != null) {
                                scheduledFuture.cancel(true);
                            }
                            return;
                        }
                        try {
                            updateAction.doUpdate();
                            lastUpdated = System.currentTimeMillis();
                        } catch (Exception e) {
                            logger.warn("Failed one update cycle", e);
                        }
                    }
                };
    
                scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                        wrapperRunnable,
                        initialDelayMs,
                        refreshIntervalMs,
                        TimeUnit.MILLISECONDS
                );
            } else {
                logger.info("Already active, no-op");
            }
        }
    

    然后再执行更新操作。

        @VisibleForTesting
        public void updateListOfServers() {
            List<T> servers = new ArrayList<T>();
            if (serverListImpl != null) {
                servers = serverListImpl.getUpdatedListOfServers();
                LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                        getIdentifier(), servers);
    
                if (filter != null) {
                    servers = filter.getFilteredListOfServers(servers);
                    LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                            getIdentifier(), servers);
                }
            }
            updateAllServerList(servers);
        }
    

    其中serverListImpl.getUpdatedListOfServers()最终会来到DiscoveryEnabledNIWSServerList#getUpdatedListOfServers,根据它来获得全部的服务实例清单

    image-20211018162449464

    image-20211018162533964

        @Override
        public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
            return obtainServersViaDiscovery();
        }
    
        private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
            List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();
    
            if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
                logger.warn("EurekaClient has not been initialized yet, returning an empty list");
                return new ArrayList<DiscoveryEnabledServer>();
            }
    
            EurekaClient eurekaClient = eurekaClientProvider.get();
            if (vipAddresses!=null){
                for (String vipAddress : vipAddresses.split(",")) {
                    // if targetRegion is null, it will be interpreted as the same region of client
                    List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
                    for (InstanceInfo ii : listOfInstanceInfo) {
                        if (ii.getStatus().equals(InstanceStatus.UP)) {
    
                            if(shouldUseOverridePort){
                                if(logger.isDebugEnabled()){
                                    logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
                                }
    
                                // copy is necessary since the InstanceInfo builder just uses the original reference,
                                // and we don't want to corrupt the global eureka copy of the object which may be
                                // used by other clients in our system
                                InstanceInfo copy = new InstanceInfo(ii);
    
                                if(isSecure){
                                    ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
                                }else{
                                    ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
                                }
                            }
    
                            DiscoveryEnabledServer des = createServer(ii, isSecure, shouldUseIpAddr);
                            serverList.add(des);
                        }
                    }
                    if (serverList.size()>0 && prioritizeVipAddressBasedServers){
                        break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
                    }
                }
            }
            return serverList;
        }
    

    只有第一次请求的时候会走到这,主要有一个更新线程的初始化启动过程。

    image-20211018161953641

  • 相关阅读:
    运维:生产日志重复打印了,赶紧来看看~
    就这样,我走过了程序员的前五年。一路风雨泥泞,前方阳光正好。
    一个排序引发的BUG
    曝光一个网站,我周末就耗在上面了。
    我不服!这开源项目居然才888个星!?
    知乎的一次29.7元的咨询
    面试官:啥是请求重放呀?
    414天前,我以为这是编程玄学...
    老爷子这代码,看跪了!
    面试官一个线程池问题把我问懵逼了。
  • 原文地址:https://www.cnblogs.com/dalianpai/p/15421105.html
Copyright © 2011-2022 走看看