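Ribbon's Service Health Check
The previous article showed that Ribbon can integrate with Eureka and pull service information through EurekaClient. But what happens if one of the pulled instances has a problem? Requests routed to it would simply fail. This article looks at the IPing component, which exists specifically to check whether each server is still valid; only valid servers are kept.
Scheduled Task
Ribbon keeps pulling registry information from Eureka for as long as it runs, so there must be a dedicated thread doing this work. Let's look at the initialization code of DynamicServerListLoadBalancer: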
public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
                                     ServerList<T> serverList, ServerListFilter<T> filter,
                                     ServerListUpdater serverListUpdater) {
    super(clientConfig, rule, ping);
    this.serverListImpl = serverList;
    this.filter = filter;
    this.serverListUpdater = serverListUpdater;
    if (filter instanceof AbstractServerListFilter) {
        ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
    }
    restOfInit(clientConfig);
}
An IPing is passed in here, so it must be used for something. Let's follow super(clientConfig, rule, ping);
public BaseLoadBalancer(IClientConfig config, IRule rule, IPing ping) {
    initWithConfig(config, rule, ping, createLoadBalancerStatsFromConfig(config));
}

void initWithConfig(IClientConfig clientConfig, IRule rule, IPing ping, LoadBalancerStats stats) {
    this.config = clientConfig;
    String clientName = clientConfig.getClientName();
    this.name = clientName;
    int pingIntervalTime = Integer.parseInt(""
            + clientConfig.getProperty(
                    CommonClientConfigKey.NFLoadBalancerPingInterval,
                    Integer.parseInt("30")));
    int maxTotalPingTime = Integer.parseInt(""
            + clientConfig.getProperty(
                    CommonClientConfigKey.NFLoadBalancerMaxTotalPingTime,
                    Integer.parseInt("2")));
    setPingInterval(pingIntervalTime);
    setMaxTotalPingTime(maxTotalPingTime);
    // cross associate with each other
    // i.e. Rule,Ping meet your container LB
    // LB, these are your Ping and Rule guys ...
    setRule(rule);
    setPing(ping);
    setLoadBalancerStats(stats);
    rule.setLoadBalancer(this);
    if (ping instanceof AbstractLoadBalancerPing) {
        ((AbstractLoadBalancerPing) ping).setLoadBalancer(this);
    }
    logger.info("Client: {} instantiated a LoadBalancer: {}", name, this);
    boolean enablePrimeConnections = clientConfig.get(
            CommonClientConfigKey.EnablePrimeConnections, DefaultClientConfigImpl.DEFAULT_ENABLE_PRIME_CONNECTIONS);
    if (enablePrimeConnections) {
        this.setEnablePrimingConnections(true);
        PrimeConnections primeConnections = new PrimeConnections(
                this.getName(), clientConfig);
        this.setPrimeConnections(primeConnections);
    }
    init();
}
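Midway through, it calls setPingInterval(pingIntervalTime);
As the name suggests, this sets the ping interval. If the interval is less than 1, the method returns without doing anything; otherwise it stores the value and calls setupPingTask(), which starts the scheduled ping task.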
public void setPingInterval(int pingIntervalSeconds) {
    if (pingIntervalSeconds < 1) {
        return;
    }
    this.pingIntervalSeconds = pingIntervalSeconds;
    if (logger.isDebugEnabled()) {
        logger.debug("LoadBalancer [{}]: pingIntervalSeconds set to {}",
                name, this.pingIntervalSeconds);
    }
    setupPingTask(); // since ping data changed
}
void setupPingTask() {
    // first check whether pinging can be skipped
    if (canSkipPing()) {
        return;
    }
    if (lbTimer != null) {
        lbTimer.cancel();
    }
    lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
            true);
    // schedule a PingTask that runs immediately and then at every interval
    lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
    forceQuickPing();
}
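The cancel-and-reschedule pattern in setupPingTask can be sketched with the JDK's java.util.Timer alone. This is a minimal sketch, not Ribbon code: PingScheduler, its fields, the default interval of 10 seconds, and the timersCreated counter are hypothetical names and values introduced only for illustration.

```java
import java.util.Timer;
import java.util.TimerTask;

// Hypothetical stand-in for the load balancer's timer handling (not Ribbon code).
class PingScheduler {
    private final Runnable pingAction;   // what to run on every tick
    private Timer timer;                 // current timer, null until first setup
    private int intervalSeconds = 10;    // assumed default, chosen for the sketch
    private int timersCreated = 0;       // bookkeeping so the behaviour is observable

    PingScheduler(Runnable pingAction) {
        this.pingAction = pingAction;
    }

    // Mirrors the guard in setPingInterval: values below 1 are ignored.
    synchronized void setPingInterval(int seconds) {
        if (seconds < 1) {
            return;
        }
        this.intervalSeconds = seconds;
        setupPingTask(); // the interval changed, so rebuild the schedule
    }

    // Cancel any existing timer, then schedule a fresh periodic task.
    synchronized void setupPingTask() {
        if (timer != null) {
            timer.cancel();
        }
        timer = new Timer("ping-timer", true); // daemon thread
        timersCreated++;
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                pingAction.run();
            }
        }, 0, intervalSeconds * 1000L);
    }

    synchronized int getIntervalSeconds() {
        return intervalSeconds;
    }

    synchronized int getTimersCreated() {
        return timersCreated;
    }

    synchronized void shutdown() {
        if (timer != null) {
            timer.cancel();
        }
    }
}
```

Cancelling the old timer before creating a new one ensures that at most one periodic task is active, which is why changing the interval never leaves two ping loops running side by side.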
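PingTask is an inner class, and its English Javadoc states its purpose plainly: "TimerTask that keeps runs every X seconds to check the status of each server/node in the Server List".
In other words, this timer task checks the status of every server in the server list every X seconds. Let's step into it: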
class PingTask extends TimerTask {
    public void run() {
        try {
            new Pinger(pingStrategy).runPinger();
        } catch (Exception e) {
            logger.error("LoadBalancer [{}]: Error pinging", name, e);
        }
    }
}
Its main job is to call the runPinger method:
public void runPinger() throws Exception {
    if (!pingInProgress.compareAndSet(false, true)) {
        return; // Ping in progress - nothing to do
    }
    // we are "in" - we get to Ping
    Server[] allServers = null;
    boolean[] results = null;
    Lock allLock = null;
    Lock upLock = null;
    try {
        /*
         * The readLock should be free unless an addServer operation is
         * going on...
         */
        allLock = allServerLock.readLock();
        allLock.lock();
        allServers = allServerList.toArray(new Server[allServerList.size()]);
        allLock.unlock();
        int numCandidates = allServers.length;
        results = pingerStrategy.pingServers(ping, allServers);
        final List<Server> newUpList = new ArrayList<Server>();
        final List<Server> changedServers = new ArrayList<Server>();
        for (int i = 0; i < numCandidates; i++) {
            boolean isAlive = results[i];
            Server svr = allServers[i];
            boolean oldIsAlive = svr.isAlive();
            svr.setAlive(isAlive);
            if (oldIsAlive != isAlive) {
                changedServers.add(svr);
                logger.debug("LoadBalancer [{}]: Server [{}] status changed to {}",
                        name, svr.getId(), (isAlive ? "ALIVE" : "DEAD"));
            }
            if (isAlive) {
                newUpList.add(svr);
            }
        }
        upLock = upServerLock.writeLock();
        upLock.lock();
        upServerList = newUpList;
        upLock.unlock();
        notifyServerStatusChangeListener(changedServers);
    } finally {
        pingInProgress.set(false);
    }
}
} // closes the enclosing Pinger class
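The core of runPinger is a compare-and-set guard (so only one ping round runs at a time) followed by a diff between the old and new liveness state of each server. The sketch below reproduces just that logic with plain JDK types. It is not Ribbon code: PingRound, the use of string ids in place of Server objects, the Optional return value, and the assumption that every server starts out as not alive are all choices made for this illustration, and the locking around the server lists is left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;

// Hypothetical, simplified model of one ping round (not Ribbon code).
class PingRound {
    private final AtomicBoolean pingInProgress = new AtomicBoolean(false);
    // server id -> last known liveness; assumed to start as "not alive"
    private final Map<String, Boolean> alive = new LinkedHashMap<>();
    private volatile List<String> upServers = new ArrayList<>();

    PingRound(List<String> serverIds) {
        for (String id : serverIds) {
            alive.put(id, false);
        }
    }

    // Returns the ids whose state changed, or Optional.empty() if a round
    // was already in progress and this call was skipped.
    Optional<List<String>> run(Predicate<String> ping) {
        if (!pingInProgress.compareAndSet(false, true)) {
            return Optional.empty(); // another round is running, nothing to do
        }
        try {
            List<String> newUp = new ArrayList<>();
            List<String> changed = new ArrayList<>();
            for (Map.Entry<String, Boolean> entry : alive.entrySet()) {
                boolean isAlive = ping.test(entry.getKey());
                if (entry.getValue() != isAlive) {
                    changed.add(entry.getKey()); // state flipped since last round
                }
                entry.setValue(isAlive);
                if (isAlive) {
                    newUp.add(entry.getKey());
                }
            }
            upServers = newUp; // swap in the freshly built "up" list
            return Optional.of(changed);
        } finally {
            pingInProgress.set(false); // always release the guard
        }
    }

    List<String> getUpServers() {
        return upServers;
    }
}
```

Building the new "up" list on the side and swapping it in at the end means readers of the list never observe a half-updated state, which matches the way the original method assigns upServerList = newUpList in a single step.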
The line to focus on is:
results = pingerStrategy.pingServers(ping, allServers);
It uses the ping object, and as the method name suggests, it pings the servers. Stepping into it:
private static class SerialPingStrategy implements IPingStrategy {
    @Override
    public boolean[] pingServers(IPing ping, Server[] servers) {
        int numCandidates = servers.length;
        boolean[] results = new boolean[numCandidates];
        logger.debug("LoadBalancer: PingTask executing [{}] servers configured", numCandidates);
        for (int i = 0; i < numCandidates; i++) {
            results[i] = false; /* Default answer is DEAD. */
            try {
                if (ping != null) {
                    results[i] = ping.isAlive(servers[i]);
                }
            } catch (Exception e) {
                logger.error("Exception while pinging Server: '{}'", servers[i], e);
            }
        }
        return results;
    }
}
In the end it still calls the isAlive method of the ping object itself, which resolves to the subclass implementation NIWSDiscoveryPing#isAlive. As the class comment explains, this "pings" the discovery client rather than the server: the logic only checks whether the instance status is UP and returns true if it is. Note that the result defaults to true when the server is not a DiscoveryEnabledServer or carries no status.
/**
 * "Ping" Discovery Client
 * i.e. we dont do a real "ping". We just assume that the server is up if Discovery Client says so
 * @author stonse
 *
 */
public boolean isAlive(Server server) {
    boolean isAlive = true;
    if (server != null && server instanceof DiscoveryEnabledServer) {
        DiscoveryEnabledServer dServer = (DiscoveryEnabledServer) server;
        InstanceInfo instanceInfo = dServer.getInstanceInfo();
        if (instanceInfo != null) {
            InstanceStatus status = instanceInfo.getStatus();
            if (status != null) {
                isAlive = status.equals(InstanceStatus.UP);
            }
        }
    }
    return isAlive;
}
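The two pieces above, a serial strategy that defaults every answer to DEAD and a status-based isAlive that defaults to true, can be combined in a small self-contained sketch. It is not Ribbon or Eureka code: SerialPingSketch, its local Status enum, and the use of Predicate in place of IPing are assumptions made so the example runs with the JDK alone.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical, dependency-free model of serial pinging (not Ribbon code).
class SerialPingSketch {
    // Local stand-in for the discovery client's instance status.
    enum Status { UP, DOWN, STARTING, OUT_OF_SERVICE, UNKNOWN }

    // Status-based check: no network call, just trust the registry.
    // A missing status is treated as alive, mirroring the default of true.
    static boolean isAlive(Status status) {
        return status == null || status == Status.UP;
    }

    // Ping every server in order. A null ping or a thrown exception
    // leaves the answer for that server at the default, false (DEAD).
    static <T> boolean[] pingAll(Predicate<T> ping, List<T> servers) {
        boolean[] results = new boolean[servers.size()];
        for (int i = 0; i < servers.size(); i++) {
            results[i] = false; // default answer is DEAD
            try {
                if (ping != null) {
                    results[i] = ping.test(servers.get(i));
                }
            } catch (RuntimeException e) {
                // a failing ping must not abort the round; keep false
            }
        }
        return results;
    }
}
```

For example, calling SerialPingSketch.pingAll(SerialPingSketch::isAlive, statuses) on a list of statuses marks only the UP (or status-less) entries as alive. Because the servers are checked one after another, a slow ping implementation would delay the whole round, which is a reason to keep the check as cheap as the status lookup shown here.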
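Back in initWithConfig, a few lines further down there is a call to setPing(ping);. It performs a check as well: if the ping passed in is not the one already in use, it cancels the scheduled task and creates a new one.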
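Direct Invocation
Having read both the Eureka and Ribbon source code, I noticed that a one-off manual call and a scheduled background task usually exist side by side. When the full server list is updated, there is a line that forces a quick ping right away.
That path ends up calling the same runPinger method.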