zoukankan      html  css  js  c++  java
  • Spring Cloud-Ribbon ILoadBalancer负载均衡器核心源码(四)

    Ribbon负载均衡相关类

    AbstractloadBalancer

    ILoadBalancer的抽象实现类

    public abstract class AbstractLoadBalancer implements ILoadBalancer {
    
        //服务实例分组枚举
        //• ALL: 所有服务实例。
        //• STATUS_UP: 正常服务的实例。
        //• STATUS_NOT_UP: 停止服务的实例。
        public enum ServerGroup{
            ALL,
            STATUS_UP,
            STATUS_NOT_UP
        }
        //再根据负载均衡器选择服务实例时忽略key
        public Server chooseServer() {
            return chooseServer(null);
        }
        //定义了根据分组类型来获取 不同的服务实例的列表。
        public abstract List<Server> getServerList(ServerGroup serverGroup);
        //对象被用来存储负载均衡器中各个 服务实例当前的属性和 统计信息。 这些信息非常有用, 我们可以利用这些信息来观察负载均衡器的运行情
        //况, 同时这些信息也是用来制定负载均衡策略的重要依据。
        public abstract LoadBalancerStats getLoadBalancerStats();
    }

    BaseloadBalancer

    BaseLoadBalancer 类是和ribbon 负载均衡器的基础实现类,在该类中定义了很多关 于负载均衡器相关的基础内容。

    public class BaseLoadBalancer extends AbstractLoadBalancer implements PrimeConnectionListener, IClientConfigAware {
        private static Logger logger = LoggerFactory.getLogger(BaseLoadBalancer.class);
        private static final IRule DEFAULT_RULE = new RoundRobinRule();
        private static final BaseLoadBalancer.SerialPingStrategy DEFAULT_PING_STRATEGY = new BaseLoadBalancer.SerialPingStrategy();
        private static final String DEFAULT_NAME = "default";
        private static final String PREFIX = "LoadBalancer_";
        //BaseLoadBalancer服务选择是委托给IRule 这个接口表示负载均衡选择策略
        protected IRule rule;
        //使用IPing检查服务是否有效的执行对象 内部使用线性轮训  默认实现类内部类 SerialPingStrategy
        protected IPingStrategy pingStrategy;
        //用于检查服务是否有效  默认为空需要注入
        protected IPing ping;
        //维护所有服务
        @Monitor(name = "LoadBalancer_AllServerList", type = DataSourceType.INFORMATIONAL)
        protected volatile List<Server> allServerList;
        //维护有效服务
        @Monitor(name = "LoadBalancer_UpServerList", type = DataSourceType.INFORMATIONAL)
        protected volatile List<Server> upServerList;
        ......
    
        public BaseLoadBalancer() {
            this.rule = DEFAULT_RULE;//默认使用new RoundRobinRule()
            this.pingStrategy = DEFAULT_PING_STRATEGY;
            this.ping = null;
            this.allServerList = Collections.synchronizedList(new ArrayList());
            this.upServerList = Collections.synchronizedList(new ArrayList());
            .....
            this.ping = null;
            this.setRule(DEFAULT_RULE);
            this.setupPingTask();
        }
    
        //将服务添加到服务清单
        public void addServer(Server newServer) {
            if (newServer != null) {
                try {
                    ArrayList<Server> newList = new ArrayList();
                    newList.addAll(this.allServerList);
                    newList.add(newServer);
                    this.setServersList(newList);
                } catch (Exception var3) {
                    logger.error("LoadBalancer [{}]: Error adding newServer {}", new Object[]{this.name, newServer.getHost(), var3});
                }
            }
    
        }
    
        //将服务添加到服务清单
        public void addServers(List<Server> newServers) {
            if (newServers != null && newServers.size() > 0) {
                try {
                    ArrayList<Server> newList = new ArrayList();
                    newList.addAll(this.allServerList);
                    newList.addAll(newServers);
                    this.setServersList(newList);
                } catch (Exception var3) {
                    logger.error("LoadBalancer [{}]: Exception while adding Servers", this.name, var3);
                }
            }
    
        }
    
        //将服务添加到服务清单
        void addServers(Object[] newServers) {
            if (newServers != null && newServers.length > 0) {
                try {
                    ArrayList<Server> newList = new ArrayList();
                    newList.addAll(this.allServerList);
                    Object[] var3 = newServers;
                    int var4 = newServers.length;
    
                    for (int var5 = 0; var5 < var4; ++var5) {
                        Object server = var3[var5];
                        if (server != null) {
                            if (server instanceof String) {
                                server = new Server((String) server);
                            }
    
                            if (server instanceof Server) {
                                newList.add((Server) server);
                            }
                        }
                    }
    
                    this.setServersList(newList);
                } catch (Exception var7) {
                    logger.error("LoadBalancer [{}]: Exception while adding Servers", this.name, var7);
                }
            }
    
        }
    
        //获得所有服务
        public List<Server> getAllServers() {
            return Collections.unmodifiableList(this.allServerList);
        }
    
        //根据组获得服务
        public List<Server> getServerList(ServerGroup serverGroup) {
            switch (serverGroup) {
                case ALL:
                    return this.allServerList;
                case STATUS_UP:
                    return this.upServerList;
                case STATUS_NOT_UP:
                    ArrayList<Server> notAvailableServers = new ArrayList(this.allServerList);
                    ArrayList<Server> upServers = new ArrayList(this.upServerList);
                    notAvailableServers.removeAll(upServers);
                    return notAvailableServers;
                default:
                    return new ArrayList();
            }
        }
    
    
        //根据实例id获得服务
        public Server chooseServer(Object key) {
            if (this.counter == null) {
                this.counter = this.createCounter();
            }
            this.counter.increment();
            if (this.rule == null) {
                return null;
            } else {
                try {
                    return this.rule.choose(key);
                } catch (Exception var3) {
                    logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", new Object[]{this.name, key, var3});
                    return null;
                }
            }
        }
    
        //标记服务无效
        public void markServerDown(Server server) {
            if (server != null && server.isAlive()) {
                logger.error("LoadBalancer [{}]:  markServerDown called on [{}]", this.name, server.getId());
                server.setAlive(false);
                this.notifyServerStatusChangeListener(Collections.singleton(server));
            }
        }
    }
    
    
    
        
    }

    DynamicServerlistloadBalancer

    继承BaseloadBalancer 是对BaseloadBalancer扩展 实现了再服务运行期间动态更新的能力 还增加通过过滤器 选择性的过滤一些服务的功能

    public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
        private static final Logger LOGGER = LoggerFactory.getLogger(DynamicServerListLoadBalancer.class);
    
        boolean isSecure = false;
        boolean useTunnel = false;
    
        // to keep track of modification of server lists
        protected AtomicBoolean serverListUpdateInProgress = new AtomicBoolean(false);
    
        //用于更新服务清单 由EurekaRibbonClientConfiguration.ribbonServerList 创建DiscoveryEnabledNIWSServerList
        volatile ServerList<T> serverListImpl;
        //从注册中心获取到服务后更新本地服务
        protected volatile ServerListUpdater serverListUpdtater;
        //通过serverListImpl.getUpdatedListOfServers执行更新操作
        protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
            @Override
            public void doUpdate() {
                updateListOfServers();
            }
        };
        public void updateListOfServers() {
            List<T> servers = new ArrayList<T>();
            if (serverListImpl != null) {
                servers = serverListImpl.getUpdatedListOfServers();
                LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                        getIdentifier(), servers);
    
                if (filter != null) {
                    servers = filter.getFilteredListOfServers(servers);
                    LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                            getIdentifier(), servers);
                }
            }
            updateAllServerList(servers);
        }
    }

    ServerList<T>

    public interface ServerList<T extends Server> {
    
        //于获取初始化的服务实例 清单,
        public List<T> getInitialListOfServers();
        //用于获取更新的服务实例清单 
        public List<T> getUpdatedListOfServers();
    
    }

    DiscoveryEnabledNIWSServerList

    public class DiscoveryEnabledNIWSServerList extends AbstractServerList<DiscoveryEnabledServer> {
    
        public List<DiscoveryEnabledServer> getInitialListOfServers() {
            return this.obtainServersViaDiscovery();
        }
    
        public List<DiscoveryEnabledServer> getUpdatedListOfServers() {
            return this.obtainServersViaDiscovery();
        }
    
        private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
            List<DiscoveryEnabledServer> serverList = new ArrayList();
            if (this.eurekaClientProvider != null && this.eurekaClientProvider.get() != null) {
                EurekaClient eurekaClient = (EurekaClient)this.eurekaClientProvider.get();
                if (this.vipAddresses != null) {
                    //可以理解为逻辑服务名字 比如PROVIDER
                    String[] var3 = this.vipAddresses.split(",");
                    int var4 = var3.length;
    
                    for(int var5 = 0; var5 < var4; ++var5) {
                        String vipAddress = var3[var5];
                        //通过EurekaClient从注册中心拉取服务
                        List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, this.isSecure, this.targetRegion);
                        Iterator var8 = listOfInstanceInfo.iterator();
    
                        while(var8.hasNext()) {
                            InstanceInfo ii = (InstanceInfo)var8.next();
                            if (ii.getStatus().equals(InstanceStatus.UP)) {
                                if (this.shouldUseOverridePort) {
                                    if (logger.isDebugEnabled()) {
                                        logger.debug("Overriding port on client name: " + this.clientName + " to " + this.overridePort);
                                    }
    
                                    InstanceInfo copy = new InstanceInfo(ii);
                                    if (this.isSecure) {
                                        ii = (new Builder(copy)).setSecurePort(this.overridePort).build();
                                    } else {
                                        ii = (new Builder(copy)).setPort(this.overridePort).build();
                                    }
                                }
    
                                DiscoveryEnabledServer des = this.createServer(ii, this.isSecure, this.shouldUseIpAddr);
                                serverList.add(des);
                            }
                        }
    
                        if (serverList.size() > 0 && this.prioritizeVipAddressBasedServers) {
                            break;
                        }
                    }
                }
                //最终返回拉取的服务
                return serverList;
            } else {
                logger.warn("EurekaClient has not been initialized yet, returning an empty list");
                return new ArrayList();
            }
        }
    
    
    }

    ServerListUpdater

    public interface ServerListUpdater {
        public interface UpdateAction {
            void doUpdate();
        }
        //启动服务更新器, 传入的UpdadataAction对象为更新操作的具体实现。
        void start(UpdateAction updateAction);
    
        //停止服务更新器
        void stop();
    
        ////荻取最近的更新时间戳
        String getLastUpdate();
    
        //获取上一次更新到现在的时间间隔,单位为毫秒
        long getDurationSinceLastUpdateMs();
    
        ////荻取错过的更新周期数
        int getNumberMissedCycles();
    
        ////荻取核心线程数
        int getCoreThreads();
    }

    PollingServerListUpdater动态更新服务的默认策略 采用定时任务

    EurekaNotificationServerListUpdater 也是动态更新服务但是它利用 Eureka 的事件监听器来驱动服务列表的更新操作。

    PollingServerListUpdater

    public class PollingServerListUpdater implements ServerListUpdater {
        //是DynamicServerListlodBalancer的成员变量传入
        public synchronized void start(final UpdateAction updateAction) {
            if (this.isActive.compareAndSet(false, true)) {
                Runnable wrapperRunnable = new Runnable() {
                    public void run() {
                        if (!PollingServerListUpdater.this.isActive.get()) {
                            if (PollingServerListUpdater.this.scheduledFuture != null) {
                                PollingServerListUpdater.this.scheduledFuture.cancel(true);
                            }
    
                        } else {
                            try {
                                //外部传入拉取服务 调用的DynamicServerListLoadBalancer.updateAction.doUpdate()   这个doupdate里面调用serverListImpl.getUpdatedListOfServers();拉取服务
                                updateAction.doUpdate();
                                PollingServerListUpdater.this.lastUpdated = System.currentTimeMillis();
                            } catch (Exception var2) {
                                PollingServerListUpdater.logger.warn("Failed one update cycle", var2);
                            }
    
                        }
                    }
                };
                //开启定时任务
                this.scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(wrapperRunnable, this.initialDelayMs, this.refreshIntervalMs, TimeUnit.MILLISECONDS);
            } else {
                logger.info("Already active, no-op");
            }
    
        }
    
    }

    ServerListFilter

    DynamicServerlistloadBalancer 最终拉取服务的方法 涉及到一个filter

    @VisibleForTesting
        public void updateListOfServers() {
            List<T> servers = new ArrayList<T>();
            if (serverListImpl != null) {
                servers = serverListImpl.getUpdatedListOfServers();
                LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                        getIdentifier(), servers);
    
                if (filter != null) {
                    //对应成员变量volatile ServerListFilter<T> filter;
                    servers = filter.getFilteredListOfServers(servers);
                    LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                            getIdentifier(), servers);
                }
            }
            updateAllServerList(servers);
        }
    public interface ServerListFilter<T extends Server> {
        //主要用于实现对服务实例列表的过滤, 通过传入的 服务实例清单, 根据 一 些规则返回过滤后的服务实例清单
        public List<T> getFilteredListOfServers(List<T> servers);
    }

    AbstractServerListFilter

      这是一 个抽象过滤器,在这里定义了过滤时需要 的一个重要依据对象 LoadBalancerStats, LoadBalancerStats保存了负载均衡器的属性和统计信息

    public abstract class AbstractServerListFilter<T extends Server> implements ServerListFilter<T> {
        //保存了负载均衡器的属性和统计信息
        private volatile LoadBalancerStats stats;
    
        public AbstractServerListFilter() {
        }
    
        public void setLoadBalancerStats(LoadBalancerStats stats) {
            this.stats = stats;
        }
    
        public LoadBalancerStats getLoadBalancerStats() {
            return this.stats;
        }
    }

    ZoneAffinityServerListFilter

    该过滤器基于 “ 区域感知 (Zone Affinity)" 的方式实现服务实例的过滤, 也就是说, 它会根据提供服务的实例所处的区域 (Zone) 与消费者自身的所处区域 (Zone) 进行比较, 过滤掉那些不是同处 一 个区

    域的实例。

    public class ZoneAffinityServerListFilter<T extends Server> extends AbstractServerListFilter<T> implements IClientConfigAware {
        private volatile boolean zoneAffinity;
        private volatile boolean zoneExclusive;
    
        public List<T> getFilteredListOfServers(List<T> servers) {
            if (this.zone != null && (this.zoneAffinity || this.zoneExclusive) && servers != null && servers.size() > 0) {
                //Iterables.filter实现过滤
                List<T> filteredServers = Lists.newArrayList(Iterables.filter(servers, this.zoneAffinityPredicate.getServerOnlyPredicate()));
                //判断是否要启用区域感知功能 调用下面shouldEnableZoneAffinity方法
                if (this.shouldEnableZoneAffinity(filteredServers)) {
                    return filteredServers;
                }
    
                if (this.zoneAffinity) {
                    this.overrideCounter.increment();
                }
            }
    
            return servers;
        }
        private boolean shouldEnableZoneAffinity(List<T> filtered) {
            if (!this.zoneAffinity && !this.zoneExclusive) {
                return false;
            } else if (this.zoneExclusive) {
                return true;
            } else {
                //保存了负载均衡器的相关统计信息
                LoadBalancerStats stats = this.getLoadBalancerStats();
                if (stats == null) {
                    return this.zoneAffinity;
                } else {
                    logger.debug("Determining if zone affinity should be enabled with given server list: {}", filtered);
                    //取这些过滤后的同区域实例的基础指标(包含实例数量、断路器断开数、 活动请求数、 实例平均负载等),
                    ZoneSnapshot snapshot = stats.getZoneSnapshot(filtered);
                    double loadPerServer = snapshot.getLoadPerServer();
                    int instanceCount = snapshot.getInstanceCount();
                    int circuitBreakerTrippedCount = snapshot.getCircuitTrippedCount();
                    /**
                     * 当以下其中一个满足就不启用区域感知
                     *  blackOutServerPercentage: 故障实例百分比(断路器断开数/实例数量) >=0.8。
                     *  activeReqeustsPerServer: 实例平均负载 >=0.6 。
                     * availableServers: 可用实例数(实例数量 - 断路器断开数) <2。
                     */
                    if ((double)circuitBreakerTrippedCount / (double)instanceCount < this.blackOutServerPercentageThreshold.get() && loadPerServer < this.activeReqeustsPerServerThreshold.get() && instanceCount - circuitBreakerTrippedCount >= this.availableServersThreshold.get()) {
                        return true;
                    } else {
                        logger.debug("zoneAffinity is overriden. blackOutServerPercentage: {}, activeReqeustsPerServer: {}, availableServers: {}", new Object[]{(double)circuitBreakerTrippedCount / (double)instanceCount, loadPerServer, instanceCount - circuitBreakerTrippedCount});
                        return false;
                    }
                }
            }
        }
    
    
    }

     DefaultNIWSServerListFilter

    完全继承ZoneAffinityServerListFilter 是默认的过滤器

    public class DefaultNIWSServerListFilter<T extends Server> extends ZoneAffinityServerListFilter<T> {
        public DefaultNIWSServerListFilter() {
        }
    }

    ServerListSubsetFilter

    ZonePreferenceServerListFilter

    /**
     * eureka和ribbon整合额默认过滤器
     */
    public class ZonePreferenceServerListFilter extends ZoneAffinityServerListFilter<Server> {
        private String zone;
        public List<Server> getFilteredListOfServers(List<Server> servers) {
            //通过父类的区域感知获得所有服务列表
            List<Server> output = super.getFilteredListOfServers(servers);
            //如果消费者配置了zone
            if (this.zone != null && output.size() == servers.size()) {
                List<Server> local = new ArrayList();
                Iterator var4 = output.iterator();
    
                //遍历服务列表剔除跟消费者配置zone不一致的服务
                while(var4.hasNext()) {
                    
                    Server server = (Server)var4.next();
                    if (this.zone.equalsIgnoreCase(server.getZone())) {
                        local.add(server);
                    }
                }
    
                if (!local.isEmpty()) {
                    return local;
                }
            }
    
            return output;
        }
    }

    ZoneAwareLoadBalancer

    public class ZoneAwareLoadBalancer<T extends Server> extends DynamicServerListLoadBalancer<T> {
    
    
        //重写了父类的方法
        protected void setServerListForZones(Map<String, List<Server>> zoneServersMap) {
            //父类会获获得所有服务 并根据zone进行分组 每个zone对应一个Zonestats
            super.setServerListForZones(zoneServersMap);
            //创建一个自己的ConcurrentHashMap 存储对应zone的负载均衡器
            if (this.balancers == null) {
                this.balancers = new ConcurrentHashMap();
            }
             //获得服务的迭代器
            Iterator var2 = zoneServersMap.entrySet().iterator();
    
            Entry existingLBEntry;
            while(var2.hasNext()) {
                existingLBEntry = (Entry)var2.next();
                String zone = ((String)existingLBEntry.getKey()).toLowerCase();
                //内部管理根据balancers 获得对应的负载均衡器(内部会创建对应的IRole规则 没有指定的话 默认Availability­ FilieringRule) 将对应的zone的服务放到对应的负载均衡器
                this.getLoadBalancer(zone).setServersList((List)existingLBEntry.getValue());
            }
            //遍历负载均衡器
            var2 = this.balancers.entrySet().iterator();
            while(var2.hasNext()) {
                existingLBEntry = (Entry)var2.next();
                //检查对应的zone下面是否没有服务了 如果是的话就清空
                if (!zoneServersMap.keySet().contains(existingLBEntry.getKey())) {
                    ((BaseLoadBalancer)existingLBEntry.getValue()).setServersList(Collections.emptyList());
                }
            }
    
        }
    
        //重写父类的chooseServer
        public Server chooseServer(Object key) {
            //当负载均衡器中的zone大于1的时候才执行自定义策略 否则还是用父类的
            if (ENABLED.get() && this.getLoadBalancerStats().getAvailableZones().size() > 1) {
                Server server = null;
    
                try {
                    LoadBalancerStats lbStats = this.getLoadBalancerStats();
                    //为当前负载均衡器的所有zone创建快照
                    Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
                    logger.debug("Zone snapshots: {}", zoneSnapshot);
                    if (this.triggeringLoad == null) {
                        this.triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty("ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2D);
                    }
    
                    if (this.triggeringBlackoutPercentage == null) {
                        this.triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty("ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999D);
                    }
                    /**
                     * 对应zone的可用区的选择
                     * . 首先它会剔除符合 这些规则的 Zone区域: 所属实例数为零的 Zone 区域; Zone 区域内实例的平均负载小千零,或者实例故障率( 断路器断开次数/实例数)大 于 等于阙值(默认为0.99999)。
                     * . 然后根据Zone区域的实例平均负载计算出最差的Zone区域,这里的最差指的是 实例平均负载最高的Zone区域。
                     * . 如果在上面的过程中没有符合剔除要求的区域,同时实例最大平均负载小千闾值 (默认为20%), 就直接返回所有Zone区域为可用区域。 否则,从最坏Zone区
                     * 域集合中随机选择 一 个,将它从可用Zone区域集合中 剔除。
                     */
                    Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, this.triggeringLoad.get(), this.triggeringBlackoutPercentage.get());
                    logger.debug("Available zones: {}", availableZones);
                    //当获取的zone区域不等于空 并且小于总数
                    if (availableZones != null && availableZones.size() < zoneSnapshot.keySet().size()) {
                        //随机选择一个
                        String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
                        logger.debug("Zone chosen: {}", zone);
                        if (zone != null) {
                            BaseLoadBalancer zoneLoadBalancer = this.getLoadBalancer(zone);
                            //选用具体的服务实例  内部使用ZoneAvoidanceRule选择
                            server = zoneLoadBalancer.chooseServer(key);
                        }
                    }
                } catch (Exception var8) {
                    logger.error("Error choosing server using zone aware logic for load balancer={}", this.name, var8);
                }
    
                if (server != null) {
                    return server;
                } else {
                    logger.debug("Zone avoidance logic is not invoked.");
                    return super.chooseServer(key);
                }
            } else {
                logger.debug("Zone aware logic disabled or there is only one zone");
                return super.chooseServer(key);
            }
        }
    
        @VisibleForTesting
        BaseLoadBalancer getLoadBalancer(String zone) {
            zone = zone.toLowerCase();
            BaseLoadBalancer loadBalancer = (BaseLoadBalancer)this.balancers.get(zone);
            if (loadBalancer == null) {
                IRule rule = this.cloneRule(this.getRule());
                loadBalancer = new BaseLoadBalancer(this.getName() + "_" + zone, rule, this.getLoadBalancerStats());
                BaseLoadBalancer prev = (BaseLoadBalancer)this.balancers.putIfAbsent(zone, loadBalancer);
                if (prev != null) {
                    loadBalancer = prev;
                }
            }
    
            return loadBalancer;
        }

     

  • 相关阅读:
    javascript 中的 parameter vs arguments
    5款免费开源自建私人云盘程序 实现网盘自建数据自我掌控
    CentOS 7下安装Nextcloud搭建个人网盘
    centos7升级自带的php5.4版本到php5.6
    docker的介绍以及常用命令
    docker-ce的安装以及卸载
    CentOS7安装weblogic集群思路梳理
    centos7修改静态ip地址
    Windows server 2008启动remote dosktop services服务报错1079
    Centos 7.x 安装配置tomcat-8过程梳理
  • 原文地址:https://www.cnblogs.com/LQBlog/p/10084351.html
Copyright © 2011-2022 走看看