zoukankan      html  css  js  c++  java
  • Ribbon的检查服务

    Ribbon的检查服务

    上面一篇已经看到了Ribbon已经可以和Eureka整合了,而且可以通过EurekaClient拉取服务信息,如果拉取的服务中出问题了怎么办,就会导致在请求的时候发现请求不同,那么下面就看看IPing组件,它是专门检查服务是否有效的,有效的才会被留下来。

    定时调度

    因为Ribbon长期拉取Eureka的注册信息是个长期的过程,所以肯定会有一个专门的线程去做这件事,那我们就看一下DynamicServerListLoadBalancer初始化的那块代码

        public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
                                             ServerList<T> serverList, ServerListFilter<T> filter,
                                             ServerListUpdater serverListUpdater) {
            super(clientConfig, rule, ping);
            this.serverListImpl = serverList;
            this.filter = filter;
            this.serverListUpdater = serverListUpdater;
            if (filter instanceof AbstractServerListFilter) {
                ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
            }
            restOfInit(clientConfig);
        }
    

    因为此处穿进去了一个Iping,肯定是要做什么事情,那就继续跟踪super(clientConfig, rule, ping);

       public BaseLoadBalancer(IClientConfig config, IRule rule, IPing ping) {
            initWithConfig(config, rule, ping, createLoadBalancerStatsFromConfig(config));
        }
    
        void initWithConfig(IClientConfig clientConfig, IRule rule, IPing ping, LoadBalancerStats stats) {
            this.config = clientConfig;
            String clientName = clientConfig.getClientName();
            this.name = clientName;
            int pingIntervalTime = Integer.parseInt(""
                    + clientConfig.getProperty(
                            CommonClientConfigKey.NFLoadBalancerPingInterval,
                            Integer.parseInt("30")));
            int maxTotalPingTime = Integer.parseInt(""
                    + clientConfig.getProperty(
                            CommonClientConfigKey.NFLoadBalancerMaxTotalPingTime,
                            Integer.parseInt("2")));
    
            setPingInterval(pingIntervalTime);
            setMaxTotalPingTime(maxTotalPingTime);
    
            // cross associate with each other
            // i.e. Rule,Ping meet your container LB
            // LB, these are your Ping and Rule guys ...
            setRule(rule);
            setPing(ping);
    
            setLoadBalancerStats(stats);
            rule.setLoadBalancer(this);
            if (ping instanceof AbstractLoadBalancerPing) {
                ((AbstractLoadBalancerPing) ping).setLoadBalancer(this);
            }
            logger.info("Client: {} instantiated a LoadBalancer: {}", name, this);
            boolean enablePrimeConnections = clientConfig.get(
                    CommonClientConfigKey.EnablePrimeConnections, DefaultClientConfigImpl.DEFAULT_ENABLE_PRIME_CONNECTIONS);
    
            if (enablePrimeConnections) {
                this.setEnablePrimingConnections(true);
                PrimeConnections primeConnections = new PrimeConnections(
                        this.getName(), clientConfig);
                this.setPrimeConnections(primeConnections);
            }
            init();
    
        }
    

    在中间调用了方法 setPingInterval(pingIntervalTime);看名字是设置ping的间隔的。如果ping的间隔小于1,则调用setupPingTask(),看上去就是开启Ping的定时任务的

        public void setPingInterval(int pingIntervalSeconds) {
            if (pingIntervalSeconds < 1) {
                return;
            }
    
            this.pingIntervalSeconds = pingIntervalSeconds;
            if (logger.isDebugEnabled()) {
                logger.debug("LoadBalancer [{}]:  pingIntervalSeconds set to {}",
            	    name, this.pingIntervalSeconds);
            }
            setupPingTask(); // since ping data changed
        }
    
        void setupPingTask() {
            //首先判断是否要跳过
            if (canSkipPing()) {
                return;
            }
            if (lbTimer != null) {
                lbTimer.cancel();
            }
            lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
                    true);
            //开启一个PingTask的任务
            lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
            forceQuickPing();
        }
    

    PingTask这是一个内部类,上面的英文已经写的很清楚了,TimerTask that keeps runs every X seconds to check the status of each server/node in the Server List

    这个定时任务是为了间隔x秒去检测正在跑的服务实例清单的状态。那我们就点进去看看

        class PingTask extends TimerTask {
            public void run() {
                try {
                	new Pinger(pingStrategy).runPinger();
                } catch (Exception e) {
                    logger.error("LoadBalancer [{}]: Error pinging", name, e);
                }
            }
        }
    

    主要是调用了runPinger这个方法。

    public void runPinger() throws Exception {
                if (!pingInProgress.compareAndSet(false, true)) { 
                    return; // Ping in progress - nothing to do
                }
                
                // we are "in" - we get to Ping
    
                Server[] allServers = null;
                boolean[] results = null;
    
                Lock allLock = null;
                Lock upLock = null;
    
                try {
                    /*
                     * The readLock should be free unless an addServer operation is
                     * going on...
                     */
                    allLock = allServerLock.readLock();
                    allLock.lock();
                    allServers = allServerList.toArray(new Server[allServerList.size()]);
                    allLock.unlock();
    
                    int numCandidates = allServers.length;
                    results = pingerStrategy.pingServers(ping, allServers);
    
                    final List<Server> newUpList = new ArrayList<Server>();
                    final List<Server> changedServers = new ArrayList<Server>();
    
                    for (int i = 0; i < numCandidates; i++) {
                        boolean isAlive = results[i];
                        Server svr = allServers[i];
                        boolean oldIsAlive = svr.isAlive();
    
                        svr.setAlive(isAlive);
    
                        if (oldIsAlive != isAlive) {
                            changedServers.add(svr);
                            logger.debug("LoadBalancer [{}]:  Server [{}] status changed to {}", 
                        		name, svr.getId(), (isAlive ? "ALIVE" : "DEAD"));
                        }
    
                        if (isAlive) {
                            newUpList.add(svr);
                        }
                    }
                    upLock = upServerLock.writeLock();
                    upLock.lock();
                    upServerList = newUpList;
                    upLock.unlock();
    
                    notifyServerStatusChangeListener(changedServers);
                } finally {
                    pingInProgress.set(false);
                }
            }
        }
    

    results = pingerStrategy.pingServers(ping, allServers);这里用到了ping,那就点进去看看,看这个方法名字,也是ping服务。

        private static class SerialPingStrategy implements IPingStrategy {
    
            @Override
            public boolean[] pingServers(IPing ping, Server[] servers) {
                int numCandidates = servers.length;
                boolean[] results = new boolean[numCandidates];
    
                logger.debug("LoadBalancer:  PingTask executing [{}] servers configured", numCandidates);
    
                for (int i = 0; i < numCandidates; i++) {
                    results[i] = false; /* Default answer is DEAD. */
                    try {
                        if (ping != null) {
                            results[i] = ping.isAlive(servers[i]);
                        }
                    } catch (Exception e) {
                        logger.error("Exception while pinging Server: '{}'", servers[i], e);
                    }
                }
                return results;
    

    到最后去调用的还是ping本身的isAlive方法。最终来到的是它的子类的方法。NIWSDiscoveryPing#isAlive,看这个类的说明,才是真正的ping客户端的,逻辑里面只是看看这个服务的状态是不是UP,是的话就返回true。

    /**
     * "Ping" Discovery Client
     * i.e. we dont do a real "ping". We just assume that the server is up if Discovery Client says so
     * @author stonse
     *
     */
    
      public boolean isAlive(Server server) {
    		    boolean isAlive = true;
    		    if (server!=null && server instanceof DiscoveryEnabledServer){
    	            DiscoveryEnabledServer dServer = (DiscoveryEnabledServer)server;	            
    	            InstanceInfo instanceInfo = dServer.getInstanceInfo();
    	            if (instanceInfo!=null){	                
    	                InstanceStatus status = instanceInfo.getStatus();
    	                if (status!=null){
    	                    isAlive = status.equals(InstanceStatus.UP);
    	                }
    	            }
    	        }
    		    return isAlive;
    		}
    

    在一开始的方法中,下面几行有一行setPing(ping);,它也会进行校验,发现如果不是本身的ping,会取消定时调度,在重新生成一个。

    image-20211019101609751

    直接调用

    看了eureka的源码和ribbon源码,发现一般都是手动调用一次和线程定时任务都是同时存在的。当我们来到更新所有的服务清单的时候,有一行强制快速的ping一下。

    image-20211018175405835

    发现最后也是调用的这个方法。

    image-20211018175422224
  • 相关阅读:
    奖券数目
    用jQuery和ajax实现搜索框文字自动补全功能
    简单的文件上传功能实现(java)
    示例演示公告通知标题无缝向上滚动,文字段落无缝向上滚动,简单的wangeditor富文本编辑器,简单的音乐播放demo
    SSM框架中注解含义及应用场景小结
    数据库优化以及SQL优化小结
    javaWEB中前后台乱码解决问题小结
    线程同步的方法
    MySQL的简单使用
    springMVC运行流程图及运行原理
  • 原文地址:https://www.cnblogs.com/dalianpai/p/15423705.html
Copyright © 2011-2022 走看看