zoukankan      html  css  js  c++  java
  • 一 Balancer regionCountCost 权重。

    原文如下地址 :

    http://www.cnblogs.com/cenyuhai/p/3650943.html

    看源码很久了,终于开始动手写博客了,为什么是先写负载均衡呢,因为一个室友入职新公司了,然后他们遇到这方面的问题,某些机器的硬盘使用明显比别的机器要多,每次用hadoop做完负载均衡,很快又变回来了。

    首先我们先看HMaster当中怎么初始化Balancer的,把集群的状态穿进去,设置master,然后执行初始化。

    //initialize load balancerthis.balancer.setClusterStatus(getClusterStatus());
    this.balancer.setMasterServices(this);
    this.balancer.initialize();

    然后调用是在HMaster的balance()方法当中调用

    复制代码
    Map<TableName, Map<ServerName, List<HRegionInfo>>> assignmentsByTable =
      this.assignmentManager.getRegionStates().getAssignmentsByTable();
    
    List<RegionPlan> plans = new ArrayList<RegionPlan>();
    //Give the balancer the current cluster state.this.balancer.setClusterStatus(getClusterStatus());
    //针对表来做平衡,返回平衡方案,针对全局,可能不是最优解for (Map<ServerName, List<HRegionInfo>> assignments : assignmentsByTable.values()) {
        List<RegionPlan> partialPlans = this.balancer.balanceCluster(assignments);
        if (partialPlans != null) plans.addAll(partialPlans);
    }
    复制代码

    可以看到它首先获取了当前的集群的分配情况,这个分配情况是根据表的 Map<TableName, Map<ServerName, List<HRegionInfo>>,然后遍历这个map的values,调用 balancer.balanceCluster(assignments) 来生成一个partialPlans,生成RegionPlan(Region的移动计划) 。

    我们就可以切换到StochasticLoadBalancer当中了,这个是默认Balancer具体的实现了,也是最好的实现,下面就说说这玩意儿咋实现的。

    看一下注释,这个玩意儿吹得神乎其神的,它说它考虑到了这么多因素:

    复制代码

    * <ul> 
    * <li>Region Load</li> Region的负载 
    * <li>Table Load</li>  表的负载 
    * <li>Data Locality</li> 数据本地性 
    * <li>Memstore Sizes</li> 内存Memstore的大小 
    * <li>Storefile Sizes</li> 硬盘存储文件的大小 
    * </ul>

    复制代码

    好,我们从balanceCluster开始看吧,一进来第一件事就是判断是否需要平衡

    //不需要平衡就退出if (!needsBalance(new ClusterLoadState(clusterState))) {
       return null;
    }

    平衡的条件是:负载最大值和最小值要在平均值(region数/server数)的+-slop值之间, 但是这个平均值是基于表的,因为我们传进去的参数clusterState就是基于表的。

    复制代码
    // Check if we even need to do any load balancing// HBASE-3681 check sloppiness firstfloat average = cs.getLoadAverage(); // for logging//集群的负载最大值和最小值要在平均值的+-slop值之间int floor = (int) Math.floor(average * (1 - slop));
    int ceiling = (int) Math.ceil(average * (1 + slop));
    if (!(cs.getMinLoad() > ceiling || cs.getMaxLoad() < floor)) {
        .....return false;
    }
    return true;
    复制代码

      如果需要平衡的话,就开始计算开销了

    复制代码
    // Keep track of servers to iterate through them.
    Cluster cluster = new Cluster(clusterState, loads, regionFinder);
    //计算出来当前的开销    double currentCost = computeCost(cluster, Double.MAX_VALUE);
    double initCost = currentCost;
    double newCost = currentCost;

    for (step = 0; step < computedMaxSteps; step++) { 
    //随机挑选一个"选号器" 
    int pickerIdx = RANDOM.nextInt(pickers.length); 
    RegionPicker p = pickers[pickerIdx]; 
    //用选号器从集群当中随机跳出一对来,待处理的<server,region>对 
    Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> picks = p.pick(cluster);

    int leftServer = picks.getFirst().getFirst(); 
    int leftRegion = picks.getFirst().getSecond(); 
    int rightServer = picks.getSecond().getFirst(); 
    int rightRegion = picks.getSecond().getSecond();

    cluster.moveOrSwapRegion(leftServer, 
    rightServer, 
    leftRegion, 
    rightRegion); 
    //移动或者交换完之后,看看新的开销是否要继续 
    newCost = computeCost(cluster, currentCost); 
    // Should this be kept? 挺好,保存新状态 
    if (newCost < currentCost) { 
    currentCost = newCost; 
    } else { 
    // 操作不划算,就回退 
    cluster.moveOrSwapRegion(leftServer, 
    rightServer, 
    rightRegion, 
    leftRegion); 
    }

    if (initCost > currentCost) { 
    //找到了满意的平衡方案 
    List<RegionPlan> plans = createRegionPlans(cluster); 
    return plans; 

    复制代码

      上面的被我清除了细枝末节之后的代码主体,okay,上面逻辑过程如下:

    1. 生成一个虚拟的集群cluster,方便计算计算当前状态的开销,其中clusterState是表的状态,loads是整个集群的状态。

    // Keep track of servers to iterate through them.
    Cluster cluster = new Cluster(clusterState, loads, regionFinder);
    //计算出来当前的开销    double currentCost = computeCost(cluster, Double.MAX_VALUE);
    double initCost = currentCost;
    double newCost = currentCost;

     2. 然后循环 computedMaxSteps次,随机从选出一个picker来计算平衡方案

    int pickerIdx = RANDOM.nextInt(pickers.length);
    RegionPicker p = pickers[pickerIdx];
    //用选号器从集群当中随机跳出一对来,待处理的<server,region>
    Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> picks = p.pick(cluster);

      picker是啥?这里面有三个,第一个是RandomRegionPicker是随机挑选region,这里就不详细介绍了,主要讨论后面两个;第二个LoadPicker是计算负载的,第三个主要是考虑本地性的。

    给我感觉就很像ZF的摇号器一样,用哪种算法还要摇个号

    pickers = new RegionPicker[] {
          new RandomRegionPicker(),
          new LoadPicker(),
          localityPicker
    };

      下面我们先看localityPicker的pick方法,这个方法是随机抽选出来一个server、region,找出region的其他本地机器,然后他们返回。

    复制代码
      @Override
      Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> pick(Cluster cluster) {
        if (this.masterServices == null) {
        return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
          new Pair<Integer, Integer>(-1,-1),
          new Pair<Integer, Integer>(-1,-1)
        );
        }
        // Pick a random region server 随机选出一个server来    int thisServer = pickRandomServer(cluster);
    
        // Pick a random region on this server 随机选出region    int thisRegion = pickRandomRegion(cluster, thisServer, 0.0f);
    
        if (thisRegion == -1) {
        return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
          new Pair<Integer, Integer>(-1,-1),
          new Pair<Integer, Integer>(-1,-1)
        );
        }
    
        // Pick the server with the highest locality 找出本地性最高的目标server    int otherServer = pickHighestLocalityServer(cluster, thisServer, thisRegion);
    
        // pick an region on the other server to potentially swap    int otherRegion = this.pickRandomRegion(cluster, otherServer, 0.5f);
    
        return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
          new Pair<Integer, Integer>(thisServer,thisRegion),
          new Pair<Integer, Integer>(otherServer,otherRegion)
        );
      }
    复制代码

      okay,这个结束了,下面我们看看LoadPicker吧。

    复制代码
      @Override
        Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> pick(Cluster cluster) {
          cluster.sortServersByRegionCount();
          //先挑选出负载最高的serverint thisServer = pickMostLoadedServer(cluster, -1);
          //再选出除了负载最高的server之外负载最低的serverint otherServer = pickLeastLoadedServer(cluster, thisServer);
    
          Pair<Integer, Integer> regions = pickRandomRegions(cluster, thisServer, otherServer);
          return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
              new Pair<Integer, Integer>(thisServer, regions.getFirst()),
              new Pair<Integer, Integer>(otherServer, regions.getSecond())
    
          );
        }
    复制代码

    这里的负载高和负载低是按照Server上面的region数来算的,而不是存储文件啥的,选出负载最高和负载最低的时候,又随机抽出region来返回了。

    pick挑选的过程介绍完了,那么很明显,计算才是重头戏了,什么样的region会导致计算出来的分数高低呢?

    3. 重点在计算函数上 computeCost(cluster, Double.MAX_VALUE) 结果这个函数也超级简单,哈哈

    复制代码
    protected double computeCost(Cluster cluster, double previousCost) {
        double total = 0;
        
        for (CostFunction c:costFunctions) {
          if (c.getMultiplier() <= 0) {
            continue;
          }
    
          total += c.getMultiplier() * c.cost(cluster);
    
          if (total > previousCost) {
            return total;
          }
        }
        return total;
      }
    复制代码

    遍历CostFunction,拿cost的加权平均和计算出来。

    那costFunction里面都有啥呢?localityCost又出现了,看来本地性是一个很大的考虑的情况。

    复制代码
    costFunctions = new CostFunction[]{
      new RegionCountSkewCostFunction(conf),
      new MoveCostFunction(conf),
      localityCost,
      new TableSkewCostFunction(conf),
      regionLoadFunctions[0],
      regionLoadFunctions[1],
      regionLoadFunctions[2],
      regionLoadFunctions[3],
    };
    

    regionLoadFunctions = new CostFromRegionLoadFunction[] { 
    new ReadRequestCostFunction(conf), 
    new WriteRequestCostFunction(conf), 
    new MemstoreSizeCostFunction(conf), 
    new StoreFileCostFunction(conf) 
    };

    复制代码

    可以看出来,里面真正看中硬盘内容大小的,只有一个StoreFileCostFunction,cost的计算方式有些区别,但都是一个0-1之间的数字,下面给出里面5个函数都用过的cost的函数。

    复制代码
    //cost函数double max = ((count - 1) * mean) + (total - mean);
    for (double n : stats) {
      double diff = Math.abs(mean - n);
      totalCost += diff;
    }
    
    double scaled =  scale(0, max, totalCost);
    return scaled;
    
    //scale函数protected double scale(double min, double max, double value) {
          if (max == 0 || value == 0) {
      return 0;
          }
    
          return Math.max(0d, Math.min(1d, (value - min) / max));
    }
    复制代码

    经过分析吧,我觉得影响里面最后cost最大的是它的权重,下面给一下,这些function的默认权重。

    RegionCountSkewCostFunction hbase.master.balancer.stochastic.regionCountCost ,默认值500
    
    MoveCostFunction hbase.master.balancer.stochastic.moveCost,默认值是100
    
    localityCost hbase.master.balancer.stochastic.localityCost,默认值是25
    TableSkewCostFunction hbase.master.balancer.stochastic.tableSkewCost,默认值是35
    
    ReadRequestCostFunction hbase.master.balancer.stochastic.readRequestCost,默认值是5
    
    WriteRequestCostFunction hbase.master.balancer.stochastic.writeRequestCost,默认值是5
    
    MemstoreSizeCostFunction hbase.master.balancer.stochastic.memstoreSizeCost,默认值是5
    
    StoreFileCostFunction hbase.master.balancer.stochastic.storefileSizeCost,默认值是5
    Storefile的默认值是5,那么低。。。可以试着提高一下这个参数,使它在计算cost消耗的时候,产生更加正向的意义,效果不好说。

    4. 根据虚拟的集群状态生成RegionPlan,这里就不说了

    List<RegionPlan> plans = createRegionPlans(cluster);

    源码的分析完毕,要想减少存储内容分布不均匀,可以试着考虑增加一个picker,这样又不会缺少对其他条件的考虑,具体可以参考 LoadPicker,复制它的实现再写一个,在pickMostLoadedServer和pickLeastLoadedServer这两个方法里面把考虑的条件改一下,以前的条件是Integer[] servers = cluster.serverIndicesSortedByRegionCount; 通过这个来查找一下负载最高和最低的server,那么现在我们要在Cluster里面增加一个Server ---> StoreFile大小的关系映射集合,但是这里面没有,只有regionLoads,RegionLoad这个类有一个方法 getStorefileSizeMB可以获得StoreFile的大小,我们通过里面的region和server的映射 regionIndexToServerIndex来最后计算出来这个映射关系即可,这个计算映射关系个过程放在Cluster的构造函数里面。


     




    God has given me a gift. Only one. I am the most complete fighter in the world. My whole life, I have trained. I must prove I am worthy of someting. rocky_24
  • 相关阅读:
    gateway 实现接口日志保存
    Spring Boot应用的Controller返回的集合类数据是XML格式的可能原因
    json 转list
    观察者模式
    Quartz定时任务整理
    java通过word模板生成word文档
    基于mysql的单据号生成(前缀+日期+自增id+后缀)
    Rabbitmq详解
    java.sql.SQLException: connection holder is null 问题处理
    为什么要用消息队列或消息队列的优缺点
  • 原文地址:https://www.cnblogs.com/rocky24/p/8063d21a0848bb9c6e7890cab0cae90a.html
Copyright © 2011-2022 走看看