zoukankan      html  css  js  c++  java
  • SpringCloud--Ribbon--源码解析--Ribbon入口实现

      Ribbon总体的源码结构,如下图所示:

       上图是Ribbon源码的总览图,每一个颜色,代表源码中的一部分内容,总体来看,Ribbon源码的实现,总共分为五个部分,从上而下依次为,Ribbon入口实现、IloadBalancer实现、ServerListUpdater实现、ServerListFilter实现和IRule实现。

      针对这些实现,粗略描述如下:

    实现内容 描述
    Ribbon入口实现 自动配置加载及请求拦截器加载
    IloadBalancer实现  负载均衡器
    ServerListUpdater实现  动态更新本地存储的实例清单
    ServerListFilter实现  过滤器
    IRule实现  负载均衡策略

      本篇主要说Ribbon入口实现

      在SpringCloud--Eureka--搭建中,对于使用Ribbon的项目,主要是在主函数上增加了如下代码

        @Bean
        @LoadBalanced
        RestTemplate restTemplate(){
            return new RestTemplate();
        }

      然后在后续的调用中,直接使用restTemplate进行调用,那么我们就可以先看一下@LoadBalanced注解,从@LoadBalanced注解源码可以看到,该注解用来给RestTemplate做标记,以使用负载均衡客户端LoadBalancerClient

      该客户端接口提供了三个方法:

    方法 描述 源码
    继承父类的choose方法 根据传入的服务名,从负载均衡器中挑选一个对应的服务实例 ServiceInstance choose(String serviceId);
    execute方法 使用从负载均衡器中选择的服务实例进行请求

    <T> T execute(String serviceId, LoadBalancerRequest<T> request)

    <T> T execute(String serviceId, ServiceInstance serviceInstance,LoadBalancerRequest<T> request)

    reconstructURI 为系统构建一个合适的host:port形式的URI,其中请求的ServiceInstance一般是 服务名:port,返回的URI则是ServiceInstance服务实例详情拼接的具体host:port形式的请求地址 URI reconstructURI(ServiceInstance instance, URI original);

      根据LoadBalancerClient查看,相关类的关系图如下所示:

       在上图中,通过类的命名可以看出LoadBalancerAutoConfiguration为客户端负载均衡器的自动化配置类

    @Configuration(proxyBeanMethods = false)
    @ConditionalOnClass(RestTemplate.class)
    @ConditionalOnBean(LoadBalancerClient.class)
    @EnableConfigurationProperties(LoadBalancerRetryProperties.class)
    public class LoadBalancerAutoConfiguration {
    
        @LoadBalanced
        @Autowired(required = false)
        private List<RestTemplate> restTemplates = Collections.emptyList();
    
        @Autowired(required = false)
        private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList();
    
        @Bean
        public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
                final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
            return () -> restTemplateCustomizers.ifAvailable(customizers -> {
                for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
                    for (RestTemplateCustomizer customizer : customizers) {
                        customizer.customize(restTemplate);
                    }
                }
            });
        }
    
        @Bean
        @ConditionalOnMissingBean
        public LoadBalancerRequestFactory loadBalancerRequestFactory(
                LoadBalancerClient loadBalancerClient) {
            return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers);
        }
    
        @Configuration(proxyBeanMethods = false)
        @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
        static class LoadBalancerInterceptorConfig {
    
            @Bean
            public LoadBalancerInterceptor ribbonInterceptor(
                    LoadBalancerClient loadBalancerClient,
                    LoadBalancerRequestFactory requestFactory) {
                return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
            }
    
            @Bean
            @ConditionalOnMissingBean
            public RestTemplateCustomizer restTemplateCustomizer(
                    final LoadBalancerInterceptor loadBalancerInterceptor) {
                return restTemplate -> {
                    List<ClientHttpRequestInterceptor> list = new ArrayList<>(
                            restTemplate.getInterceptors());
                    list.add(loadBalancerInterceptor);
                    restTemplate.setInterceptors(list);
                };
            }
    
        }
    
        /**
         * Auto configuration for retry mechanism.
         */
        @Configuration(proxyBeanMethods = false)
        @ConditionalOnClass(RetryTemplate.class)
        public static class RetryAutoConfiguration {
    
            @Bean
            @ConditionalOnMissingBean
            public LoadBalancedRetryFactory loadBalancedRetryFactory() {
                return new LoadBalancedRetryFactory() {
                };
            }
    
        }
    
        /**
         * Auto configuration for retry intercepting mechanism.
         */
        @Configuration(proxyBeanMethods = false)
        @ConditionalOnClass(RetryTemplate.class)
        public static class RetryInterceptorAutoConfiguration {
    
            @Bean
            @ConditionalOnMissingBean
            public RetryLoadBalancerInterceptor ribbonInterceptor(
                    LoadBalancerClient loadBalancerClient,
                    LoadBalancerRetryProperties properties,
                    LoadBalancerRequestFactory requestFactory,
                    LoadBalancedRetryFactory loadBalancedRetryFactory) {
                return new RetryLoadBalancerInterceptor(loadBalancerClient, properties,
                        requestFactory, loadBalancedRetryFactory);
            }
    
            @Bean
            @ConditionalOnMissingBean
            public RestTemplateCustomizer restTemplateCustomizer(
                    final RetryLoadBalancerInterceptor loadBalancerInterceptor) {
                return restTemplate -> {
                    List<ClientHttpRequestInterceptor> list = new ArrayList<>(
                            restTemplate.getInterceptors());
                    list.add(loadBalancerInterceptor);
                    restTemplate.setInterceptors(list);
                };
            }
    
        }
    
    }

    从源码类的注解上可以知道Ribbon负载均衡必须满足如下两个条件:

      1、@ConditionalOnClass(RestTemplate.class):RestTemplate类必须存在该项目中

      2、@ConditionalOnBean(LoadBalancerClient.class):必须要有LoadBalancerClient的先险类Bean

    同时,该自动化配置类主要做了一下内容:

      1、创建了一个LoadBalancerInterceptor的Bean,用以实现对客户端请求的拦截,以实现客户端负载均衡

      2、创建了一个RestTemplateCustomizer的Bean,用以给restTemplate增加LoadBalancerInterceptor拦截器

      3、维护了一个@LoadBalanced注解修饰的RestTemplate集合,并且进行初始化,通过调用RestTemplateCustomizer的实例来给客户端的RestTemplate添加LoadBalancerInterceptor拦截器

    那么接下来看下LoadBalancerInterceptor是如何将RestTemplate进行负载均衡的

    public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
    
        private LoadBalancerClient loadBalancer;
    
        private LoadBalancerRequestFactory requestFactory;
    
        public LoadBalancerInterceptor(LoadBalancerClient loadBalancer,
                LoadBalancerRequestFactory requestFactory) {
            this.loadBalancer = loadBalancer;
            this.requestFactory = requestFactory;
        }
    
        public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
            // for backwards compatibility
            this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
        }
    
        @Override
        public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
                final ClientHttpRequestExecution execution) throws IOException {
            final URI originalUri = request.getURI();
            String serviceName = originalUri.getHost();
            Assert.state(serviceName != null,
                    "Request URI does not contain a valid hostname: " + originalUri);
            return this.loadBalancer.execute(serviceName,
                    this.requestFactory.createRequest(request, body, execution));
        }
    
    }

      通过源码可以看到,LoadBalancerInterceptor的构造函数传入了LoadBalancerClient,因此当一个被@LoadBalanced注解修饰的RestTemplate对外发起http请求时,就会被LoadBalancerInterceptor拦截,并调用LoadBalancerClient中的execute方法。

      RibbonLoadBalancerClient是LoadBalancerClient的实现类,其中的execute方法如下所示:

        public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
                throws IOException {
            ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
            Server server = getServer(loadBalancer, hint);
            if (server == null) {
                throw new IllegalStateException("No instances available for " + serviceId);
            }
            RibbonServer ribbonServer = new RibbonServer(serviceId, server,
                    isSecure(server, serviceId),
                    serverIntrospector(serviceId).getMetadata(server));
    
            return execute(serviceId, ribbonServer, request);
        }

      由以上代码可见,先是使用ILoadBalancer获取了一个Server对象,然后将Server对象封装成RibbonServer对象(该对象除了存储服务实例的信息之外,还增加了服务名ServiceId、是否需要使用HTTPS等其他信息),最后使用该对象在回调LoadBalancerInterceptor请求拦截器中LoadBalancerRequest的apply方法,从而向一个具体的实例发起请求,从而实现一开始以服务名为host的URI请求到host:port形式实际访问地址的转换。

        public <T> T execute(String serviceId, ServiceInstance serviceInstance,
                LoadBalancerRequest<T> request) throws IOException {
            Server server = null;
            if (serviceInstance instanceof RibbonServer) {
                server = ((RibbonServer) serviceInstance).getServer();
            }
            if (server == null) {
                throw new IllegalStateException("No instances available for " + serviceId);
            }
    
            RibbonLoadBalancerContext context = this.clientFactory
                    .getLoadBalancerContext(serviceId);
            RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);
    
            try {
                T returnVal = request.apply(serviceInstance);
                statsRecorder.recordStats(returnVal);
                return returnVal;
            }
            // catch IOException and rethrow so RestTemplate behaves correctly
            catch (IOException ex) {
                statsRecorder.recordStats(ex);
                throw ex;
            }
            catch (Exception ex) {
                statsRecorder.recordStats(ex);
                ReflectionUtils.rethrowRuntimeException(ex);
            }
            return null;
        }

      由以上代码可见,最终会回调拦截器org.springframework.cloud.client.loadbalancer.AsyncLoadBalancerInterceptor#intercept,在拦截器中,调用了org.springframework.http.client.InterceptingAsyncClientHttpRequest.AsyncRequestExecution#executeAsync方法,该方法传入了一个参数ServiceRequestWrapper

            public ListenableFuture<ClientHttpResponse> executeAsync(HttpRequest request, byte[] body)
                    throws IOException {
    
                if (this.iterator.hasNext()) {
                    AsyncClientHttpRequestInterceptor interceptor = this.iterator.next();
                    return interceptor.intercept(request, body, this);
                }
                else {
                    URI uri = request.getURI();
                    HttpMethod method = request.getMethod();
                    HttpHeaders headers = request.getHeaders();
    
                    Assert.state(method != null, "No standard HTTP method");
                    AsyncClientHttpRequest delegate = requestFactory.createAsyncRequest(uri, method);
                    delegate.getHeaders().putAll(headers);
                    if (body.length > 0) {
                        StreamUtils.copy(body, delegate.getBody());
                    }
    
                    return delegate.executeAsync();
                }
            }
        }

    可以看到,该方法中,首先获取要请求的URI,因为入参的HttpRequest为ServiceRequestWrapper,因此使用的是ServiceRequestWrapper中的个月URI,然后再调用requestFactory.createAsyncRequest(uri, method)来创建一个http请求。

        @Override
        public URI getURI() {
            URI uri = this.loadBalancer.reconstructURI(this.instance, getRequest().getURI());
            return uri;
        }

    顺着该方法向下看,最终访问到org.springframework.cloud.netflix.ribbon.RibbonLoadBalancerClient#reconstructURI方法

        @Override
        public URI reconstructURI(ServiceInstance instance, URI original) {
            Assert.notNull(instance, "instance can not be null");
            String serviceId = instance.getServiceId();
            RibbonLoadBalancerContext context = this.clientFactory
                    .getLoadBalancerContext(serviceId);
    
            URI uri;
            Server server;
            if (instance instanceof RibbonServer) {
                RibbonServer ribbonServer = (RibbonServer) instance;
                server = ribbonServer.getServer();
                uri = updateToSecureConnectionIfNeeded(original, ribbonServer);
            }
            else {
                server = new Server(instance.getScheme(), instance.getHost(),
                        instance.getPort());
                IClientConfig clientConfig = clientFactory.getClientConfig(serviceId);
                ServerIntrospector serverIntrospector = serverIntrospector(serviceId);
                uri = updateToSecureConnectionIfNeeded(original, clientConfig,
                        serverIntrospector, server);
            }
            return context.reconstructURIWithServer(server, uri);
        }

      从上述代码可以看到,首先从ServiceInstance对象中获取了serviceId,然后根据serviceId从SpringClientFactory对象中获取RibbonLoadBalancerContext对象,最后,调用RibbonLoadBalancerContext对象的reconstructURIWithServer方法,获取具体的URI地址。

      在上述代码中,SpringClientFactory类是用来创建客户端负载均衡的工厂类,该工厂类会为每一个不同名的Ribbon客户端生成不同的Spring上下文。

      RibbonLoadBalancerContext类是LoadBalancerContext类的子类,该类用于存储一些被负载均衡器使用的上下文内容和API操作。

        public URI reconstructURIWithServer(Server server, URI original) {
            String host = server.getHost();
            int port = server.getPort();
            String scheme = server.getScheme();
            
            if (host.equals(original.getHost()) 
                    && port == original.getPort()
                    && scheme == original.getScheme()) {
                return original;
            }
            if (scheme == null) {
                scheme = original.getScheme();
            }
            if (scheme == null) {
                scheme = deriveSchemeAndPortFromPartialUri(original).first();
            }
    
            try {
                StringBuilder sb = new StringBuilder();
                sb.append(scheme).append("://");
                if (!Strings.isNullOrEmpty(original.getRawUserInfo())) {
                    sb.append(original.getRawUserInfo()).append("@");
                }
                sb.append(host);
                if (port >= 0) {
                    sb.append(":").append(port);
                }
                sb.append(original.getRawPath());
                if (!Strings.isNullOrEmpty(original.getRawQuery())) {
                    sb.append("?").append(original.getRawQuery());
                }
                if (!Strings.isNullOrEmpty(original.getRawFragment())) {
                    sb.append("#").append(original.getRawFragment());
                }
                URI newURI = new URI(sb.toString());
                return newURI;            
            } catch (URISyntaxException e) {
                throw new RuntimeException(e);
            }
        }

      通过上述代码可以看到,最终是使用host和port最终拼接成了最终的请求地址。

  • 相关阅读:
    今天看了几个小时的微信小程序说说心得体会
    关于wordpress中的contact form7和WP Mail SMTP的一些设置
    关于163发邮件报错535 Error:authentication failed解决方法
    Numpy 基本除法运算和模运算
    基本的图像操作和处理
    Python中flatten用法
    media
    TensorFlow模型保存和提取方法
    docker 默认用户和密码
    Windows安装TensorFlow
  • 原文地址:https://www.cnblogs.com/liconglong/p/13265035.html
Copyright © 2011-2022 走看看