zoukankan      html  css  js  c++  java
  • spring cloud ribbon


    负载策略

    RoundRobinRule,轮训策略,默认策略
    RandomRule,随机,使用Random对象从服务列表中随机选择一个服务
    RetryRule,轮询 + 重试
    WeightedResponseTimeRule:优先选择响应时间快,此策略会根据平均响应时间计算所有服务的权重,响应时间越快,服务权重越重、被选中的概率越高。此类有个DynamicServerWeightTask的定时任务,默认情况下每隔30秒会计算一次各个服务实例的权重。刚启动时,如果统计信息不足,则使用RoundRobinRule策略,等统计信息足够,会切换回来
    AvailabilityFilteringRule:可用性过滤,会先过滤掉以下服务:由于多次访问故障而断路器处于打开的服务、并发的连接数量超过阈值,然后对剩余的服务列表按照RoundRobinRule策略进行访问
    BestAvailableRule:优先选择并发请求最小的,刚启动时吗,如果统计信息不足,则使用RoundRobinRule策略,等统计信息足够,才会切换回来
    ZoneAvoidanceRule:可以实现避免可能访问失效的区域(zone)

    @LoadBalanced开启了负载功能

    @Bean
    @LoadBalanced
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }
    
    @Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Inherited
    @Qualifier
    public @interface LoadBalanced {
    }
    
    1. 在RestTemplate上面增加注解@LoadBalanced,就开启了负载功能。
    2. 该注解会把所有的的RestTemplate实例都加上@LoadBalanced,后续通过@Qualifier和@Autowire注解可以将所有的RestTemplate都注入获取到。
      然后挨个给这些RestTemplate添加拦截器,在拦截器中实现负载逻辑。

    RibbonAutoConfiguration自动配置类

    @Configuration
    @ConditionalOnClass({IClient.class, RestTemplate.class, AsyncRestTemplate.class, Ribbon.class})
    @RibbonClients
    @AutoConfigureAfter(
            name = {"org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration"}
    )
    //引入LoadBalancerAutoConfiguration类
    @AutoConfigureBefore({LoadBalancerAutoConfiguration.class, AsyncLoadBalancerAutoConfiguration.class})
    @EnableConfigurationProperties({RibbonEagerLoadProperties.class})
    public class RibbonAutoConfiguration {
        @Autowired(
                required = false
        )
        private List<RibbonClientSpecification> configurations = new ArrayList();
    
        //饥饿加载模式配置
        //正常模式下ribbon client都是到使用的时候才去加载
        //饥饿模式在spring容器初始化完毕就加载ribbon client
        @Autowired
        private RibbonEagerLoadProperties ribbonEagerLoadProperties;
    
        //子容器工厂,一个项目可以配置多个ribbon client,这些client分别加载自己的容器,互相独立
        @Bean
        public SpringClientFactory springClientFactory() {
            SpringClientFactory factory = new SpringClientFactory();
            factory.setConfigurations(this.configurations);
            return factory;
        }
    
        @Bean//引入RibbonLoadBalancerClient
        @ConditionalOnMissingBean({LoadBalancerClient.class})
        public LoadBalancerClient loadBalancerClient() {
            return new RibbonLoadBalancerClient(this.springClientFactory());
        }
    
    
        @Bean
        @ConditionalOnProperty(
                value = {"ribbon.eager-load.enabled"},//该属性开启饥饿模式
                matchIfMissing = false
        )
        //饥饿加载模式
        public RibbonApplicationContextInitializer ribbonApplicationContextInitializer() {
            return new RibbonApplicationContextInitializer(this.springClientFactory(), this.ribbonEagerLoadProperties.getClients());
        }
    
    }
    
    1. LoadBalancerAutoConfiguration:给所有RestTemplate添加拦截器
    2. SpringClientFactory子容器

    SpringClientFactory子容器

    在多ribbon客户端的情况下,SpringClientFactory会为每个客户端都加载自己的上下文,实现ribbon客户端的隔离性。

    举个例子:

    定义两个自定义ribbon客户端

    @Configurable
    public class RibbonCust1 {
        @Bean
        public IRule myRule1(){
            return new RandomRule();
        }
    }
    
    @Configurable
    public class RibbonCust2 {
        @Bean
        public IRule myRule2(){
            return new RetryRule();
        }
    }
    

    启动类开启ribbon负载,并且指明哪些服务用哪些ribbon

    @SpringBootApplication
    @RibbonClient(
            name = "demo-goods", configuration = MyRule2.class
    )
    public class Ads2Application {
        public static void main(String[] args) {
            SpringApplication.run(Ads2Application.class,args);
        }
    }
    

    这种情况不同的客户端的上下文环境是独立的,SpringClientFactory就是存储子上下文的。

    返回顶部

    LoadBalancerAutoConfiguration:给所有RestTemplate添加拦截器

     @Configuration
    @ConditionalOnClass({RestTemplate.class})
    @ConditionalOnBean({LoadBalancerClient.class})
    @EnableConfigurationProperties({LoadBalancerRetryProperties.class})
    public class LoadBalancerAutoConfiguration {
        //这里配合@LoadBalanced注解的@Qualifier注解,将所有包含@LoadBalanced注解的RestTemplate给注入进来
        @LoadBalanced
        @Autowired(
                required = false
        )
        private List<RestTemplate> restTemplates = Collections.emptyList();
        @Autowired(
                required = false
        )
        private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList();
    
        public LoadBalancerAutoConfiguration() {
        }
    
        /***
         * 此方法的逻辑
         * 获取到所有的RestTemplate,把这些RestTemplate加入到自定义定制器中
         * @param customizers
         * @return
         */
        @Bean
        public SmartInitializingSingleton loadBalancedRestTemplateInitializer(final List<RestTemplateCustomizer> customizers) {
            return new SmartInitializingSingleton() {
                public void afterSingletonsInstantiated() {
                    Iterator var1 = org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration.this.restTemplates.iterator();
    
                    while(var1.hasNext()) {
                        RestTemplate restTemplate = (RestTemplate)var1.next();
                        Iterator var3 = customizers.iterator();
    
                        while(var3.hasNext()) {
                            RestTemplateCustomizer customizer = (RestTemplateCustomizer)var3.next();
                            customizer.customize(restTemplate);
                        }
                    }
    
                }
            };
        }
    
        @Bean
        @ConditionalOnMissingBean
        public LoadBalancerRequestFactory loadBalancerRequestFactory(LoadBalancerClient loadBalancerClient) {
            return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers);
        }
    
    
        @Configuration
        @ConditionalOnMissingClass({"org.springframework.retry.support.RetryTemplate"})
        static class LoadBalancerInterceptorConfig {
            LoadBalancerInterceptorConfig() {
            }
    
            //初始化拦截器
            @Bean
            public LoadBalancerInterceptor ribbonInterceptor(LoadBalancerClient loadBalancerClient, LoadBalancerRequestFactory requestFactory) {
                return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
            }
    
            //初始化自定义定制器:该定制器会将放入到restTemplate都加上拦截器
            @Bean
            @ConditionalOnMissingBean
            public RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {
                return new RestTemplateCustomizer() {
                    public void customize(RestTemplate restTemplate) {
                        List<ClientHttpRequestInterceptor> list = new ArrayList(restTemplate.getInterceptors());
                        list.add(loadBalancerInterceptor);
                        restTemplate.setInterceptors(list);
                    }
                };
            }
        }
    }
    
    //LoadBalancerInterceptor
        private LoadBalancerClient loadBalancer;
    
        public ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution) throws IOException {
            URI originalUri = request.getURI();
            String serviceName = originalUri.getHost();
            //入口
            return (ClientHttpResponse)this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
        }
    
    //RibbonLoadBalancerClient
         public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
            //获取负载均衡器
            ILoadBalancer loadBalancer = this.getLoadBalancer(serviceId);
            //通过负载均衡器选主一个server
            Server server = this.getServer(loadBalancer);//入口1
            if (server == null) {
                throw new IllegalStateException("No instances available for " + serviceId);
            } else {
                RibbonLoadBalancerClient.RibbonServer ribbonServer = new RibbonLoadBalancerClient.RibbonServer(serviceId, server, this.isSecure(server, serviceId), this.serverIntrospector(serviceId).getMetadata(server));
                return this.execute(serviceId, ribbonServer, request);//入口2
            }
        }
    
        protected Server getServer(ILoadBalancer loadBalancer) {
            return loadBalancer == null ? null : loadBalancer.chooseServer("default");
        }
    
    1. 获取所有restTempalte,遍历挨个添加拦截器。
    2. 后续通过restTempalte发起远程调用的时候,都会被拦截器拦截
      • 通过loadBalancer选择一个server
      • 发起远程调用

    loadBalancer

    先看下loadBalancer什么样

    public class BaseLoadBalancer extends AbstractLoadBalancer implements
            PrimeConnections.PrimeConnectionListener, IClientConfigAware {
        
        
        private final static IRule DEFAULT_RULE = new RoundRobinRule();
        
        private final static SerialPingStrategy DEFAULT_PING_STRATEGY = new SerialPingStrategy();
        
        protected IRule rule = DEFAULT_RULE;
    
        protected IPingStrategy pingStrategy = DEFAULT_PING_STRATEGY;
    
        protected IPing ping = null;
    
        @Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
        protected volatile List<Server> allServerList = Collections.synchronizedList(new ArrayList<Server>());
        
        @Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)
        protected volatile List<Server> upServerList = Collections.synchronizedList(new ArrayList<Server>());
        
        private IClientConfig config; 
        
    }
    
    public BaseLoadBalancer() {
        this.name = DEFAULT_NAME;
        this.ping = null;
        setRule(DEFAULT_RULE);
        setupPingTask();//入口
        lbStats = new LoadBalancerStats(DEFAULT_NAME);
    }
    
    
    void setupPingTask() {
        if (canSkipPing()) {
            return;
        }
        if (lbTimer != null) {
            lbTimer.cancel();
        }
        lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
                                           true);
        lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);//入口
        forceQuickPing();
    }
    
    
    //    protected IPingStrategy pingStrategy = DEFAULT_PING_STRATEGY;
    
    class PingTask extends TimerTask {
        public void run() {
            try {
                new Pinger(pingStrategy).runPinger();
            } catch (Exception e) {
                logger.error("LoadBalancer [{}]: Error pinging", name, e);
            }
        }
    }
    
    //默认Ping 的方式
    
    private static class SerialPingStrategy implements IPingStrategy {
    
        @Override
        public boolean[] pingServers(IPing ping, Server[] servers) {
            int numCandidates = servers.length;
            boolean[] results = new boolean[numCandidates];
    
            logger.debug("LoadBalancer:  PingTask executing [{}] servers configured", numCandidates);
    
            for (int i = 0; i < numCandidates; i++) {
                results[i] = false; /* Default answer is DEAD. */
                try {
                    if (ping != null) {
                        results[i] = ping.isAlive(servers[i]);
                    }
                } catch (Exception e) {
                    logger.error("Exception while pinging Server: '{}'", servers[i], e);
                }
            }
            return results;
        }
    }
    
    
    
    class Pinger {
    
            private final IPingStrategy pingerStrategy;
    
            public Pinger(IPingStrategy pingerStrategy) {
                this.pingerStrategy = pingerStrategy;
            }
    
            public void runPinger() throws Exception {
                if (!pingInProgress.compareAndSet(false, true)) { 
                    return; // Ping in progress - nothing to do
                }
                
                // we are "in" - we get to Ping
    
                Server[] allServers = null;
                boolean[] results = null;
    
                Lock allLock = null;
                Lock upLock = null;
    
                try {
                    /*
                     * The readLock should be free unless an addServer operation is
                     * going on...
                     */
                    allLock = allServerLock.readLock();
                    allLock.lock();
                    allServers = allServerList.toArray(new Server[allServerList.size()]);
                    allLock.unlock();
    
                    int numCandidates = allServers.length;
                    results = pingerStrategy.pingServers(ping, allServers);
    
                    final List<Server> newUpList = new ArrayList<Server>();
                    final List<Server> changedServers = new ArrayList<Server>();
    
                    for (int i = 0; i < numCandidates; i++) {
                        boolean isAlive = results[i];
                        Server svr = allServers[i];
                        boolean oldIsAlive = svr.isAlive();
    
                        svr.setAlive(isAlive);
    					//如果当前检测状态和之前的状态不一致,稍后会用于发送状态变化通知
                        if (oldIsAlive != isAlive) {
                            changedServers.add(svr);
                            logger.debug("LoadBalancer [{}]:  Server [{}] status changed to {}", 
                        		name, svr.getId(), (isAlive ? "ALIVE" : "DEAD"));
                        }
    					//如果是存活状态,添加进list
                        if (isAlive) {
                            newUpList.add(svr);
                        }
                    }
                    upLock = upServerLock.writeLock();
                    upLock.lock();
                    upServerList = newUpList;
                    upLock.unlock();
    				//发送状态变化通知消息
                    notifyServerStatusChangeListener(changedServers);
                } finally {
                    pingInProgress.set(false);
                }
            }
        }
    
    1. 默认的负载均衡策略是轮询;
    2. 内部保存了两个list,一个用来保存所有server,一个用来保存存活的server
    3. 通过一个定时任务用来维护上面两个list,10S 执行一次
      • 拿到当前 allServerList 中 所有的节点去Ping,在allServerList 中标记其存活状态,用当前Ping的存活状态 和 上次的 准或状态比对,如果 状态不同 ,发出通知 这些节点状态有改变;如果状态为存活,那么将 upServerList 中的节点用本次检测出的所有存活节点替换。

    接下来看下loadBalancer选择server的方法

    public Server chooseServer(Object key) {
        if (counter == null) {
            counter = createCounter();
        }
        counter.increment();
        if (rule == null) {
            return null;
        } else {
            try {
                return rule.choose(key);//入口
            } catch (Exception e) {
                logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
                return null;
            }
        }
    }
    
    1. 这个方法即通过具体的负载策略来选择服务器

    分析完 BaseLoadBalancer ,DynamicServerListLoadBalancer 和 ZoneAwareLoadBalancer 基本大同小异:
    DynamicServerListLoadBalancer :使用动态源的服务器, 即服务器列表可能是在运行时更改。 通过一些Filter函数来动态的过滤掉指定的服务器列表;
    ZoneAwareLoadBalancer :这个负载均衡器适用于异地多机房的情况,在选择服务器的时候可以避免整个区域。

    返回顶部

  • 相关阅读:
    web前端攻城狮都来晒一晒你的收藏夹吧
    淘宝前端技术系列课程分享
    HTML5编程实战之二:用动画的形式切换图片
    HTML5编程实战之一:HTML5时钟
    【转】chrome developer tool 调试技巧
    Android 云端推送C2DM php实现向终端推送消息
    简单的泰国IP判断
    [翻译]延迟着色(Shawn Hargreaves)〔1〕
    [翻译]延迟着色(2)
    [3D基础]投影矩阵的推导(1)
  • 原文地址:https://www.cnblogs.com/yanhui007/p/12640442.html
Copyright © 2011-2022 走看看