zoukankan      html  css  js  c++  java
  • HDFS源码分析之UnderReplicatedBlocks(一)

    http://blog.csdn.net/lipeng_bigdata/article/details/51160359

        UnderReplicatedBlocks是HDFS中关于块复制的一个重要数据结构。在HDFS的高性能、高容错性体系中,总有一些原因促使HDFS系统内进行块复制工作,比如基于高性能的负载均衡、基于容错性的数据块副本数恢复等。普遍的,任何工作都会有一个优先级的问题,特别是这里的数据块复制,不可能简单的按照先入先出或者其他简单策略,比方说,基于容错性的数据块副本数恢复,特别是数据块副本仅有一个的数据块副本数恢复,其优先级肯定要比基于高性能的负载均衡高,所以数据块复制要有个优先级的概念,那么,数据块复制的优先级怎么确定,怎么存储?一切答案均在UnderReplicatedBlocks中,本文我们将开始分析UnderReplicatedBlocks。

            UnderReplicatedBlocks专门用于存储待优先级的需要复制的数据块。何谓数据块复制优先级,我们看下UnderReplicatedBlocks类中的几个静态成员变量及其说明就会得到答案,如下:

    [java] view plain copy
     
    1. /** The queue with the highest priority: {@value} */  
    2. // 最高优先级队列的优先级值0  
    3. static final int QUEUE_HIGHEST_PRIORITY = 0;  
    4. /** The queue for blocks that are way below their expected value : {@value} */  
    5. // 第二优先级队列的优先级值1:主要针对低于副本数要求很多的数据块  
    6. static final int QUEUE_VERY_UNDER_REPLICATED = 1;  
    7. /** The queue for "normally" under-replicated blocks: {@value} */  
    8. // 第三优先级队列的优先级值2:主要针对低于副本数要求不是很多,即一般情况的数据块  
    9. static final int QUEUE_UNDER_REPLICATED = 2;  
    10. /** The queue for blocks that have the right number of replicas, 
    11.  * but which the block manager felt were badly distributed: {@value} 
    12.  */  
    13. // 第四优先级队列的优先级值3:主要针对副本数满足要求,但是数据块管理器BlockManager感觉严重分布不均  
    14. static final int QUEUE_REPLICAS_BADLY_DISTRIBUTED = 3;  
    15. /** The queue for corrupt blocks: {@value} */  
    16. // 第五优先级队列的优先级值4:主要针对损坏的数据块  
    17. static final int QUEUE_WITH_CORRUPT_BLOCKS = 4;  

            数据块复制优先级共分为五种级别,从高到底依次如下:

            1、QUEUE_HIGHEST_PRIORITY = 0:最高优先级

            主要针对数据块副本数非常的、严重的不足的情况,当前副本数低于期望值,且仅有1个或者干脆没有,比如副本数仅有1个,或者副本数干脆为0,但是还存在退役副本,这种情况最危险,数据最容易丢失,所以复制的优先级也最高;

            2、QUEUE_VERY_UNDER_REPLICATED = 1:第二优先级

            主要针对数据块副本数比较不足的情况,比上面的情况好点,当前副本数低于期望值,但是副本数大于1,其判断公式为当前副本数curReplicas乘以3还小于期望副本数expectedReplicas,这种情况也比较危险,数据也容易丢失,所以复制的优先级也很高;

            3、QUEUE_UNDER_REPLICATED = 2:第三优先级

            主要针对数据块副本数低于期望值,但是还不是很严重、很危急的情况;

            4、QUEUE_REPLICAS_BADLY_DISTRIBUTED = 3:第四优先级

            主要针对数据块已经有足够的副本数,但是没有足够的机架的情况,这是负载均衡等策略需要的产物;

            5、QUEUE_WITH_CORRUPT_BLOCKS = 4:第五优先级

            主要针对损坏的数据块的情况,其副本数位0,但是还没有退役副本,所以优先级最低,话说,这种数据块还需要哦复制吗?留个小小的疑问吧!

            通过上面的说明,我们可以简单总结下:

            当前副本数低于期望值时,如果当前副本数为1,甚至存在退役副本的情况下为0时,其复制优先级最高,如果当前副本数为0且没有退役副本,则复制优先级最低;如果当前副本数大于1,但是乘以3还小于期望副本数,处于比较危险的情况,则优先级次之,否则是第三优先级。而当当前副本数等于或高于期望值时,则可能是没有足够机架的情况,此时的优先级比最低优先级稍高,为第四优先级。

            UnderReplicatedBlocks也提供了根据数据块及其副本情况来获取复制优先级的getPriority()方法,代码如下:

    [java] view plain copy
     
    1.  /** Return the priority of a block 
    2.   * 计算指定数据块复制优先级 
    3.   *  
    4.   * @param block a under replicated block 
    5.   * @param curReplicas current number of replicas of the block 
    6.   * @param expectedReplicas expected number of replicas of the block 
    7.   * @return the priority for the blocks, between 0 and ({@link #LEVEL}-1) 
    8.   */  
    9.  private int getPriority(Block block,  
    10.                          int curReplicas,   
    11.                          int decommissionedReplicas,  
    12.                          int expectedReplicas) {  
    13.      
    14. // 参数校验:当前副本数curReplicas应大于等于0  
    15. assert curReplicas >= 0 : "Negative replicas!";  
    16.   
    17. // 如果当前副本数curReplicas大于等于期望的副本数,则返回第四优先级队列的优先级值3  
    18.    if (curReplicas >= expectedReplicas) {  
    19.       
    20.      // 数据块已经有足够的副本数,但是没有足够的机架  
    21.      // Block has enough copies, but not enough racks  
    22.      return QUEUE_REPLICAS_BADLY_DISTRIBUTED;  
    23.    } else if (curReplicas == 0) {// 如果当前副本数curReplicas为0  
    24.       
    25.      // If there are zero non-decommissioned replicas but there are  
    26.      // some decommissioned replicas, then assign them highest priority  
    27.      // 如果decommissionedReplicas大于0,返回最高优先级队列的优先级值0  
    28.      // 即没有非退役副本,但是有一些退役的副本,那么我们需要分配给它们最高优先级  
    29.      if (decommissionedReplicas > 0) {  
    30.        return QUEUE_HIGHEST_PRIORITY;  
    31.      }  
    32.        
    33.      // all we have are corrupt blocks  
    34.      // 没有非退役副本,也没有退役副本,我们就认为它是个损坏的数据块,复制优先级最低,为第五优先级队列的优先级值4  
    35.      return QUEUE_WITH_CORRUPT_BLOCKS;  
    36.    } else if (curReplicas == 1) {// 如果当前副本数curReplicas为1  
    37.      //only on replica -risk of loss  
    38.      // highest priority  
    39.      // 仅仅有一个副本,有丢失的风险,所以赋予最高优先级0  
    40.      return QUEUE_HIGHEST_PRIORITY;  
    41.    } else if ((curReplicas * 3) < expectedReplicas) {  
    42.      //there is less than a third as many blocks as requested;  
    43.      //this is considered very under-replicated  
    44.      // 如果如果当前副本数curReplicas乘以3还小于期望副本数expectedReplicas,返回第二优先级队列的优先级值1  
    45.      return QUEUE_VERY_UNDER_REPLICATED;  
    46.    } else {  
    47.      //add to the normal queue for under replicated blocks  
    48.      // 一般的低于副本数的优先级,返回第三优先级队列的优先级值2  
    49.      return QUEUE_UNDER_REPLICATED;  
    50.    }  

            思路和上述介绍一样,大体逻辑如下:

            1、如果当前副本数curReplicas大于等于期望的副本数,则返回第四优先级队列的优先级值3--QUEUE_REPLICAS_BADLY_DISTRIBUTED;

            2、如果当前副本数curReplicas为0,且如果decommissionedReplicas大于0,返回最高优先级队列的优先级值0,没有非退役副本,也没有退役副本,我们就认为它是个损坏的数据块,复制优先级最低,为第五优先级队列的优先级值4;

            3、如果当前副本数curReplicas为1,仅仅有一个副本,有丢失的风险,所以赋予最高优先级0;

            4、如果如果当前副本数curReplicas乘以3还小于期望副本数expectedReplicas,返回第二优先级队列的优先级值1;

            5、一般的低于副本数的优先级,返回第三优先级队列的优先级值2。

             UnderReplicatedBlocks还提供了涉及复制优先级队列的成员变量,如下:

    [java] view plain copy
     
    1. /** The total number of queues : {@value} */  
    2. // 队列总数  
    3. static final int LEVEL = 5;  
    4. /** the queues themselves */  
    5. // 队列集合  
    6. private final List<LightWeightLinkedSet<Block>> priorityQueues  
    7.     = new ArrayList<LightWeightLinkedSet<Block>>();  
    8. /** Stores the replication index for each priority */  
    9. // 存储每个优先级对应的复制索引  
    10. private Map<Integer, Integer> priorityToReplIdx = new HashMap<Integer, Integer>(LEVEL);  

            总的队列数目为5,而存储待复制不同优先级块集合的是priorityQueues列表,它是数据块集合LightWeightLinkedSet的列表,并且还提供了存储每种优先级对应的块复制索引的集合priorityToReplIdx,它是数字形式优先级priority到块在集合LightWeightLinkedSet中位置索引index的映射。

            UnderReplicatedBlocks的构造函数如下:

    [java] view plain copy
     
    1.  /** Create an object. */  
    2.  // 构造函数,创建一个对象  
    3.  UnderReplicatedBlocks() {  
    4.     
    5. // 5个LightWeightLinkedSet集合,存储到priorityQueues列表中,  
    6. // 并将优先级与复制索引的映射存储到priorityToReplIdx中  
    7.    for (int i = 0; i < LEVEL; i++) {  
    8.      priorityQueues.add(new LightWeightLinkedSet<Block>());  
    9.      priorityToReplIdx.put(i, 0);  
    10.    }  
    11.  }  

            上来先构造5个LightWeightLinkedSet集合,并按照优先级由高到低的顺序,添加到列表priorityQueues中,并初始化每种块复制优先级对应的位置索引为0。

            UnderReplicatedBlocks还提供了相应的添加、移除数据块及更新优先级方法,分别介绍如下:

            1、添加数据块add()

    [java] view plain copy
     
    1. /** add a block to a under replication queue according to its priority 
    2.  * @param block a under replication block 
    3.  * @param curReplicas current number of replicas of the block 
    4.  * @param decomissionedReplicas the number of decommissioned replicas 
    5.  * @param expectedReplicas expected number of replicas of the block 
    6.  * @return true if the block was added to a queue. 
    7.  */  
    8. synchronized boolean add(Block block,  
    9.                          int curReplicas,   
    10.                          int decomissionedReplicas,  
    11.                          int expectedReplicas) {  
    12.   assert curReplicas >= 0 : "Negative replicas!";  
    13.     
    14.   // 根据入参数据块及其副本情况计算块复制的优先级priLevel  
    15.   int priLevel = getPriority(block, curReplicas, decomissionedReplicas,  
    16.                              expectedReplicas);  
    17.     
    18.   // 如果块复制优先级priLevel小于5(即是一个正确有效的优先级),并且  
    19.   // 根据优先级priLevel从priorityQueues中取出相应块集合并将块添加入集合成功的话,返回true  
    20.   if(priLevel != LEVEL && priorityQueues.get(priLevel).add(block)) {  
    21.     if(NameNode.blockStateChangeLog.isDebugEnabled()) {  
    22.       NameNode.blockStateChangeLog.debug(  
    23.         "BLOCK* NameSystem.UnderReplicationBlock.add:"  
    24.         + block  
    25.         + " has only " + curReplicas  
    26.         + " replicas and need " + expectedReplicas  
    27.         + " replicas so is added to neededReplications"  
    28.         + " at priority level " + priLevel);  
    29.     }  
    30.     return true;  
    31.   }  
    32.   // 否则返回false  
    33.   return false;  
    34. }  

            添加数据块的add()方法比较简单,首先根据入参数据块及其副本情况调用getPriority()方法计算块复制的优先级priLevel,然后如果块复制优先级priLevel小于5(即是一个正确有效的优先级),并且根据优先级priLevel从priorityQueues中取出相应块集合并将块添加入集合成功的话,返回true,表示添加成功,否则返回false,表示添加失败。

            2、移除数据块remove()

    [java] view plain copy
     
    1.  /** remove a block from a under replication queue */  
    2.  synchronized boolean remove(Block block,   
    3.                              int oldReplicas,   
    4.                              int decommissionedReplicas,  
    5.                              int oldExpectedReplicas) {  
    6.      
    7. // 先根据入参数据块及其副本情况,调用getPriority()方法计算块复制优先级priLevel  
    8. int priLevel = getPriority(block, oldReplicas,   
    9.                               decommissionedReplicas,  
    10.                               oldExpectedReplicas);  
    11.   
    12. // 调用两个参数的remove()方法,移除数据块  
    13.    return remove(block, priLevel);  
    14.  }  
    [java] view plain copy
     
    1.  /** 
    2.   * Remove a block from the under replication queues. 
    3.   * 
    4.   * The priLevel parameter is a hint of which queue to query 
    5.   * first: if negative or >= {@link #LEVEL} this shortcutting 
    6.   * is not attmpted. 
    7.   * 
    8.   * If the block is not found in the nominated queue, an attempt is made to 
    9.   * remove it from all queues. 
    10.   * 
    11.   * <i>Warning:</i> This is not a synchronized method. 
    12.   * @param block block to remove 
    13.   * @param priLevel expected privilege level 
    14.   * @return true if the block was found and removed from one of the priority queues 
    15.   */  
    16.  boolean remove(Block block, int priLevel) {  
    17.      
    18. // 如果优先级priLevel是正确有效的,且根据优先级priLevel从列表priorityQueues中  
    19. // 取出数据块集合后,从中移除数据块成功的话,返回true,表示移除成功  
    20. if(priLevel >= 0 && priLevel < LEVEL   
    21.        && priorityQueues.get(priLevel).remove(block)) {  
    22.      if(NameNode.blockStateChangeLog.isDebugEnabled()) {  
    23.        NameNode.blockStateChangeLog.debug(  
    24.          "BLOCK* NameSystem.UnderReplicationBlock.remove: "  
    25.          + "Removing block " + block  
    26.          + " from priority queue "+ priLevel);  
    27.      }  
    28.      return true;  
    29.    } else {  
    30.       
    31.      // 否则,在给定优先级对应数据块集合中移除失败的话,尝试从所有优先级各自对应的队列中移除数据块,  
    32.      // 任何一个移除成功,均返回true,表示移除成功  
    33.       
    34.      // Try to remove the block from all queues if the block was  
    35.      // not found in the queue for the given priority level.  
    36.      for (int i = 0; i < LEVEL; i++) {  
    37.        if (priorityQueues.get(i).remove(block)) {  
    38.          if(NameNode.blockStateChangeLog.isDebugEnabled()) {  
    39.            NameNode.blockStateChangeLog.debug(  
    40.              "BLOCK* NameSystem.UnderReplicationBlock.remove: "  
    41.              + "Removing block " + block  
    42.              + " from priority queue "+ i);  
    43.          }  
    44.          return true;  
    45.        }  
    46.      }  
    47.    }  
    48.   
    49. // 最后,如果还不行的话,则返回false,表示移除失败  
    50.    return false;  
    51.  }  

            首先,在四个参数的remove()方法中,先根据入参数据块及其副本情况,调用getPriority()方法计算块复制优先级priLevel,然后调用两个参数的remove()方法,移除数据块;

            其次,在两个参数的remove()方法中,如果优先级priLevel是正确有效的,且根据优先级priLevel从列表priorityQueues中取出数据块集合后,从中移除数据块成功的话,返回true,表示移除成功;否则,在给定优先级对应数据块集合中移除失败的话,尝试从所有优先级各自对应的队列中移除数据块,任何一个移除成功,均返回true,表示移除成功;最后,如果还不行的话,则返回false,表示移除失败。

            3、更新优先级update()

    [java] view plain copy
     
    1.  /** 
    2.   * Recalculate and potentially update the priority level of a block. 
    3.   * 
    4.   * If the block priority has changed from before an attempt is made to 
    5.   * remove it from the block queue. Regardless of whether or not the block 
    6.   * is in the block queue of (recalculate) priority, an attempt is made 
    7.   * to add it to that queue. This ensures that the block will be 
    8.   * in its expected priority queue (and only that queue) by the end of the 
    9.   * method call. 
    10.   * @param block a under replicated block 
    11.   * @param curReplicas current number of replicas of the block 
    12.   * @param decommissionedReplicas  the number of decommissioned replicas 
    13.   * @param curExpectedReplicas expected number of replicas of the block 
    14.   * @param curReplicasDelta the change in the replicate count from before 
    15.   * @param expectedReplicasDelta the change in the expected replica count from before 
    16.   */  
    17.  synchronized void update(Block block, int curReplicas,  
    18.                           int decommissionedReplicas,  
    19.                           int curExpectedReplicas,  
    20.                           int curReplicasDelta, int expectedReplicasDelta) {  
    21.      
    22. // curReplicas代表当前副本数,curReplicasDelta代表之前发生的副本数变化  
    23. // curExpectedReplicas代表当前期望副本数,expectedReplicasDelta代表之前发生的期望副本数变化  
    24.   
    25. // 计算之前的副本数oldReplicas和之前的期望副本数oldExpectedReplicas  
    26. int oldReplicas = curReplicas-curReplicasDelta;  
    27.    int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;  
    28.      
    29.    // 计算当前的块复制优先级curPri  
    30.    int curPri = getPriority(block, curReplicas, decommissionedReplicas, curExpectedReplicas);  
    31.      
    32.    // 计算之前的块复制优先级oldPri  
    33.    int oldPri = getPriority(block, oldReplicas, decommissionedReplicas, oldExpectedReplicas);  
    34.    if(NameNode.stateChangeLog.isDebugEnabled()) {  
    35.      NameNode.stateChangeLog.debug("UnderReplicationBlocks.update " +   
    36.        block +  
    37.        " curReplicas " + curReplicas +  
    38.        " curExpectedReplicas " + curExpectedReplicas +  
    39.        " oldReplicas " + oldReplicas +  
    40.        " oldExpectedReplicas  " + oldExpectedReplicas +  
    41.        " curPri  " + curPri +  
    42.        " oldPri  " + oldPri);  
    43.    }  
    44.      
    45.    // 如果之前优先级oldPri合法且不等于当前优先级curPri  
    46.    if(oldPri != LEVEL && oldPri != curPri) {  
    47.      // 调用remove()方法移除数据块  
    48.      remove(block, oldPri);  
    49.    }  
    50.      
    51.    // 如果当前优先级curPri合法,通过当前优先级curPri从priorityQueues列表中获取对应数据块集合并将数据块添加进去  
    52.    if(curPri != LEVEL && priorityQueues.get(curPri).add(block)) {  
    53.      if(NameNode.blockStateChangeLog.isDebugEnabled()) {  
    54.        NameNode.blockStateChangeLog.debug(  
    55.          "BLOCK* NameSystem.UnderReplicationBlock.update:"  
    56.          + block  
    57.          + " has only "+ curReplicas  
    58.          + " replicas and needs " + curExpectedReplicas  
    59.          + " replicas so is added to neededReplications"  
    60.          + " at priority level " + curPri);  
    61.      }  
    62.    }  
    63.  }  

            更新优先级update()方法用于当数据块副本数或期望副本数等发生变化时,调整数据块复制优先级,并调整其在UnderReplicatedBlocks中的相应存储位置。大体逻辑如下:

            1、首先搞清楚几个参数:curReplicas代表当前副本数,curReplicasDelta代表之前发生的副本数变化,curExpectedReplicas代表当前期望副本数,expectedReplicasDelta代表之前发生的期望副本数变化;

            2、计算之前的副本数oldReplicas和之前的期望副本数oldExpectedReplicas;

            3、计算当前的块复制优先级curPri;

            4、计算之前的块复制优先级oldPri;

            5、如果之前优先级oldPri合法且不等于当前优先级curPri:调用remove()方法移除数据块;

            6、如果当前优先级curPri合法,通过当前优先级curPri从priorityQueues列表中获取对应数据块集合并将数据块添加进去。

            未完待续,更多精彩尽在《HDFS源码分析之UnderReplicatedBlocks(二)》

  • 相关阅读:
    带你进入异步Django+Vue的世界
    xps转换为pdf
    当对函数的返回值有多种需求时(执行是否成功,及业务数据的返回值),可采用的方法
    WPF 打印崩溃问题( 异常:Illegal characters in path/路径中有非法字符)
    集群、限流、缓存 BAT 大厂无非也就是这么做
    C#简单爬取数据(.NET使用HTML解析器ESoup和正则两种方式匹配数据)
    公共静态函数、属性 的 “关联成员” 的 生命周期
    python 之 Django框架(ORM常用字段和字段参数、关系字段和和字段参数)
    Django文档
    go micro 微服务框架温习
  • 原文地址:https://www.cnblogs.com/0xcafedaddy/p/8472775.html
Copyright © 2011-2022 走看看