负载策略
RoundRobinRule,轮训策略,默认策略
RandomRule,随机,使用Random对象从服务列表中随机选择一个服务
RetryRule,轮询 + 重试
WeightedResponseTimeRule:优先选择响应时间快,此策略会根据平均响应时间计算所有服务的权重,响应时间越快,服务权重越重、被选中的概率越高。此类有个DynamicServerWeightTask的定时任务,默认情况下每隔30秒会计算一次各个服务实例的权重。刚启动时,如果统计信息不足,则使用RoundRobinRule策略,等统计信息足够,会切换回来
AvailabilityFilteringRule:可用性过滤,会先过滤掉以下服务:由于多次访问故障而断路器处于打开的服务、并发的连接数量超过阈值,然后对剩余的服务列表按照RoundRobinRule策略进行访问
BestAvailableRule:优先选择并发请求最小的,刚启动时吗,如果统计信息不足,则使用RoundRobinRule策略,等统计信息足够,才会切换回来
ZoneAvoidanceRule:可以实现避免可能访问失效的区域(zone)
@LoadBalanced开启了负载功能
@Bean
@LoadBalanced
public RestTemplate restTemplate() {
return new RestTemplate();
}
@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {
}
- 在RestTemplate上面增加注解@LoadBalanced,就开启了负载功能。
- 该注解会把所有的的RestTemplate实例都加上@LoadBalanced,后续通过@Qualifier和@Autowire注解可以将所有的RestTemplate都注入获取到。
然后挨个给这些RestTemplate添加拦截器,在拦截器中实现负载逻辑。
RibbonAutoConfiguration自动配置类
@Configuration
@ConditionalOnClass({IClient.class, RestTemplate.class, AsyncRestTemplate.class, Ribbon.class})
@RibbonClients
@AutoConfigureAfter(
name = {"org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration"}
)
//引入LoadBalancerAutoConfiguration类
@AutoConfigureBefore({LoadBalancerAutoConfiguration.class, AsyncLoadBalancerAutoConfiguration.class})
@EnableConfigurationProperties({RibbonEagerLoadProperties.class})
public class RibbonAutoConfiguration {
@Autowired(
required = false
)
private List<RibbonClientSpecification> configurations = new ArrayList();
//饥饿加载模式配置
//正常模式下ribbon client都是到使用的时候才去加载
//饥饿模式在spring容器初始化完毕就加载ribbon client
@Autowired
private RibbonEagerLoadProperties ribbonEagerLoadProperties;
//子容器工厂,一个项目可以配置多个ribbon client,这些client分别加载自己的容器,互相独立
@Bean
public SpringClientFactory springClientFactory() {
SpringClientFactory factory = new SpringClientFactory();
factory.setConfigurations(this.configurations);
return factory;
}
@Bean//引入RibbonLoadBalancerClient
@ConditionalOnMissingBean({LoadBalancerClient.class})
public LoadBalancerClient loadBalancerClient() {
return new RibbonLoadBalancerClient(this.springClientFactory());
}
@Bean
@ConditionalOnProperty(
value = {"ribbon.eager-load.enabled"},//该属性开启饥饿模式
matchIfMissing = false
)
//饥饿加载模式
public RibbonApplicationContextInitializer ribbonApplicationContextInitializer() {
return new RibbonApplicationContextInitializer(this.springClientFactory(), this.ribbonEagerLoadProperties.getClients());
}
}
- LoadBalancerAutoConfiguration:给所有RestTemplate添加拦截器
- SpringClientFactory子容器
SpringClientFactory子容器
在多ribbon客户端的情况下,SpringClientFactory会为每个客户端都加载自己的上下文,实现ribbon客户端的隔离性。
举个例子:
定义两个自定义ribbon客户端
@Configurable
public class RibbonCust1 {
@Bean
public IRule myRule1(){
return new RandomRule();
}
}
@Configurable
public class RibbonCust2 {
@Bean
public IRule myRule2(){
return new RetryRule();
}
}
启动类开启ribbon负载,并且指明哪些服务用哪些ribbon
@SpringBootApplication
@RibbonClient(
name = "demo-goods", configuration = MyRule2.class
)
public class Ads2Application {
public static void main(String[] args) {
SpringApplication.run(Ads2Application.class,args);
}
}
这种情况不同的客户端的上下文环境是独立的,SpringClientFactory就是存储子上下文的。
LoadBalancerAutoConfiguration:给所有RestTemplate添加拦截器
@Configuration
@ConditionalOnClass({RestTemplate.class})
@ConditionalOnBean({LoadBalancerClient.class})
@EnableConfigurationProperties({LoadBalancerRetryProperties.class})
public class LoadBalancerAutoConfiguration {
//这里配合@LoadBalanced注解的@Qualifier注解,将所有包含@LoadBalanced注解的RestTemplate给注入进来
@LoadBalanced
@Autowired(
required = false
)
private List<RestTemplate> restTemplates = Collections.emptyList();
@Autowired(
required = false
)
private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList();
public LoadBalancerAutoConfiguration() {
}
/***
* 此方法的逻辑
* 获取到所有的RestTemplate,把这些RestTemplate加入到自定义定制器中
* @param customizers
* @return
*/
@Bean
public SmartInitializingSingleton loadBalancedRestTemplateInitializer(final List<RestTemplateCustomizer> customizers) {
return new SmartInitializingSingleton() {
public void afterSingletonsInstantiated() {
Iterator var1 = org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration.this.restTemplates.iterator();
while(var1.hasNext()) {
RestTemplate restTemplate = (RestTemplate)var1.next();
Iterator var3 = customizers.iterator();
while(var3.hasNext()) {
RestTemplateCustomizer customizer = (RestTemplateCustomizer)var3.next();
customizer.customize(restTemplate);
}
}
}
};
}
@Bean
@ConditionalOnMissingBean
public LoadBalancerRequestFactory loadBalancerRequestFactory(LoadBalancerClient loadBalancerClient) {
return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers);
}
@Configuration
@ConditionalOnMissingClass({"org.springframework.retry.support.RetryTemplate"})
static class LoadBalancerInterceptorConfig {
LoadBalancerInterceptorConfig() {
}
//初始化拦截器
@Bean
public LoadBalancerInterceptor ribbonInterceptor(LoadBalancerClient loadBalancerClient, LoadBalancerRequestFactory requestFactory) {
return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
}
//初始化自定义定制器:该定制器会将放入到restTemplate都加上拦截器
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {
return new RestTemplateCustomizer() {
public void customize(RestTemplate restTemplate) {
List<ClientHttpRequestInterceptor> list = new ArrayList(restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
}
};
}
}
}
//LoadBalancerInterceptor
private LoadBalancerClient loadBalancer;
public ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution) throws IOException {
URI originalUri = request.getURI();
String serviceName = originalUri.getHost();
//入口
return (ClientHttpResponse)this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
}
//RibbonLoadBalancerClient
public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
//获取负载均衡器
ILoadBalancer loadBalancer = this.getLoadBalancer(serviceId);
//通过负载均衡器选主一个server
Server server = this.getServer(loadBalancer);//入口1
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
} else {
RibbonLoadBalancerClient.RibbonServer ribbonServer = new RibbonLoadBalancerClient.RibbonServer(serviceId, server, this.isSecure(server, serviceId), this.serverIntrospector(serviceId).getMetadata(server));
return this.execute(serviceId, ribbonServer, request);//入口2
}
}
protected Server getServer(ILoadBalancer loadBalancer) {
return loadBalancer == null ? null : loadBalancer.chooseServer("default");
}
- 获取所有restTempalte,遍历挨个添加拦截器。
- 后续通过restTempalte发起远程调用的时候,都会被拦截器拦截
- 通过loadBalancer选择一个server
- 发起远程调用
loadBalancer
先看下loadBalancer什么样
public class BaseLoadBalancer extends AbstractLoadBalancer implements
PrimeConnections.PrimeConnectionListener, IClientConfigAware {
private final static IRule DEFAULT_RULE = new RoundRobinRule();
private final static SerialPingStrategy DEFAULT_PING_STRATEGY = new SerialPingStrategy();
protected IRule rule = DEFAULT_RULE;
protected IPingStrategy pingStrategy = DEFAULT_PING_STRATEGY;
protected IPing ping = null;
@Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> allServerList = Collections.synchronizedList(new ArrayList<Server>());
@Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> upServerList = Collections.synchronizedList(new ArrayList<Server>());
private IClientConfig config;
}
public BaseLoadBalancer() {
this.name = DEFAULT_NAME;
this.ping = null;
setRule(DEFAULT_RULE);
setupPingTask();//入口
lbStats = new LoadBalancerStats(DEFAULT_NAME);
}
void setupPingTask() {
if (canSkipPing()) {
return;
}
if (lbTimer != null) {
lbTimer.cancel();
}
lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
true);
lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);//入口
forceQuickPing();
}
// protected IPingStrategy pingStrategy = DEFAULT_PING_STRATEGY;
class PingTask extends TimerTask {
public void run() {
try {
new Pinger(pingStrategy).runPinger();
} catch (Exception e) {
logger.error("LoadBalancer [{}]: Error pinging", name, e);
}
}
}
//默认Ping 的方式
private static class SerialPingStrategy implements IPingStrategy {
@Override
public boolean[] pingServers(IPing ping, Server[] servers) {
int numCandidates = servers.length;
boolean[] results = new boolean[numCandidates];
logger.debug("LoadBalancer: PingTask executing [{}] servers configured", numCandidates);
for (int i = 0; i < numCandidates; i++) {
results[i] = false; /* Default answer is DEAD. */
try {
if (ping != null) {
results[i] = ping.isAlive(servers[i]);
}
} catch (Exception e) {
logger.error("Exception while pinging Server: '{}'", servers[i], e);
}
}
return results;
}
}
class Pinger {
private final IPingStrategy pingerStrategy;
public Pinger(IPingStrategy pingerStrategy) {
this.pingerStrategy = pingerStrategy;
}
public void runPinger() throws Exception {
if (!pingInProgress.compareAndSet(false, true)) {
return; // Ping in progress - nothing to do
}
// we are "in" - we get to Ping
Server[] allServers = null;
boolean[] results = null;
Lock allLock = null;
Lock upLock = null;
try {
/*
* The readLock should be free unless an addServer operation is
* going on...
*/
allLock = allServerLock.readLock();
allLock.lock();
allServers = allServerList.toArray(new Server[allServerList.size()]);
allLock.unlock();
int numCandidates = allServers.length;
results = pingerStrategy.pingServers(ping, allServers);
final List<Server> newUpList = new ArrayList<Server>();
final List<Server> changedServers = new ArrayList<Server>();
for (int i = 0; i < numCandidates; i++) {
boolean isAlive = results[i];
Server svr = allServers[i];
boolean oldIsAlive = svr.isAlive();
svr.setAlive(isAlive);
//如果当前检测状态和之前的状态不一致,稍后会用于发送状态变化通知
if (oldIsAlive != isAlive) {
changedServers.add(svr);
logger.debug("LoadBalancer [{}]: Server [{}] status changed to {}",
name, svr.getId(), (isAlive ? "ALIVE" : "DEAD"));
}
//如果是存活状态,添加进list
if (isAlive) {
newUpList.add(svr);
}
}
upLock = upServerLock.writeLock();
upLock.lock();
upServerList = newUpList;
upLock.unlock();
//发送状态变化通知消息
notifyServerStatusChangeListener(changedServers);
} finally {
pingInProgress.set(false);
}
}
}
- 默认的负载均衡策略是轮询;
- 内部保存了两个list,一个用来保存所有server,一个用来保存存活的server
- 通过一个定时任务用来维护上面两个list,10S 执行一次
- 拿到当前 allServerList 中 所有的节点去Ping,在allServerList 中标记其存活状态,用当前Ping的存活状态 和 上次的 准或状态比对,如果 状态不同 ,发出通知 这些节点状态有改变;如果状态为存活,那么将 upServerList 中的节点用本次检测出的所有存活节点替换。
接下来看下loadBalancer选择server的方法
public Server chooseServer(Object key) {
if (counter == null) {
counter = createCounter();
}
counter.increment();
if (rule == null) {
return null;
} else {
try {
return rule.choose(key);//入口
} catch (Exception e) {
logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e);
return null;
}
}
}
- 这个方法即通过具体的负载策略来选择服务器
分析完 BaseLoadBalancer ,DynamicServerListLoadBalancer 和 ZoneAwareLoadBalancer 基本大同小异:
DynamicServerListLoadBalancer :使用动态源的服务器, 即服务器列表可能是在运行时更改。 通过一些Filter函数来动态的过滤掉指定的服务器列表;
ZoneAwareLoadBalancer :这个负载均衡器适用于异地多机房的情况,在选择服务器的时候可以避免整个区域。