Ribbon总体的源码结构,如下图所示:
上图是Ribbon源码的总览图,每一个颜色,代表源码中的一部分内容,总体来看,Ribbon源码的实现,总共分为五个部分,从上而下依次为,Ribbon入口实现、IloadBalancer实现、ServerListUpdater实现、ServerListFilter实现和IRule实现。
针对这些实现,粗略描述如下:
实现内容 | 描述 |
Ribbon入口实现 | 自动配置加载及请求拦截器加载 |
IloadBalancer实现 | 负载均衡器 |
ServerListUpdater实现 | 动态更新本地存储的实例清单 |
ServerListFilter实现 | 过滤器 |
IRule实现 | 负载均衡策略 |
本篇主要说Ribbon入口实现
在SpringCloud--Eureka--搭建中,对于使用Ribbon的项目,主要是在主函数上增加了如下代码
@Bean @LoadBalanced RestTemplate restTemplate(){ return new RestTemplate(); }
然后在后续的调用中,直接使用restTemplate进行调用,那么我们就可以先看一下@LoadBalanced注解,从@LoadBalanced注解源码可以看到,该注解用来给RestTemplate做标记,以使用负载均衡客户端LoadBalancerClient
该客户端接口提供了三个方法:
方法 | 描述 | 源码 |
继承父类的choose方法 | 根据传入的服务名,从负载均衡器中挑选一个对应的服务实例 | ServiceInstance choose(String serviceId); |
execute方法 | 使用从负载均衡器中选择的服务实例进行请求 |
<T> T execute(String serviceId, LoadBalancerRequest<T> request) <T> T execute(String serviceId, ServiceInstance serviceInstance,LoadBalancerRequest<T> request) |
reconstructURI | 为系统构建一个合适的host:port形式的URI,其中请求的ServiceInstance一般是 服务名:port,返回的URI则是ServiceInstance服务实例详情拼接的具体host:port形式的请求地址 | URI reconstructURI(ServiceInstance instance, URI original); |
根据LoadBalancerClient查看,相关类的关系图如下所示:
在上图中,通过类的命名可以看出LoadBalancerAutoConfiguration为客户端负载均衡器的自动化配置类
@Configuration(proxyBeanMethods = false) @ConditionalOnClass(RestTemplate.class) @ConditionalOnBean(LoadBalancerClient.class) @EnableConfigurationProperties(LoadBalancerRetryProperties.class) public class LoadBalancerAutoConfiguration { @LoadBalanced @Autowired(required = false) private List<RestTemplate> restTemplates = Collections.emptyList(); @Autowired(required = false) private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList(); @Bean public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated( final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) { return () -> restTemplateCustomizers.ifAvailable(customizers -> { for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) { for (RestTemplateCustomizer customizer : customizers) { customizer.customize(restTemplate); } } }); } @Bean @ConditionalOnMissingBean public LoadBalancerRequestFactory loadBalancerRequestFactory( LoadBalancerClient loadBalancerClient) { return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers); } @Configuration(proxyBeanMethods = false) @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate") static class LoadBalancerInterceptorConfig { @Bean public LoadBalancerInterceptor ribbonInterceptor( LoadBalancerClient loadBalancerClient, LoadBalancerRequestFactory requestFactory) { return new LoadBalancerInterceptor(loadBalancerClient, requestFactory); } @Bean @ConditionalOnMissingBean public RestTemplateCustomizer restTemplateCustomizer( final LoadBalancerInterceptor loadBalancerInterceptor) { return restTemplate -> { List<ClientHttpRequestInterceptor> list = new ArrayList<>( restTemplate.getInterceptors()); list.add(loadBalancerInterceptor); restTemplate.setInterceptors(list); }; } } /** * Auto configuration for retry mechanism. */ @Configuration(proxyBeanMethods = false) @ConditionalOnClass(RetryTemplate.class) public static class RetryAutoConfiguration { @Bean @ConditionalOnMissingBean public LoadBalancedRetryFactory loadBalancedRetryFactory() { return new LoadBalancedRetryFactory() { }; } } /** * Auto configuration for retry intercepting mechanism. */ @Configuration(proxyBeanMethods = false) @ConditionalOnClass(RetryTemplate.class) public static class RetryInterceptorAutoConfiguration { @Bean @ConditionalOnMissingBean public RetryLoadBalancerInterceptor ribbonInterceptor( LoadBalancerClient loadBalancerClient, LoadBalancerRetryProperties properties, LoadBalancerRequestFactory requestFactory, LoadBalancedRetryFactory loadBalancedRetryFactory) { return new RetryLoadBalancerInterceptor(loadBalancerClient, properties, requestFactory, loadBalancedRetryFactory); } @Bean @ConditionalOnMissingBean public RestTemplateCustomizer restTemplateCustomizer( final RetryLoadBalancerInterceptor loadBalancerInterceptor) { return restTemplate -> { List<ClientHttpRequestInterceptor> list = new ArrayList<>( restTemplate.getInterceptors()); list.add(loadBalancerInterceptor); restTemplate.setInterceptors(list); }; } } }
从源码类的注解上可以知道Ribbon负载均衡必须满足如下两个条件:
1、@ConditionalOnClass(RestTemplate.class):RestTemplate类必须存在该项目中
2、@ConditionalOnBean(LoadBalancerClient.class):必须要有LoadBalancerClient的先险类Bean
同时,该自动化配置类主要做了一下内容:
1、创建了一个LoadBalancerInterceptor的Bean,用以实现对客户端请求的拦截,以实现客户端负载均衡
2、创建了一个RestTemplateCustomizer的Bean,用以给restTemplate增加LoadBalancerInterceptor拦截器
3、维护了一个@LoadBalanced注解修饰的RestTemplate集合,并且进行初始化,通过调用RestTemplateCustomizer的实例来给客户端的RestTemplate添加LoadBalancerInterceptor拦截器
那么接下来看下LoadBalancerInterceptor是如何将RestTemplate进行负载均衡的
public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor { private LoadBalancerClient loadBalancer; private LoadBalancerRequestFactory requestFactory; public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) { this.loadBalancer = loadBalancer; this.requestFactory = requestFactory; } public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) { // for backwards compatibility this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer)); } @Override public ClientHttpResponse intercept(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) throws IOException { final URI originalUri = request.getURI(); String serviceName = originalUri.getHost(); Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri); return this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution)); } }
通过源码可以看到,LoadBalancerInterceptor的构造函数传入了LoadBalancerClient,因此当一个被@LoadBalanced注解修饰的RestTemplate对外发起http请求时,就会被LoadBalancerInterceptor拦截,并调用LoadBalancerClient中的execute方法。
RibbonLoadBalancerClient是LoadBalancerClient的实现类,其中的execute方法如下所示:
public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint) throws IOException { ILoadBalancer loadBalancer = getLoadBalancer(serviceId); Server server = getServer(loadBalancer, hint); if (server == null) { throw new IllegalStateException("No instances available for " + serviceId); } RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server, serviceId), serverIntrospector(serviceId).getMetadata(server)); return execute(serviceId, ribbonServer, request); }
由以上代码可见,先是使用ILoadBalancer获取了一个Server对象,然后将Server对象封装成RibbonServer对象(该对象除了存储服务实例的信息之外,还增加了服务名ServiceId、是否需要使用HTTPS等其他信息),最后使用该对象在回调LoadBalancerInterceptor请求拦截器中LoadBalancerRequest的apply方法,从而向一个具体的实例发起请求,从而实现一开始以服务名为host的URI请求到host:port形式实际访问地址的转换。
public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException { Server server = null; if (serviceInstance instanceof RibbonServer) { server = ((RibbonServer) serviceInstance).getServer(); } if (server == null) { throw new IllegalStateException("No instances available for " + serviceId); } RibbonLoadBalancerContext context = this.clientFactory .getLoadBalancerContext(serviceId); RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server); try { T returnVal = request.apply(serviceInstance); statsRecorder.recordStats(returnVal); return returnVal; } // catch IOException and rethrow so RestTemplate behaves correctly catch (IOException ex) { statsRecorder.recordStats(ex); throw ex; } catch (Exception ex) { statsRecorder.recordStats(ex); ReflectionUtils.rethrowRuntimeException(ex); } return null; }
由以上代码可见,最终会回调拦截器org.springframework.cloud.client.loadbalancer.AsyncLoadBalancerInterceptor#intercept,在拦截器中,调用了org.springframework.http.client.InterceptingAsyncClientHttpRequest.AsyncRequestExecution#executeAsync方法,该方法传入了一个参数ServiceRequestWrapper
public ListenableFuture<ClientHttpResponse> executeAsync(HttpRequest request, byte[] body) throws IOException { if (this.iterator.hasNext()) { AsyncClientHttpRequestInterceptor interceptor = this.iterator.next(); return interceptor.intercept(request, body, this); } else { URI uri = request.getURI(); HttpMethod method = request.getMethod(); HttpHeaders headers = request.getHeaders(); Assert.state(method != null, "No standard HTTP method"); AsyncClientHttpRequest delegate = requestFactory.createAsyncRequest(uri, method); delegate.getHeaders().putAll(headers); if (body.length > 0) { StreamUtils.copy(body, delegate.getBody()); } return delegate.executeAsync(); } } }
可以看到,该方法中,首先获取要请求的URI,因为入参的HttpRequest为ServiceRequestWrapper,因此使用的是ServiceRequestWrapper中的个月URI,然后再调用requestFactory.createAsyncRequest(uri, method)来创建一个http请求。
@Override public URI getURI() { URI uri = this.loadBalancer.reconstructURI(this.instance, getRequest().getURI()); return uri; }
顺着该方法向下看,最终访问到org.springframework.cloud.netflix.ribbon.RibbonLoadBalancerClient#reconstructURI方法
@Override public URI reconstructURI(ServiceInstance instance, URI original) { Assert.notNull(instance, "instance can not be null"); String serviceId = instance.getServiceId(); RibbonLoadBalancerContext context = this.clientFactory .getLoadBalancerContext(serviceId); URI uri; Server server; if (instance instanceof RibbonServer) { RibbonServer ribbonServer = (RibbonServer) instance; server = ribbonServer.getServer(); uri = updateToSecureConnectionIfNeeded(original, ribbonServer); } else { server = new Server(instance.getScheme(), instance.getHost(), instance.getPort()); IClientConfig clientConfig = clientFactory.getClientConfig(serviceId); ServerIntrospector serverIntrospector = serverIntrospector(serviceId); uri = updateToSecureConnectionIfNeeded(original, clientConfig, serverIntrospector, server); } return context.reconstructURIWithServer(server, uri); }
从上述代码可以看到,首先从ServiceInstance对象中获取了serviceId,然后根据serviceId从SpringClientFactory对象中获取RibbonLoadBalancerContext对象,最后,调用RibbonLoadBalancerContext对象的reconstructURIWithServer方法,获取具体的URI地址。
在上述代码中,SpringClientFactory类是用来创建客户端负载均衡的工厂类,该工厂类会为每一个不同名的Ribbon客户端生成不同的Spring上下文。
RibbonLoadBalancerContext类是LoadBalancerContext类的子类,该类用于存储一些被负载均衡器使用的上下文内容和API操作。
public URI reconstructURIWithServer(Server server, URI original) { String host = server.getHost(); int port = server.getPort(); String scheme = server.getScheme(); if (host.equals(original.getHost()) && port == original.getPort() && scheme == original.getScheme()) { return original; } if (scheme == null) { scheme = original.getScheme(); } if (scheme == null) { scheme = deriveSchemeAndPortFromPartialUri(original).first(); } try { StringBuilder sb = new StringBuilder(); sb.append(scheme).append("://"); if (!Strings.isNullOrEmpty(original.getRawUserInfo())) { sb.append(original.getRawUserInfo()).append("@"); } sb.append(host); if (port >= 0) { sb.append(":").append(port); } sb.append(original.getRawPath()); if (!Strings.isNullOrEmpty(original.getRawQuery())) { sb.append("?").append(original.getRawQuery()); } if (!Strings.isNullOrEmpty(original.getRawFragment())) { sb.append("#").append(original.getRawFragment()); } URI newURI = new URI(sb.toString()); return newURI; } catch (URISyntaxException e) { throw new RuntimeException(e); } }
通过上述代码可以看到,最终是使用host和port最终拼接成了最终的请求地址。