zoukankan      html  css  js  c++  java
  • ribbon源码

    一. ribbon的使用

    通过前面对微服务ribbon的学习, 知道了ribbon的基本使用方式。

    比如:我的order服务,想要调用stock减库存的操作。 应该怎么实现呢? 

    第一步:引入ribbon

    @LoadBalanced
    @Bean
    public RestTemplate getRestTemplate() {
        return new RestTemplate();
    }

    这里通过@LoadBalance注解, 引入了ribbon, 自动实现ribbon的负载均衡策略

    第二步:写接口

     如上图, 通过restTemplate手动指定stock服务,并调用其接口. 我们看到在域名部分,我们写的是服务名. 其实调用了ribbon的负载均衡策略以后, 我们大概可以知道, 它是将

    http://stock 变成了 http://ip:port/的形式. 这其实就是ribbon的原理. 那么他是如何来选择ip和port的呢? 这就是具体的实现. 是使用轮训的方式找到ip, 还是使用随机的方式找到ip

    学习源码的方法一
    1. 以ribbon为例, 先学会使用ribbon, 知道ribbon具体有哪些功能, 知道其效果
    2. 猜测ribbon是如何实现的, 也就是说, 如果是我们自己来实现ribbon的负载均衡功能, 我们要怎么做?
    3. 看源码, 对比自己的思想和源码的异同. 
    
    这是一种有自己思考的学习方式. 对于学习源码来说也会觉得更有趣

    其实,如果想在项目中使用ribbon, 这两步基础就ok了, 那么, 他到底是在底层如何运转的呢? 来看看ribbon的实现.

    二. ribbon源码入口

    1. 就从@LoadBalanced这个注解入手

    点击进入到@LoadBalanced的源码

    这个源码就是定义了一个注解,没有特殊的含义. 这是一个接口, 那么注解是在哪里被实现的呢?那么,就需要查源码调用了

    2. 查找loadBalanced的实现类

    怎么找实现类呢?入口在哪里?

    方法一: 入口通常在META-INF/spring.factories文件里.里面找到引入了LoadBalanced 类的初始化类. 

    通过观察, 发现和LoadBalanced有关的自动配置类有两个, 二第一个关联性更大, 因为名字基本一样. 所以, 先定位到第一个

    方法二: 纯经验猜测.  

    首先找到LoadBalanced注解所在的包, 然后看看里面有没有和LoadBalanced有关系的AutoConfiguration配置了, 这就是靠猜了

    然后, 我们可以很容易的就看到LoadBalancerAutoConfiguration类. 这个是凭经验找到了, 接下来, 验证一下是不是这个类.

    三. LoadBalancerAutoConfiguration自动配置类

    看一看,它里面都注入了哪些东西呢?

     首先, 初始化了一个LoadBalancerInterceptor拦截器, 

    @Bean
    public LoadBalancerInterceptor ribbonInterceptor(LoadBalancerClient loadBalancerClient, LoadBalancerRequestFactory requestFactory) {
         return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
    }

    这是一个拦截器, 也就是负载均衡具体实现的拦截器, 他是通过哪个拦截器实现的呢? 这里面是使用LoadBalancerInterceptor实现的. 其实, 这里面是真正实现负载均衡功能的地方

    先简单看一下, 下面来看看另一个初始化方法

    第二. 初始化RestTemplateCustomeizer方法

    @Bean
    @ConditionalOnMissingBean
    public RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {
         return (restTemplate) -> {
             List<ClientHttpRequestInterceptor> list = new ArrayList(restTemplate.getInterceptors());
             list.add(loadBalancerInterceptor);
             restTemplate.setInterceptors(list);
         };
    }

    首先看这个方法的入参, 方法的入参是谁呢? 就是第一步初始化的LoadBalancerInterceptor. 在@Bean注解中, 入参前面省略了@AutoWired, 也就是说, 相当于自动引入了LoadBalancerInterceptor类.

    然后再来看返回值, 返回值是一个RestTemplateCustomizer, 这是一个接口, 里面就定义了一个方法

    public interface RestTemplateCustomizer {
        void customize(RestTemplate restTemplate);
    }

    而下面这段代码返回的一定是一个RestTemplateCustomeizer, 那么return的内容就是customize(RestTemplate restTemplate)的具体实现了

    return (restTemplate) -> {
        List<ClientHttpRequestInterceptor> list = new ArrayList(restTemplate.getInterceptors());
        list.add(loadBalancerInterceptor);
        restTemplate.setInterceptors(list);
    };

    public RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {....}具体实现是什么呢?

    就是将我们上面定义的拦截器添加到restTemplate中. restTemplate哪里来的呢? 我们可以看到最上面定义了这个

    @LoadBalanced
    @Autowired(
         required = false
        )
    private List<RestTemplate> restTemplates = Collections.emptyList();

    他的含义是: 为每一个加了@LoadBalanced注解的RestTemplate, 都将其添加到restTemplates集合中. 然后对这个集合进行处理

    第三步: 初始化SmartInitializingSingleton类

      @Bean
        public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
            return () -> {
                restTemplateCustomizers.ifAvailable((customizers) -> {
                    Iterator var2 = this.restTemplates.iterator();
    
                    while(var2.hasNext()) {
                        RestTemplate restTemplate = (RestTemplate)var2.next();
                        Iterator var4 = customizers.iterator();
    
                        while(var4.hasNext()) {
                            RestTemplateCustomizer customizer = (RestTemplateCustomizer)var4.next();
                            customizer.customize(restTemplate);
                        }
                    }
    
                });
            };
        }

    这个类的入参是final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers, 也就是RestTemplate的定制器. 是一个list列表集合

    具体实现是: 循环遍历this.restTemplates.iterator(), 也就是初始化时带有@LoadBalanced的RestTemplate, 然后执行里面的定制内容customize. 

     

     大概知道了自动配置里面引入了哪些类, 其中拦截器的实现是具体ribbon逻辑实现部分, 所以, 下面我们来看LoadBalancerInterceptor

    四. LoadBalancerInterceptor拦截器实现类

    小贴士: 看源码技巧
    
    这里有一个规则, 如果看过滤器, 那么其他方法都不重要, 主要看filter的具体实现. 看拦截器, 其他都不重要, 主要看intercept方法的实现

    直接看intercept方法的实现

    package org.springframework.cloud.client.loadbalancer;
    
    public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
        ......
    public ClientHttpResponse intercept(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) throws IOException {
            URI originalUri = request.getURI();
            String serviceName = originalUri.getHost();
            Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
            return (ClientHttpResponse)this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
        }
    }

    具体实现: 第一步: 获取请求的uri, 我们这里的是http://stock/stock/reduct/count/1/2

                  第二步: 获取服务的host域名, 这里对应的就是stock.

          第三步: this.loadBalancer.execute(serviceName, ......). 这里传入了服务名, 我们猜测一下实现. 首先会根据服务名获取集群, 然后在根据负载均衡策略找到ip, 然后调用http请求, 发送到指定ip

     前两步不说了, 不那么重要, 来看第三步. 具体实现

      public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint) throws IOException {
            ILoadBalancer loadBalancer = this.getLoadBalancer(serviceId);
            Server server = this.getServer(loadBalancer, hint);
            if (server == null) {
                throw new IllegalStateException("No instances available for " + serviceId);
            } else {
                RibbonLoadBalancerClient.RibbonServer ribbonServer = new RibbonLoadBalancerClient.RibbonServer(serviceId, server, this.isSecure(server, serviceId), this.serverIntrospector(serviceId).getMetadata(server));
                return this.execute(serviceId, (ServiceInstance)ribbonServer, (LoadBalancerRequest)request);
            }
        }

    这里代码很短, 简单看一下

    • 第一步: 获取负载均衡器,
      • ILoadBalancer loadBalancer = this.getLoadBalancer(serviceId); 
      • 负载均衡器猜一下干嘛的, 一会我们选出了3个或者5个节点, 要使用负载均衡器来看看, 到底选择哪一个节点发送请求
    • Server server = this.getServer(loadBalancer, hint);
      • 将LoadBalancer作为参数传入, 最终过滤选择出一个server, 将请求发送到这个server上.

     我们来看第一步: 获取负载均衡器 ILoadBalancer loadBalancer = this.getLoadBalancer(serviceId); 

     在这一步走到getInstance(name, ILoadBalancer.class);根据名字获取一个负载均衡器. 返回的是一个ILoadBalancer, 我们知道这是一个接口, 那么他到底是什么类型的负载均衡器呢?

    我们不知道, 那么能知道的是他一定是在某个地方初始化的时候, 指定了使用哪一个实现类. 而通常这用定义实现类的地方在Configuration的@Bean中. 

    根据这个思路, 我们在RibbonLoadBalancerClient包下找一找有没有类似的Configuration. 根据名字猜测, 最终找到了

    RibbonClientConfiguration

    而这里面刚好有ILoadBalancer的实现

        @Bean
        @ConditionalOnMissingBean
        public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
                ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
                IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
            if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
                return this.propertiesFactory.get(ILoadBalancer.class, config, name);
            }
            return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
                    serverListFilter, serverListUpdater);
        }

    棕色部分的代码是去application.yml属性值取, 如果有特别指定使用哪个ILoadBalancer类, 那么优先使用配置中的, 如果配置没有, 则使用new ZoneAwareLoadBalancer<>(...)类.

    好了, 我们找到这里就可以了, 先不看ZoneAwareLoadBalancer的具体实现.

     接下来看这一步: Server server = this.getServer(loadBalancer, hint); 获取服务

    package org.springframework.cloud.netflix.ribbon;
    
    public class RibbonLoadBalancerClient implements LoadBalancerClient {
            ......
    
        protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
            if (loadBalancer == null) {
                return null;
            }
            // Use 'default' on a null hint, or just pass it on?
            return loadBalancer.chooseServer(hint != null ? hint : "default");
        }
    }

    走到这段源码, 我们就知道了, 要低啊用LoadBalancer的chooseServer方法. 而在上一步,我们知道这个LoadBalancer的实现类是ZoneAwareLoadBalancer, 所以, 我们可以直接到ZoneAwareLoadBalancer里面找到chooseServer方法了.

     

     这里指向的是亚马逊的区域, 在中国只有一个区, 所以, 这里的数量始终是1, 所以, 最后走的是else分支

    调用return super.chooseServer(key);方法

     在super的chooseServer(key)方法里, 其他都不重要, 重要的是this.rule.choose(key)方法

    这里的rule是一个接口protected IRule rule; 联想一下, 现在有负载均衡器了, 那么还要有负载均衡规则, 而rule正是具体实现的负载均衡规则.

    根据经验我们知道, 这个IRule接口一定是在某个地方通过@Bean被初始化了, 

    又是根据经验, 我们猜到这个初始化的位置, 应该和刚才ILoadBalancer在同一个地方, 因为他们是同一个功能的代码, 那么如果是我写的话, 我会把同一个功能的代码的初始化放在一个地方.

    所以, 来看看初始化ILoadBalancer的类RibbonClientConfiguration有没有, 搜索一下IRule ,果然找到了

        @Bean
        @ConditionalOnMissingBean
        public IRule ribbonRule(IClientConfig config) {
            if (this.propertiesFactory.isSet(IRule.class, name)) {
                return this.propertiesFactory.get(IRule.class, config, name);
            }
            ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
            rule.initWithNiwsConfig(config);
            return rule;
        }

    棕色部分的代码是先从application.yml配置文件中读取用户指定的IRule实现类. 如果没有, 就是用默认的ZoneAvoidanceRule实现类.

    Ribbon的三大组件
    1. Rule
    2. Ping
    3. LoadBalancer
    
    这是ribbon三个最重要的组件, 他们三个都是在RibbonClientConfiguration被初始化的. 
       @Bean
        @ConditionalOnMissingBean
        public IRule ribbonRule(IClientConfig config) {
            if (this.propertiesFactory.isSet(IRule.class, name)) {
                return this.propertiesFactory.get(IRule.class, config, name);
            }
            ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
            rule.initWithNiwsConfig(config);
            return rule;
        }
    
        @Bean
        @ConditionalOnMissingBean
        public IPing ribbonPing(IClientConfig config) {
            if (this.propertiesFactory.isSet(IPing.class, name)) {
                return this.propertiesFactory.get(IPing.class, config, name);
            }
            return new DummyPing();
        }
    
       @Bean
        @ConditionalOnMissingBean
        public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
                ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
                IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
            if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
                return this.propertiesFactory.get(ILoadBalancer.class, config, name);
            }
            return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
                    serverListFilter, serverListUpdater);
        }

     下面看看ZoneAvoidanceRule的实现, 为什么看看实现呢, 要了解他的父类继承关系, 因为很可能在调用的时候, 不是调用的它本身, 而是调用的父类

     

     

     我们记住zoneAvoidanceRule的一个父类是PredicateBaseRule.

    下面我们在回到this.rule.choose(key)这个方法上来, 这回我们就知道这里调用的choose()是哪一个类下面的了, 他是PredicateBaseRule下面的.

      @Override
        public Server choose(Object key) {
            ILoadBalancer lb = getLoadBalancer();
            Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
            if (server.isPresent()) {
                return server.get();
            } else {
                return null;
            }       
        }

    很显然这里采用的是轮训策略选择服务器. 具体的轮训策略是如何选择的呢? 具体来看看chooseRoundRobinAfterFiltering()方法

    /**
         * Choose a server in a round robin fashion after the predicate filters a given list of servers and load balancer key. 
         */
        public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
         // 获取全部可用的服务 List
    <Server> eligible = getEligibleServers(servers, loadBalancerKey); if (eligible.size() == 0) { return Optional.absent(); }
         // 采用的服务策略.
    return Optional.of(eligible.get(incrementAndGetModulo(eligible.size()))); }
      private int incrementAndGetModulo(int modulo) {
            for (;;) {
                int current = nextIndex.get();
                int next = (current + 1) % modulo;
                if (nextIndex.compareAndSet(current, next) && current < modulo)
                    return current;
            }
        }

    有效的服务数+1取模, 为了防止并发, 这里使用了CAS的思想, 比较并赋值. 如果赋值失败, 会再次进入for循环, 知道成功为止

     在选择轮训策略的时候, chooseRoundRobinAfterFiltering(lb.getAllServers(), key); 我们传进来了一个参数 ,lb.getAllServers(), 获取负载均衡中的所有服务.这都有哪些服务呢?往前推理, 应该是有某个地方传入了这个参数, 或者通过某个参数计算得到了服务列表. ---> 这时我们也不知道在哪里, 那就看看构造方法吧, 看谁的构造方法呢, 负载均衡器的构造方法

    前面已经知道负载均衡器使用的是ZoneAwareLoadBalancer, 调用了父类的构造方法

    public ZoneAwareLoadBalancer(IClientConfig clientConfig, IRule rule,
                                     IPing ping, ServerList<T> serverList, ServerListFilter<T> filter,
                                     ServerListUpdater serverListUpdater) {
            super(clientConfig, rule, ping, serverList, filter, serverListUpdater);
        }

    下面来看父类的构造方法

    public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
                                             ServerList<T> serverList, ServerListFilter<T> filter,
                                             ServerListUpdater serverListUpdater) {
            super(clientConfig, rule, ping);
            this.serverListImpl = serverList;
            this.filter = filter;
            this.serverListUpdater = serverListUpdater;
            if (filter instanceof AbstractServerListFilter) {
                ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
            }
            restOfInit(clientConfig);
        }

    一般情况下, 看到init方法和start方法, 都要进去看看, 这里通常都是重点, 里面有内容

         void restOfInit(IClientConfig clientConfig) {
            boolean primeConnection = this.isEnablePrimingConnections();
            // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
            this.setEnablePrimingConnections(false);
            enableAndInitLearnNewServersFeature();
    
            updateListOfServers();
            if (primeConnection && this.getPrimeConnections() != null) {
                this.getPrimeConnections()
                        .primeConnections(getReachableServers());
            }
            this.setEnablePrimingConnections(primeConnection);
            LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
        }

    启动, 初始化服务, 而且还要不断你的学习. 这里的学习是什么意思呢? 其实就是不停的问问nacos, 服务列表有更新么?有更新,我就去拉取过来, 更新本地的服务. 

    具体的enableAndInitLearnNewServersFeature()方法是什么呢

      public void enableAndInitLearnNewServersFeature() {
            LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
            serverListUpdater.start(updateAction);
        }

    updateAction是要更新的动作, 也就是最终更新操作是在updateAction中执行的, 来看看为什么这么说呢

      protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
            @Override
            public void doUpdate() {
                updateListOfServers();
            }
        };

    这里updateAction这个变量是UpdateAction的一个实例对象, 这个对象有一个方法doUpdate(), 而doUpdate()方法中调用了updateListOfServers()方法.

      @VisibleForTesting
        public void updateListOfServers() {
            List<T> servers = new ArrayList<T>();
            if (serverListImpl != null) {
                servers = serverListImpl.getUpdatedListOfServers();
                LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                        getIdentifier(), servers);
    
                if (filter != null) {
                    servers = filter.getFilteredListOfServers(servers);
                    LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                            getIdentifier(), servers);
                }
            }
            updateAllServerList(servers);
        }

    更新服务. 这是具体的更新服务的方法

    这个更新操作是什么时候被执行的呢?serverListUpdater.start(updateAction); 在这里, 一看见start, 知道是个类似于定时任务的方法, 很重要. 进去看看

    @Override
        public synchronized void start(final UpdateAction updateAction) {
            if (isActive.compareAndSet(false, true)) {
                final Runnable wrapperRunnable = new Runnable() {
                    @Override
                    public void run() {
                        if (!isActive.get()) {
                            if (scheduledFuture != null) {
                                scheduledFuture.cancel(true);
                            }
                            return;
                        }
                        try {
                            updateAction.doUpdate();
                            lastUpdated = System.currentTimeMillis();
                        } catch (Exception e) {
                            logger.warn("Failed one update cycle", e);
                        }
                    }
                };
    
                scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                        wrapperRunnable,
                        initialDelayMs,
                        refreshIntervalMs,
                        TimeUnit.MILLISECONDS
                );
            } else {
                logger.info("Already active, no-op");
            }
        }

     updateAction.doUpdate();就是上面执行的方法 , 这个方法的返回值是一个Runnable线程, 然后继续往下看, 将runnable线程传给了一个定时任务. 定时执行更新操作.

     

     上面的操作其实就是上图, ribbon和nacos同步服务集群数据的过程. 我们知道, 在nacos中有一个注册表用来存储服务注册过来的信息, 项目启动后, 这些元数据信息会同步回传给ribbon, ribbon会在本地维护一个注册表. 但这个注册表可能随时变化, 所以, 需要定期去同步更新服务数据. 所以.

    1. 初始化的时候去nacos去服务注册表数据

    2. 定时任务同步获取注册表数据

    @VisibleForTesting
        public void updateListOfServers() {
            List<T> servers = new ArrayList<T>();
            if (serverListImpl != null) {
                servers = serverListImpl.getUpdatedListOfServers();
                LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                        getIdentifier(), servers);
    
                if (filter != null) {
                    servers = filter.getFilteredListOfServers(servers);
                    LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                            getIdentifier(), servers);
                }
            }
            updateAllServerList(servers);
        }

    更新服务的三个重要的方法

    1.  getUpdatedListOfServers : 查询nacos中的服务实例, 在查询之前更新, 然后返回最新的服务列表

    2.  getFilteredListOfServers: 过滤服务实例, 将满足条件的服务实例过滤, 不满足条件的去掉

    3. updateAllServerList: 更新服务实例

    as

  • 相关阅读:
    图论一角
    入门-k8s部署应用 (三)
    入门-k8s集群环境搭建(二)
    入门-Kubernetes概述 (一)
    shell中获取时间
    Linux shell脚本之 if条件判断 (转)
    linux中shell变量$#等的释义
    shell 的here document 用法 (cat << EOF) (转)
    Homebrew的安装与使用
    docker容器编排 (4)
  • 原文地址:https://www.cnblogs.com/ITPower/p/13424244.html
Copyright © 2011-2022 走看看