  • Digging into how Spring Cloud Ribbon works

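    When load balancing comes up, most people think of server-side load balancing first: hardware load balancers, cloud load-balancing services, Nginx, and so on, all familiar products.

    Spring Cloud, by contrast, provides Ribbon, which gives the calling side of a service load-balancing capability. Combined closely with Eureka, it removes the need to set up a separate load-balancing service inside the cluster, which considerably simplifies the architecture within the service cluster.

    I won't spend more words on a general introduction, since those are easy to find elsewhere.

    Let's go straight to the code and see how Ribbon is implemented.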

    Configuration

    Details:

    1. RibbonAutoConfiguration creates the RibbonLoadBalancerClient instance.

    Code location:

    spring-cloud-netflix-core-1.3.5.RELEASE.jar
    org.springframework.cloud.netflix.ribbon
    RibbonAutoConfiguration.class

    @Configuration
    @ConditionalOnClass({ IClient.class, RestTemplate.class, AsyncRestTemplate.class, Ribbon.class})
    @RibbonClients
    @AutoConfigureAfter(name = "org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration")
    @AutoConfigureBefore({LoadBalancerAutoConfiguration.class, AsyncLoadBalancerAutoConfiguration.class})
    @EnableConfigurationProperties(RibbonEagerLoadProperties.class)
    public class RibbonAutoConfiguration {
    
        //
    
        @Bean
        @ConditionalOnMissingBean(LoadBalancerClient.class)
        public LoadBalancerClient loadBalancerClient() {
            return new RibbonLoadBalancerClient(springClientFactory());
        }
    
            //
    }

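    Start with the conditions on the configuration: RibbonAutoConfiguration must be processed before LoadBalancerAutoConfiguration (see @AutoConfigureBefore), because LoadBalancerAutoConfiguration uses the RibbonLoadBalancerClient instance.

    RibbonLoadBalancerClient implements the LoadBalancerClient interface. It is the load-balancing client and also the caller of the load-balancing strategy.

    2. LoadBalancerInterceptorConfig creates:
    1) the load-balancing interceptor, a LoadBalancerInterceptor instance
    which holds:
      the RibbonLoadBalancerClient instance, which implements LoadBalancerClient
      the LoadBalancerRequestFactory instance, the factory that creates load-balanced requests
    2) a RestTemplateCustomizer instance used to customize RestTemplate

    Code location: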

    spring-cloud-commons-1.2.4.RELEASE.jar
    org.springframework.cloud.client.loadbalancer
    LoadBalancerAutoConfiguration.class

    @Configuration
    @ConditionalOnClass(RestTemplate.class)
    @ConditionalOnBean(LoadBalancerClient.class)
    @EnableConfigurationProperties(LoadBalancerRetryProperties.class)
    public class LoadBalancerAutoConfiguration {
    
        //
    
        @Bean
        @ConditionalOnMissingBean
        public LoadBalancerRequestFactory loadBalancerRequestFactory(
                LoadBalancerClient loadBalancerClient) {
            return new LoadBalancerRequestFactory(loadBalancerClient, transformers);
        }
    
        @Configuration
        @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
        static class LoadBalancerInterceptorConfig {
            @Bean
            public LoadBalancerInterceptor ribbonInterceptor(
                    LoadBalancerClient loadBalancerClient,
                    LoadBalancerRequestFactory requestFactory) {
                return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
            }
    
            @Bean
            @ConditionalOnMissingBean
            public RestTemplateCustomizer restTemplateCustomizer(
                    final LoadBalancerInterceptor loadBalancerInterceptor) {
                return new RestTemplateCustomizer() {
                    @Override
                    public void customize(RestTemplate restTemplate) {
                        List<ClientHttpRequestInterceptor> list = new ArrayList<>(
                                restTemplate.getInterceptors());
                        list.add(loadBalancerInterceptor);
                        restTemplate.setInterceptors(list);
                    }
                };
            }
        }
    
        //
    
    }

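    Again, start with the conditions:

    The RestTemplate class must be on the classpath.

    A bean implementing the LoadBalancerClient interface must exist, which is the RibbonLoadBalancerClient created in the previous step.

    3. The RestTemplateCustomizer created in the previous step is applied to every RestTemplate instance collected in the restTemplates field (the ones annotated with @LoadBalanced); that is, it sets the load-balancing interceptor on each of those RestTemplate instances.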

    @Configuration
    @ConditionalOnClass(RestTemplate.class)
    @ConditionalOnBean(LoadBalancerClient.class)
    @EnableConfigurationProperties(LoadBalancerRetryProperties.class)
    public class LoadBalancerAutoConfiguration {
    
        //
    
        @Bean
        public SmartInitializingSingleton loadBalancedRestTemplateInitializer(
                final List<RestTemplateCustomizer> customizers) {
            return new SmartInitializingSingleton() {
                @Override
                public void afterSingletonsInstantiated() {
                    for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
                        for (RestTemplateCustomizer customizer : customizers) {
                            customizer.customize(restTemplate);
                        }
                    }
                }
            };
        }
    
        //
    
        @Configuration
        @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
        static class LoadBalancerInterceptorConfig {
            @Bean
            public LoadBalancerInterceptor ribbonInterceptor(
                    LoadBalancerClient loadBalancerClient,
                    LoadBalancerRequestFactory requestFactory) {
                return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
            }
    
            @Bean
            @ConditionalOnMissingBean
            public RestTemplateCustomizer restTemplateCustomizer(
                    final LoadBalancerInterceptor loadBalancerInterceptor) {
                return new RestTemplateCustomizer() {
                    @Override
                    public void customize(RestTemplate restTemplate) {
                        List<ClientHttpRequestInterceptor> list = new ArrayList<>(
                                restTemplate.getInterceptors());
                        list.add(loadBalancerInterceptor);
                        restTemplate.setInterceptors(list);
                    }
                };
            }
        }
    
        //
    }

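    The call restTemplate.setInterceptors(list) is where the load-balancing interceptor, LoadBalancerInterceptor, is injected.

    From this you can already guess that RestTemplate uses the injected interceptor to build the appropriate request and thereby achieve load balancing.

    It also shows that custom interceptors can be registered for other purposes.

    4. RibbonClientConfiguration creates the ZoneAwareLoadBalancer instance.

    Code location: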

    spring-cloud-netflix-core-1.3.5.RELEASE.jar
    org.springframework.cloud.netflix.ribbon
    RibbonClientConfiguration.class

    @SuppressWarnings("deprecation")
    @Configuration
    @EnableConfigurationProperties
    //Order is important here, last should be the default, first should be optional
    // see https://github.com/spring-cloud/spring-cloud-netflix/issues/2086#issuecomment-316281653
    @Import({OkHttpRibbonConfiguration.class, RestClientRibbonConfiguration.class, HttpClientRibbonConfiguration.class})
    public class RibbonClientConfiguration {
    
        //
    
        @Bean
        @ConditionalOnMissingBean
        public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
                ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
                IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
            if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
                return this.propertiesFactory.get(ILoadBalancer.class, config, name);
            }
            return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
                    serverListFilter, serverListUpdater);
        }
    
        //
    }

    ZoneAwareLoadBalancer implements the ILoadBalancer interface, which declares this method:

        /**
         * Choose a server from load balancer.
         * 
         * @param key An object that the load balancer may use to determine which server to return. null if 
         *         the load balancer does not use this parameter.
         * @return server chosen
         */
        public Server chooseServer(Object key);

    ZoneAwareLoadBalancer is a concrete load-balancer implementation and also the default one. It selects a service instance through its implementation of chooseServer.
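    To make the contract of chooseServer concrete, here is a minimal plain-Java sketch with no Ribbon dependency. ServerChooser and RoundRobinChooser are hypothetical names invented for this illustration, and simple round-robin is used only because it is easy to follow; it is not the selection logic of ZoneAwareLoadBalancer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for ILoadBalancer: a single method that picks a server.
interface ServerChooser {
    String chooseServer(Object key);
}

// Plain round-robin over a fixed list; the key is ignored.
// Assumption: this only shows the shape of chooseServer, not Ribbon's real rule.
final class RoundRobinChooser implements ServerChooser {
    private final List<String> servers;
    private final AtomicInteger next = new AtomicInteger();

    RoundRobinChooser(List<String> servers) {
        this.servers = Collections.unmodifiableList(new ArrayList<>(servers));
    }

    @Override
    public String chooseServer(Object key) {
        if (servers.isEmpty()) {
            return null; // nothing to choose from
        }
        // floorMod keeps the index non-negative even after the counter overflows
        int index = Math.floorMod(next.getAndIncrement(), servers.size());
        return servers.get(index);
    }
}

final class RoundRobinDemo {
    public static void main(String[] args) {
        ServerChooser chooser =
                new RoundRobinChooser(Arrays.asList("10.0.0.1:8080", "10.0.0.2:8080"));
        for (int i = 0; i < 3; i++) {
            System.out.println(chooser.chooseServer("default"));
        }
    }
}
```

    Running RoundRobinDemo cycles through the two addresses in order and wraps back to the first on the third call.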

    Interception & request

    1. Every request made through RestTemplate (GET, POST, and so on) goes through the doExecute method.

    Code location:
    spring-web-4.3.12.RELEASE.jar
    org.springframework.web.client
    RestTemplate.class

    public class RestTemplate extends InterceptingHttpAccessor implements RestOperations {
    
        //
    
        protected <T> T doExecute(URI url, HttpMethod method, RequestCallback requestCallback,
                ResponseExtractor<T> responseExtractor) throws RestClientException {
    
            Assert.notNull(url, "'url' must not be null");
            Assert.notNull(method, "'method' must not be null");
            ClientHttpResponse response = null;
            try {
                ClientHttpRequest request = createRequest(url, method);
                if (requestCallback != null) {
                    requestCallback.doWithRequest(request);
                }
                response = request.execute();
                handleResponse(url, method, response);
                if (responseExtractor != null) {
                    return responseExtractor.extractData(response);
                }
                else {
                    return null;
                }
            }
            catch (IOException ex) {
                String resource = url.toString();
                String query = url.getRawQuery();
                resource = (query != null ? resource.substring(0, resource.indexOf('?')) : resource);
                throw new ResourceAccessException("I/O error on " + method.name() +
                        " request for \"" + resource + "\": " + ex.getMessage(), ex);
            }
            finally {
                if (response != null) {
                    response.close();
                }
            }
        }
    
        //
    
    }

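    All of the supported HTTP request methods end up calling doExecute. Inside it, a creation method builds the request instance, and the request is then executed to obtain the response object.

    2. Obtaining the request factory

    In the code from the previous step, createRequest is called to create the request instance. This method is defined in a parent class.

    The main inheritance chain is: RestTemplate extends InterceptingHttpAccessor, which extends HttpAccessor.

    createRequest is actually defined in the abstract class HttpAccessor.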

    public abstract class HttpAccessor {
    
        private ClientHttpRequestFactory requestFactory = new SimpleClientHttpRequestFactory();
    
        public void setRequestFactory(ClientHttpRequestFactory requestFactory) {
            Assert.notNull(requestFactory, "ClientHttpRequestFactory must not be null");
            this.requestFactory = requestFactory;
        }
    
        public ClientHttpRequestFactory getRequestFactory() {
            return this.requestFactory;
        }
    
        protected ClientHttpRequest createRequest(URI url, HttpMethod method) throws IOException {
            ClientHttpRequest request = getRequestFactory().createRequest(url, method);
            if (logger.isDebugEnabled()) {
                logger.debug("Created " + method.name() + " request for \"" + url + "\"");
            }
            return request;
        }
    
    }

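    createRequest calls getRequestFactory to obtain the request factory. Although HttpAccessor defines getRequestFactory, the subclass InterceptingHttpAccessor overrides it, so the overriding version is the one that actually runs here.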

    public abstract class InterceptingHttpAccessor extends HttpAccessor {
    
        private List<ClientHttpRequestInterceptor> interceptors = new ArrayList<ClientHttpRequestInterceptor>();
    
        public void setInterceptors(List<ClientHttpRequestInterceptor> interceptors) {
            this.interceptors = interceptors;
        }
    
        public List<ClientHttpRequestInterceptor> getInterceptors() {
            return interceptors;
        }
    
        @Override
        public ClientHttpRequestFactory getRequestFactory() {
            ClientHttpRequestFactory delegate = super.getRequestFactory();
            if (!CollectionUtils.isEmpty(getInterceptors())) {
                return new InterceptingClientHttpRequestFactory(delegate, getInterceptors());
            }
            else {
                return delegate;
            }
        }
    
    }

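    There is a small trick here. First, HttpAccessor still creates and supplies the SimpleClientHttpRequestFactory, which is the factory that creates plain request instances when there are no interceptors.

    Second, when interceptors have been injected, an InterceptingClientHttpRequestFactory is created, which produces request instances that carry the interceptors. Because the load-balancing interceptor has been injected, requests here are created by InterceptingClientHttpRequestFactory.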
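    The factory switch described above can be reduced to a few lines of plain Java. This is only a sketch: RequestFactory, SimpleFactory, InterceptingFactory, BaseAccessor and InterceptingAccessor are hypothetical, simplified stand-ins for the Spring classes, and requests are represented as strings so the effect is visible.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified counterpart of ClientHttpRequestFactory.
interface RequestFactory {
    String createRequest(String url);
}

// Plays the role of SimpleClientHttpRequestFactory: a plain request.
final class SimpleFactory implements RequestFactory {
    @Override
    public String createRequest(String url) {
        return "plain:" + url;
    }
}

// Plays the role of InterceptingClientHttpRequestFactory: wraps a delegate.
final class InterceptingFactory implements RequestFactory {
    private final RequestFactory delegate;
    private final int interceptorCount;

    InterceptingFactory(RequestFactory delegate, int interceptorCount) {
        this.delegate = delegate;
        this.interceptorCount = interceptorCount;
    }

    @Override
    public String createRequest(String url) {
        return "intercepted(" + interceptorCount + "):" + delegate.createRequest(url);
    }
}

// Plays the role of HttpAccessor: owns the default factory.
class BaseAccessor {
    private final RequestFactory requestFactory = new SimpleFactory();

    public RequestFactory getRequestFactory() {
        return requestFactory;
    }

    // Calls the overridable getter, so a subclass decides which factory is used.
    public String createRequest(String url) {
        return getRequestFactory().createRequest(url);
    }
}

// Plays the role of InterceptingHttpAccessor: overrides the getter.
class InterceptingAccessor extends BaseAccessor {
    private final List<String> interceptors = new ArrayList<>();

    public List<String> getInterceptors() {
        return interceptors;
    }

    @Override
    public RequestFactory getRequestFactory() {
        RequestFactory delegate = super.getRequestFactory();
        if (!interceptors.isEmpty()) {
            return new InterceptingFactory(delegate, interceptors.size());
        }
        return delegate;
    }
}
```

    With no interceptors registered, createRequest goes straight to the plain factory; as soon as one is added, the same call is routed through the wrapping factory without the base class changing.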

    3. Creating the request instance through the factory

    To see how the instance is created, look at the factory's createRequest method.

    public class InterceptingClientHttpRequestFactory extends AbstractClientHttpRequestFactoryWrapper {
    
        private final List<ClientHttpRequestInterceptor> interceptors;
    
        public InterceptingClientHttpRequestFactory(ClientHttpRequestFactory requestFactory,
                List<ClientHttpRequestInterceptor> interceptors) {
    
            super(requestFactory);
            this.interceptors = (interceptors != null ? interceptors : Collections.<ClientHttpRequestInterceptor>emptyList());
        }
    
    
        @Override
        protected ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod, ClientHttpRequestFactory requestFactory) {
            return new InterceptingClientHttpRequest(requestFactory, this.interceptors, uri, httpMethod);
        }
    
    }

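    It simply creates a new InterceptingClientHttpRequest instance and passes in the interceptors and the basic request factory.

    4. The request instance calls the intercept method of the load-balancing interceptor injected during configuration

    As step 1 showed, once the request instance has been created, the request is carried out by calling its execute method.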

    ClientHttpRequest request = createRequest(url, method);
    if (requestCallback != null) {
        requestCallback.doWithRequest(request);
    }
    response = request.execute();

    The actual request instance is an InterceptingClientHttpRequest, and execute is defined in its parent class.

    Class location:

    spring-web-4.3.12.RELEASE.jar
    org.springframework.http.client
    InterceptingClientHttpRequest.class

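    Their inheritance chain is: InterceptingClientHttpRequest extends AbstractBufferingClientHttpRequest, which extends AbstractClientHttpRequest, which implements ClientHttpRequest.

    The execute method in turn calls executeInternal, which the subclass implements.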

    public abstract class AbstractClientHttpRequest implements ClientHttpRequest {
    
        private final HttpHeaders headers = new HttpHeaders();
    
        private boolean executed = false;
    
        @Override
        public final HttpHeaders getHeaders() {
            return (this.executed ? HttpHeaders.readOnlyHttpHeaders(this.headers) : this.headers);
        }
    
        @Override
        public final OutputStream getBody() throws IOException {
            assertNotExecuted();
            return getBodyInternal(this.headers);
        }
    
        @Override
        public final ClientHttpResponse execute() throws IOException {
            assertNotExecuted();
            ClientHttpResponse result = executeInternal(this.headers);
            this.executed = true;
            return result;
        }
    
        protected void assertNotExecuted() {
            Assert.state(!this.executed, "ClientHttpRequest already executed");
        }
    
        protected abstract OutputStream getBodyInternal(HttpHeaders headers) throws IOException;
    
        protected abstract ClientHttpResponse executeInternal(HttpHeaders headers) throws IOException;
    
    }

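    That call ends up in the executeInternal method of InterceptingClientHttpRequest, which in turn calls execute on an executor, InterceptingRequestExecution. If an interceptor is still left in the injected list, the executor calls that interceptor's intercept method.

    The interceptor here is the LoadBalancerInterceptor instance that was injected into the RestTemplate instance during configuration; see step 2 of the configuration section above.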

    class InterceptingClientHttpRequest extends AbstractBufferingClientHttpRequest {
    
        //
    
        @Override
        protected final ClientHttpResponse executeInternal(HttpHeaders headers, byte[] bufferedOutput) throws IOException {
            InterceptingRequestExecution requestExecution = new InterceptingRequestExecution();
            return requestExecution.execute(this, bufferedOutput);
        }
    
    
        private class InterceptingRequestExecution implements ClientHttpRequestExecution {
    
            private final Iterator<ClientHttpRequestInterceptor> iterator;
    
            public InterceptingRequestExecution() {
                this.iterator = interceptors.iterator();
            }
    
            @Override
            public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
                if (this.iterator.hasNext()) {
                    ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
                    return nextInterceptor.intercept(request, body, this);
                }
                else {
                    ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), request.getMethod());
                    for (Map.Entry<String, List<String>> entry : request.getHeaders().entrySet()) {
                        List<String> values = entry.getValue();
                        for (String value : values) {
                            delegate.getHeaders().add(entry.getKey(), value);
                        }
                    }
                    if (body.length > 0) {
                        StreamUtils.copy(body, delegate.getBody());
                    }
                    return delegate.execute();
                }
            }
        }
    
    }
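    The iterator-driven chain in InterceptingRequestExecution is easier to see once the HTTP details are stripped away. The following is a simplified sketch, not Spring code: Execution, Interceptor, ChainExecution and ChainDemo are hypothetical names, and a request is just a string. Each interceptor receives the execution object itself and decides whether to continue the chain; when no interceptor is left, the "real" request is sent.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical counterpart of ClientHttpRequestExecution.
interface Execution {
    String execute(String request);
}

// Hypothetical counterpart of ClientHttpRequestInterceptor.
interface Interceptor {
    String intercept(String request, Execution execution);
}

// Walks the interceptor list once; falls through to the final send at the end.
final class ChainExecution implements Execution {
    private final Iterator<Interceptor> iterator;

    ChainExecution(List<Interceptor> interceptors) {
        this.iterator = interceptors.iterator();
    }

    @Override
    public String execute(String request) {
        if (iterator.hasNext()) {
            // Hand control to the next interceptor, passing this execution along
            return iterator.next().intercept(request, this);
        }
        // No interceptor left: stands in for delegate.execute()
        return "sent[" + request + "]";
    }
}

final class ChainDemo {
    static String run(String request, List<Interceptor> interceptors) {
        return new ChainExecution(interceptors).execute(request);
    }

    public static void main(String[] args) {
        Interceptor first = (req, exec) -> exec.execute(req + "+first");
        Interceptor second = (req, exec) -> exec.execute(req + "+second");
        System.out.println(run("GET /users", Arrays.asList(first, second)));
    }
}
```

    An interceptor that never calls execution.execute short-circuits the chain, which is why the load-balancing interceptor can take over the request entirely and continue it later with a rewritten target.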

    5. The load-balancing interceptor calls the load-balancing client

    In its intercept method, LoadBalancerInterceptor in turn calls the execute method of the LoadBalancerClient implementation.

    public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
    
        private LoadBalancerClient loadBalancer;
        private LoadBalancerRequestFactory requestFactory;
    
        public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {
            this.loadBalancer = loadBalancer;
            this.requestFactory = requestFactory;
        }
    
        public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
            // for backwards compatibility
            this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
        }
    
        @Override
        public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
                final ClientHttpRequestExecution execution) throws IOException {
            final URI originalUri = request.getURI();
            String serviceName = originalUri.getHost();
            Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
            return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));
        }
    }

    As step 1 of the configuration section showed, the implementation class is RibbonLoadBalancerClient.
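    The intercept method reads the host part of the URI as the service name. The sketch below shows that idea using only java.net.URI. ServiceUriSketch and its withInstance helper are hypothetical and exist only for this illustration; the listing above does not show how Spring Cloud swaps in the chosen address. One practical consequence worth noting: java.net.URI returns a null host for names it cannot parse as a hostname, for example names containing an underscore, which is the case the Assert.state check guards against.

```java
import java.net.URI;
import java.net.URISyntaxException;

final class ServiceUriSketch {

    // Same idea as intercept(): the URI host is treated as the logical service name.
    static String serviceName(URI original) {
        String host = original.getHost();
        if (host == null) {
            throw new IllegalStateException(
                    "Request URI does not contain a valid hostname: " + original);
        }
        return host;
    }

    // Hypothetical helper: replace the logical name with a concrete host and port,
    // keeping scheme, path, query and fragment unchanged.
    static URI withInstance(URI original, String host, int port) {
        try {
            return new URI(original.getScheme(), original.getUserInfo(), host, port,
                    original.getPath(), original.getQuery(), original.getFragment());
        } catch (URISyntaxException ex) {
            throw new IllegalArgumentException(ex);
        }
    }

    public static void main(String[] args) {
        URI logical = URI.create("http://user-service/users/1?active=true");
        System.out.println(serviceName(logical));
        System.out.println(withInstance(logical, "10.0.0.5", 8080));
    }
}
```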

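    6. The load-balancing client uses the load-balancing strategy to pick a target service instance and sends the request

    The first execute method of RibbonLoadBalancerClient, together with getServer, shows that a server is actually selected through the chooseServer method of the ILoadBalancer implementation and then handed to the request object, which sends the request.

    By default the implementation is a ZoneAwareLoadBalancer (zone-aware load balancer) instance, which internally selects a server according to its balancing strategy.

    For how ZoneAwareLoadBalancer is created, see step 4 of the configuration section.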

    public class RibbonLoadBalancerClient implements LoadBalancerClient {
        @Override
        public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
            ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
            Server server = getServer(loadBalancer);
            if (server == null) {
                throw new IllegalStateException("No instances available for " + serviceId);
            }
            RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
                    serviceId), serverIntrospector(serviceId).getMetadata(server));
    
            return execute(serviceId, ribbonServer, request);
        }
    
        @Override
        public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {
            Server server = null;
            if(serviceInstance instanceof RibbonServer) {
                server = ((RibbonServer)serviceInstance).getServer();
            }
            if (server == null) {
                throw new IllegalStateException("No instances available for " + serviceId);
            }
    
            RibbonLoadBalancerContext context = this.clientFactory
                    .getLoadBalancerContext(serviceId);
            RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);
    
            try {
                T returnVal = request.apply(serviceInstance);
                statsRecorder.recordStats(returnVal);
                return returnVal;
            }
            // catch IOException and rethrow so RestTemplate behaves correctly
            catch (IOException ex) {
                statsRecorder.recordStats(ex);
                throw ex;
            }
            catch (Exception ex) {
                statsRecorder.recordStats(ex);
                ReflectionUtils.rethrowRuntimeException(ex);
            }
            return null;
        }
           
        //
    
        protected Server getServer(ILoadBalancer loadBalancer) {
            if (loadBalancer == null) {
                return null;
            }
            return loadBalancer.chooseServer("default"); // TODO: better handling of key
        }
    
        protected ILoadBalancer getLoadBalancer(String serviceId) {
            return this.clientFactory.getLoadBalancer(serviceId);
        }
    
        public static class RibbonServer implements ServiceInstance {
            private final String serviceId;
            private final Server server;
            private final boolean secure;
            private Map<String, String> metadata;
    
            public RibbonServer(String serviceId, Server server) {
                this(serviceId, server, false, Collections.<String, String> emptyMap());
            }
    
            public RibbonServer(String serviceId, Server server, boolean secure,
                    Map<String, String> metadata) {
                this.serviceId = serviceId;
                this.server = server;
                this.secure = secure;
                this.metadata = metadata;
            }
    
            //
        }
    
    }
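    Stripped of Ribbon types, the two execute methods above boil down to "choose an instance, fail fast if there is none, run the request against it, record the outcome". A minimal sketch under that reading follows; InstanceChooser, BalancedRequest and SketchClient are hypothetical names, and the two counters are a stand-in for RibbonStatsRecorder rather than a description of what it records.

```java
import java.io.IOException;

// Hypothetical: returns a concrete address for a service id, or null if none is available.
interface InstanceChooser {
    String choose(String serviceId);
}

// Hypothetical counterpart of LoadBalancerRequest<T>.
interface BalancedRequest<T> {
    T apply(String instance) throws IOException;
}

final class SketchClient {
    private final InstanceChooser chooser;
    private int successes;
    private int failures;

    SketchClient(InstanceChooser chooser) {
        this.chooser = chooser;
    }

    <T> T execute(String serviceId, BalancedRequest<T> request) throws IOException {
        String instance = chooser.choose(serviceId);
        if (instance == null) {
            throw new IllegalStateException("No instances available for " + serviceId);
        }
        try {
            T result = request.apply(instance);
            successes++; // stand-in for statsRecorder.recordStats(returnVal)
            return result;
        } catch (IOException ex) {
            failures++; // stand-in for statsRecorder.recordStats(ex)
            throw ex;   // rethrown so the caller sees the I/O failure
        }
    }

    int successes() {
        return successes;
    }

    int failures() {
        return failures;
    }
}
```

    A caller would pass the actual HTTP call as the BalancedRequest, for example `client.execute("orders", instance -> "GET via " + instance)`, so the client never needs to know how the request is built.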

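    That's the end of the code walkthrough, so here is a summary.

    When RestTemplate is used in the ordinary way to call another service, it sends the request through a regular HTTP request instance.

    Once the RestTemplate is annotated with @LoadBalanced, the auto-configuration injects the load-balancing interceptor into it, so that the load balancer picks a suitable service instance according to its strategy before the request is sent.

    End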

  • Original article: https://www.cnblogs.com/kongxianghai/p/8445030.html